eis/py/proxy/IceProxy.py

85 lines
2.5 KiB
Python
Raw Permalink Normal View History

#*********************************************************************
#
# file: get ice proxy
#
# copyright: Shanghai Baosight Software Co., Ltd.
#
# Version history
# 1.0 2023-09-12 zoufuzhou create
#
#********************************************************************/
import sys
import traceback
import Ice
import MessageICE_ice
import logging as d
baosight = Ice.openModule('baosight')
class IceProxy:
    """Static helpers for building a ``baosight.MessageICE`` proxy and sending data through it."""

    @staticmethod
    def GetICEPrx(proxyname: str, communicator, timeout: int = 30000):
        """Build a oneway, non-secure ``MessageICE`` proxy from a stringified proxy.

        Args:
            proxyname: Stringified proxy passed to ``communicator.stringToProxy``.
            communicator: Ice communicator used to create the proxy.
            timeout: Timeout handed to ``ice_timeout`` (Ice expects milliseconds).
                Defaults to 30000. Values of 10 or less are ignored and the
                proxy keeps whatever timeout it already has.

        Returns:
            The configured proxy, or ``None`` when the communicator is missing,
            the name is ``None``/empty/whitespace, or proxy creation fails.
        """
        if communicator is None:
            return None
        # str.isspace() is False for "", so an empty name needs strip() to be caught.
        if proxyname is None or not str(proxyname).strip():
            d.warning("Proxy name cannot be empty or whitespace.")
            return None
        try:
            base = communicator.stringToProxy(str(proxyname)).ice_oneway().ice_secure(False)
            if timeout > 10:
                # Ice proxies are immutable: ice_timeout() returns a NEW proxy,
                # so the result must be kept or the timeout is silently dropped.
                base = base.ice_timeout(timeout)
            proxy = baosight.MessageICEPrx.uncheckedCast(base)
            if not proxy:
                d.warning("Invalid proxy: " + str(proxyname))
                return None
            return proxy
        except Exception as e:
            # Best-effort API: callers receive None instead of an exception.
            d.error(f"Failed to get proxy for {proxyname} with timeout {timeout}: {e}")
            traceback.print_exc()
        return None

    @staticmethod
    def SendDataShort(eventno: int, proxyname: str, seq: bytearray, timeout: int = 30000):
        """Send ``seq`` to the remote object identified by ``proxyname``.

        A communicator is created with ``Ice.initialize(sys.argv)`` for this
        single call and released when the ``with`` block exits.

        Args:
            eventno: Event number forwarded to the remote ``SendDataShort``.
            proxyname: Stringified proxy of the target object.
            seq: Payload; its length is passed as the third remote argument.
            timeout: Timeout forwarded to ``GetICEPrx`` (milliseconds).
                Defaults to 30000.

        Returns:
            ``True`` if the remote call was issued without raising, otherwise
            ``False`` (invalid name, no proxy, or any exception). Because the
            proxy is oneway, ``True`` does not confirm that the server
            processed the message.
        """
        # Reject None/empty/whitespace names before paying for a communicator.
        if proxyname is None or not str(proxyname).strip():
            d.warning("Proxy name cannot be empty or whitespace.")
            return False
        try:
            with Ice.initialize(sys.argv) as communicator:
                proxy = IceProxy.GetICEPrx(proxyname, communicator, timeout)
                if proxy:
                    proxy.SendDataShort(eventno, seq, len(seq))
                    return True
        except Exception as e:
            d.error(f"Failed to send event {eventno} to {proxyname} with timeout {timeout}: {e}")
            traceback.print_exc()
        return False