90 lines
2.8 KiB
Python
90 lines
2.8 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
||
|
|
'''
|
||
|
|
* @Author : zoufuzhou
|
||
|
|
* @Date : 2024-09-23 16:34:37
|
||
|
|
* @Description : structured data sequence serialization
|
||
|
|
* @LastEditTime : 2024-09-25 16:34:37
|
||
|
|
'''
|
||
|
|
import struct
|
||
|
|
import logging as d
|
||
|
|
|
||
|
|
|
||
|
|
class Serialization:
    """Pack and unpack fixed-layout records using a ``struct`` format string.

    ``str`` values are encoded to bytes before packing, and ``bytes`` fields
    are decoded back to ``str`` after unpacking, using ``encoding``.

    Failures are logged through the module-level logging alias ``d`` and
    reported to the caller as an empty result (``b''`` or ``()``) instead of
    an exception.
    """

    def __init__(self, format_string: str, encoding: str = 'utf-8'):
        """Store the struct layout and the text encoding.

        :param format_string: ``struct`` format string describing the record.
        :param encoding: Codec used to convert between ``str`` and ``bytes``.
        """
        self.format_string = format_string
        self.encoding = encoding

    def __decode(self, struct_value: bytes, format_string: str = None) -> tuple:
        """Unpack ``struct_value`` and decode every bytes field to ``str``.

        :param struct_value: Packed record; must be a ``bytes`` object.
        :param format_string: Optional override for the instance format.
        :return: Tuple of unpacked values, or ``()`` when the input is not
            ``bytes``, no format is available, or unpacking/decoding fails.
        """
        format_str = format_string or self.format_string
        if format_str is None:
            d.warning("No format string provided.")
            return ()

        if not isinstance(struct_value, bytes):
            d.warning("Input data is not of type 'bytes'.")
            return ()

        try:
            struct_tuple = struct.unpack(format_str, struct_value)
            # The 's' format code pads short values on the right with NUL
            # bytes, so only trailing NULs are padding. Leading NULs belong
            # to the payload and must survive a serialize/deserialize trip.
            return tuple(
                item.decode(self.encoding).rstrip('\x00')
                if isinstance(item, bytes) else item
                for item in struct_tuple
            )
        except struct.error as e:
            d.error(f"Error unpacking data: {e}")
            return ()
        except UnicodeDecodeError as e:
            d.error(f"Error decoding bytes: {e}")
            return ()

    def serialize(self, *args) -> bytes:
        """Serialize the given values into bytes based on the format string.

        Supports both multiple positional arguments and a single tuple.

        :param args: Multiple values or a single tuple of values to serialize.
        :return: Serialized byte data, or ``b''`` when a string cannot be
            encoded with the configured encoding or the values do not match
            the format string.
        """
        # A lone tuple argument is treated as the full list of values.
        if len(args) == 1 and isinstance(args[0], tuple):
            data_tuple = args[0]
        else:
            data_tuple = args

        try:
            # struct only packs bytes for 's'/'p' fields, so encode text first.
            processed = [
                item.encode(self.encoding) if isinstance(item, str) else item
                for item in data_tuple
            ]
            return struct.pack(self.format_string, *processed)
        except UnicodeEncodeError as e:
            # Mirror the decode path: log and return an empty value rather
            # than letting the encoding failure escape to the caller.
            d.error(f"Error encoding string: {e}")
            return b''
        except struct.error as e:
            d.error(f"Error packing data: {e}")
            return b''

    def deserialize(self, struct_value: bytes) -> tuple:
        """Unpack ``struct_value`` using the instance format string.

        :param struct_value: Packed record; must be a ``bytes`` object.
        :return: Tuple of decoded values, or ``()`` on any failure.
        """
        return self.__decode(struct_value)
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
# # 创建一个实例
|
||
|
|
# serializer = Serialization('10sif')
|
||
|
|
|
||
|
|
# # 使用多个参数
|
||
|
|
# data = serializer.serialize('hi', 42, 3.14)
|
||
|
|
# print("Serialized data (multiple args):", data)
|
||
|
|
|
||
|
|
# # 使用元组
|
||
|
|
# data_tuple = ('hello', 42, 3.14)
|
||
|
|
# data_from_tuple = serializer.serialize(data_tuple)
|
||
|
|
# print("Serialized data (tuple):", data_from_tuple)
|
||
|
|
|
||
|
|
# # 反序列化数据
|
||
|
|
# result = serializer.deserialize(data)
|
||
|
|
# print("Decoded result:", result)
|