# -*- coding: utf-8 -*- ''' * @Author : zoufuzhou * @Date : 2024-09-02 16:34:37 * @Description : Memory Queue is a special linear table, which is a first in, first out (FIFO) data structure * @LastEditTime : 2024-09-05 16:34:37 ''' import struct from shm.GlobMem import GlobMem import logging as d class MemQueue: class Head: __format_string = 'IIII' # 使用 __slots__ 来优化内存使用 __slots__ = ('maxsize', 'size', 'head', 'tail') def __init__(self: 'Head', maxsize: int, size: int, head: int, tail: int): self.maxsize = maxsize self.size = size self.head = head self.tail = tail def length(self) -> int: """返回打包后的字节长度。""" return struct.calcsize(self.__format_string) def pack(self) -> bytes: """将属性打包为字节数据。""" return struct.pack(self.__format_string, self.maxsize, self.size, self.head, self.tail) def unpack(self, data: bytes) -> None: """从字节数据解包到属性中。""" if len(data) != self.length(): raise ValueError( f"数据长度不匹配: 期望 {self.length()}, 实际 {len(data)}.") self.maxsize, self.size, self.head, self.tail = struct.unpack( self.__format_string, data) def __str__(self) -> str: return f"maxsize: {self.maxsize}, size: {self.size}, head: {self.head}, tail: {self.tail}" def __repr__(self) -> str: return self.__str__() __head = Head(0, 0, 0, 0) __ptr = None __mem = None __pos = None __step = None def __init(self, memname: str, step: int): gmem = GlobMem() self.__mem = gmem.GetMem(memname) if (self.__mem is None): d.error("MemFix init failed") return self.__pos = self.__mem.tell() self.__readHead() d.debug(f"Head: {self.__head}") self.__ptr = self.__pos + self.__head.length() self.__step = step def __readHead(self): self.__head.unpack( self.__mem[self.__pos: self.__pos + self.__head.length()]) # d.debug(f"Head: {self.__head}") def __writeHead(self): packed_data = self.__head.pack() self.__mem[self.__pos: self.__pos + self.__head.length()] = packed_data def __init__(self, memname: str, step: int, cache_size=0): if (cache_size <= 0): self.__init(memname, step) return self.__init(memname, step) self.__head.maxsize = cache_size if (self.__head.size > self.__head.maxsize): self.__head.size = 0 self.__head.head = 0 self.__head.tail = 0 else: self.__head.size = self.__head.size % self.__head.maxsize self.__head.head = self.__head.head % self.__head.maxsize self.__head.tail = self.__head.tail % self.__head.maxsize self.__writeHead() def read(self, index: int): if (self.__ptr is None): return None self.__readHead() if (index > self.__head.maxsize): return None self.__mem.seek(self.__ptr + index * self.__step) return self.__mem.read(self.__step) def max_size(self) -> int: if (self.__ptr is None): return 0 self.__readHead() return self.__head.maxsize def size(self) -> int: if (self.__ptr is None): return 0 self.__readHead() return self.__head.size def push(self, struct_data: bytes): if (self.__ptr is None): return None self.__readHead() if (self.__head.size < self.__head.maxsize): self.__head.size += 1 else: self.pop() self.__mem.seek(self.__ptr + self.__head.tail * self.__step) self.__mem.write(struct_data[: self.__step]) self.__head.tail = (self.__head.tail + 1) % self.__head.maxsize self.__writeHead() def pop(self) -> bytes: if (self.__ptr is None): return None self.__readHead() if (self.empty()): return None else: self.__mem.seek(self.__ptr + self.__head.head * self.__step) self.__head.head = (self.__head.head + 1) % self.__head.maxsize self.__head.size -= 1 self.__writeHead() return self.__mem.read(self.__step) def back(self) -> bytes: if (self.__ptr is None): return None self.__readHead() if (self.empty()): return None else: self.__mem.seek(self.__ptr + self.__head.tail * self.__step) return self.__mem.read(self.__step) def front(self) -> bytes: if (self.__ptr is None): return None self.__readHead() if (self.empty()): return None else: self.__mem.seek(self.__ptr + self.__head.head * self.__step) return self.__mem.read(self.__step) def empty(self) -> bool: if (self.__ptr is None): return True self.__readHead() return self.__head.size == 0 def clear(self): if (self.__ptr is None): return None self.__head.size = 0 self.__head.head = 0 self.__head.tail = 0 self.__writeHead() d.info(f"init Head: {self.__head}") # 测试代码 # from log.LogUtil import LogUtil # if __name__ == '__main__': # LogUtil.init("app") # mm = MemQueue('TRKCOIL', 50) # 初始化 # # mm.clear() # 清空内存 # for i in range(20): # # 打包数据 # packed_data = b'' # 初始化空字节串 # for data in range(1, 26): # # 使用 struct.pack() 打包每个数据项 # packed_data += struct.pack('h', data) # mm.push(packed_data) # 写入数据 # d.debug(f"mm size: {mm.size()}") # for i in range(20): # d.debug(f"mm size: {mm.size()}, unpacked_data: {mm.pop()}")