#*********************************************************************
#
# file: get ice proxy
#
# copyright: Shanghai Baosight Software Co., Ltd.
#
# Version history
# 1.0 2023-09-12 zoufuzhou create
#
#********************************************************************/
import sys
import traceback
import Ice
import MessageICE_ice
import logging as d

baosight = Ice.openModule('baosight')


class IceProxy:
    """Static helpers for obtaining a ``MessageICE`` proxy and sending data through it."""

    @staticmethod
    def GetICEPrx(proxyname: str, communicator, timeout: int = 30000):
        """Build a oneway, non-secure ``MessageICE`` proxy from a stringified proxy.

        Args:
            proxyname: Stringified proxy passed to ``communicator.stringToProxy``.
                ``None``, empty and whitespace-only names are rejected.
            communicator: Ice communicator used to create the proxy. When it
                is ``None`` the method returns ``None`` immediately.
            timeout: Timeout handed to ``ice_timeout`` (default 30000). It is
                only applied when greater than 10.

        Returns:
            The configured proxy, or ``None`` when the communicator is missing,
            the name is blank, the proxy is invalid, or an exception occurred
            (exceptions are logged, not propagated).
        """
        if communicator is None:
            return None

        # str.isspace() is False for "", so use strip() to reject empty names too.
        name = "" if proxyname is None else str(proxyname)
        if not name.strip():
            d.warning("Proxy name cannot be empty or whitespace.")
            return None

        try:
            base = communicator.stringToProxy(name)
            if base is None:
                d.warning("Invalid proxy: " + name)
                return None

            base = base.ice_oneway().ice_secure(False)
            if timeout > 10:
                # Proxies are immutable: ice_timeout() returns a NEW proxy, so
                # the result must be kept or the timeout is silently dropped.
                base = base.ice_timeout(timeout)

            proxy = baosight.MessageICEPrx.uncheckedCast(base)
            if not proxy:
                d.warning("Invalid proxy: " + name)
                return None
            return proxy
        except Exception as e:
            d.error(f"Failed to get proxy for {proxyname} with timeout {timeout}: {e}")
            traceback.print_exc()
        return None

    @staticmethod
    def SendDataShort(eventno: int, proxyname: str, seq: bytearray, timeout: int = 30000) -> bool:
        """Send ``seq`` to the remote object identified by ``proxyname``.

        A communicator is created with ``Ice.initialize(sys.argv)`` for the
        duration of the call and released when the ``with`` block exits.

        Args:
            eventno: Event number forwarded to the remote ``SendDataShort``.
            proxyname: Stringified proxy of the target object. ``None``, empty
                and whitespace-only names are rejected.
            seq: Payload to send; its length is passed as the third argument.
            timeout: Timeout forwarded to ``GetICEPrx`` (default 30000).

        Returns:
            ``True`` if the remote call was issued without raising, ``False``
            otherwise (blank name, no proxy, or any exception, which is logged).
        """
        # str.isspace() is False for "", so use strip() to reject empty names too.
        name = "" if proxyname is None else str(proxyname)
        if not name.strip():
            d.warning("Proxy name cannot be empty or whitespace.")
            return False

        try:
            with Ice.initialize(sys.argv) as communicator:
                proxy = IceProxy.GetICEPrx(proxyname, communicator, timeout)
                if proxy:
                    # Invoke the remote operation on the proxy.
                    proxy.SendDataShort(eventno, seq, len(seq))
                    return True
        except Exception as e:
            d.error(f"Failed to get proxy for {proxyname} with timeout {timeout}: {e}")
            traceback.print_exc()
        return False