eis/py/comlib/shm/MakePair.py

128 lines
4.2 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
'''
* @Author : zoufuzhou
* @Date : 2024-08-23 16:34:37
* @Description : key,value Indicates the matching of key values
* @LastEditTime : 2024-08-26 16:34:37
* @LastEditTime : 2025-10-24 16:34:37
'''
import struct
class MakePair:
__slots__ = ('key', 'value', 'format_string', 'key_length', 'step')
def __init__(self, key, struct_value: bytes):
"""
初始化键值对
:param key: 支持字符串或其他可转换为字符串的类型
:param struct_value: 必须为字节类型
"""
self.key_length = 48 # 键的最大字节长度
self.step = len(struct_value) if struct_value else 0
# 处理键
self.key = self._process_key(key)
# 处理值
if not isinstance(struct_value, bytes):
struct_value = struct_value.encode('utf-8') if isinstance(struct_value, str) else bytes(struct_value)
self.value = struct_value[:self.step] if struct_value else bytearray(self.step)
# 设置格式字符串
self.format_string = f'{self.key_length}s{self.step}s'
def _process_key(self, key) -> bytes:
"""处理键,确保其字节长度不超过限制。"""
key_str = key if isinstance(key, str) else str(key)
return key_str.encode('utf-8')[:self.key_length]
def length(self) -> int:
"""返回打包后的字节长度。"""
return struct.calcsize(self.format_string)
def make_pair(self, key, struct_value: bytes) -> 'MakePair':
"""创建一个新的键值对实例。"""
return self.__class__(key, struct_value)
def pack(self) -> bytes:
"""将键和值打包为字节数据。"""
if self.step == 0:
return struct.pack(self.format_string, self.key, self.value) # 打包键和值
if not isinstance(self.value, bytes):
self.value = bytes(self.value) if self.value else bytes()
return struct.pack(self.format_string, self.key, self.value[:self.step])
def unpack(self, data: bytes):
"""从字节数据解包键和值。"""
expected_length = self.length()
if len(data) < expected_length:
raise ValueError(f"数据长度不足: 期望 {expected_length}, 实际 {len(data)}.")
unpacked_data = struct.unpack(self.format_string, data[:expected_length])
self.key = unpacked_data[0].decode('utf-8', errors='ignore').rstrip('\x00')
print(self.key)
#self.key = unpacked_data[0].decode('utf-8').rstrip('\x00')
self.value = unpacked_data[1] if len(unpacked_data) > 1 else bytes()
return self
def __str__(self) -> str:
"""返回键值对的字符串表示。"""
return f"key: {self.key}, value: {self.value}"
def __repr__(self) -> str:
"""返回键值对的正式表示。"""
return self.__str__()
# 测试
if __name__ == '__main__':
pair = MakePair("test_key", bytes([1, 2, 3, 4]))
print(pair)
# print(pair.pack())
# print(pair.unpack(pair.pack()))
# print(pair.get_key())
# print(pair.get_value())
print(pair.length())
print(pair.format_string)
# 测试float value
pair = MakePair("test_key", struct.pack('fii', 1.234, 111,222))
print(pair)
print(pair.format_string)
print(pair.length())
print(pair.pack())
# print(pair.unpack(pair.pack()))
# print(pair.get_key())
# print(pair.get_value())
# value = pair.get_value()
# print(value)
# print(type(value))
# 解包
print(struct.unpack('fii', pair.value))
print(pair.key)
print(pair.value)
# print(pair.length())
# 测试value 是字符串
pair = MakePair("test_key", "test_value")
print(pair)
print(pair.pack())
print(pair.unpack(pair.pack()))
print(pair.key)
print(pair.value)
# 测试value 是空字符串
pair = MakePair("test_key", "")
print(pair)
print(pair.pack())
print(pair.unpack(pair.pack()))
print(pair.key)
print(pair.value)
# 测试value 是空字节
pair = MakePair("test_key", bytes())
print(pair)
print(pair.pack())
print(pair.unpack(pair.pack()))
print(pair.key)
print(pair.value)