eis/py/comlib/shm/Serialization.py

90 lines
2.8 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
'''
* @Author : zoufuzhou
* @Date : 2024-09-23 16:34:37
* @Description : structured data sequence serialization
* @LastEditTime : 2024-09-25 16:34:37
'''
import struct
import logging as d
class Serialization:
    """Pack and unpack fixed-layout records described by a ``struct`` format.

    ``str`` values are encoded to bytes with ``encoding`` before packing, and
    ``bytes`` fields are decoded back to ``str`` (with trailing NUL padding
    removed) after unpacking. Packing/unpacking failures are logged and
    reported through an empty result (``b''`` / ``()``) rather than raised.
    """

    def __init__(self, format_string: str, encoding: str = 'utf-8'):
        """Store the record layout and the text encoding.

        :param format_string: ``struct`` format string describing one record.
        :param encoding: Codec used to convert between ``str`` and ``bytes``.
        """
        self.format_string = format_string
        self.encoding = encoding

    def __decode(self, struct_value: bytes, format_string: str = None) -> tuple:
        """Unpack ``struct_value`` and decode its byte fields to ``str``.

        :param struct_value: Packed record; ``bytes``, ``bytearray`` or
            ``memoryview`` are accepted.
        :param format_string: Optional format overriding ``self.format_string``.
        :return: Tuple of unpacked values, or ``()`` when the input is not a
            bytes-like buffer, no format is available, or unpacking/decoding
            fails (the failure is logged).
        """
        format_str = format_string or self.format_string
        if format_str is None:
            d.warning("No format string provided.")
            return ()
        # struct.unpack works on any bytes-like buffer, so mutable buffers and
        # memoryviews are accepted without forcing the caller to copy them.
        if not isinstance(struct_value, (bytes, bytearray, memoryview)):
            d.warning("Input data is not of type 'bytes'.")
            return ()
        try:
            fields = struct.unpack(format_str, struct_value)
            # Only trailing NULs are padding added by the 's' format code;
            # leading NULs belong to the payload and must be preserved.
            return tuple(
                field.decode(self.encoding).rstrip('\x00')
                if isinstance(field, bytes) else field
                for field in fields
            )
        except (struct.error, BufferError) as e:
            # BufferError covers non-contiguous memoryviews.
            d.error("Error unpacking data: %s", e)
            return ()
        except UnicodeDecodeError as e:
            d.error("Error decoding bytes: %s", e)
            return ()

    def serialize(self, *args) -> bytes:
        """Serialize the given values into bytes based on the format string.

        Supports both multiple positional arguments and a single tuple.

        :param args: Multiple values, or a single tuple of values, to pack.
        :return: Packed byte data, or ``b''`` if ``struct.pack`` rejects the
            values (the error is logged).
        """
        # A lone tuple argument is treated as the complete record.
        if len(args) == 1 and isinstance(args[0], tuple):
            values = args[0]
        else:
            values = args  # treat the positional arguments as the record

        # struct's string codes only accept bytes, so encode str values first.
        encoded = [
            value.encode(self.encoding) if isinstance(value, str) else value
            for value in values
        ]
        try:
            return struct.pack(self.format_string, *encoded)
        except struct.error as e:
            d.error("Error packing data: %s", e)
            return b''

    def deserialize(self, struct_value: bytes) -> tuple:
        """Unpack ``struct_value`` using this instance's format string.

        :param struct_value: Packed record (bytes-like).
        :return: Tuple of decoded values, or ``()`` on failure.
        """
        return self.__decode(struct_value)
# # Create an instance
# serializer = Serialization('10sif')
# # Serialize from multiple positional arguments
# data = serializer.serialize('hi', 42, 3.14)
# print("Serialized data (multiple args):", data)
# # Serialize from a single tuple
# data_tuple = ('hello', 42, 3.14)
# data_from_tuple = serializer.serialize(data_tuple)
# print("Serialized data (tuple):", data_from_tuple)
# # Deserialize the data
# result = serializer.deserialize(data)
# print("Decoded result:", result)