# -*- coding: utf-8 -*-
'''
 * @Author       : zoufuzhou
 * @Date         : 2024-09-02 16:34:37
 * @Description  : Fixed element pointer position. The current pointer points to the latest element
 * @LastEditTime : 2024-09-05 16:34:37
'''
import logging as d
from typing import Optional

from shm.Head import Head
from shm.GlobMem import GlobMem


class MemFix:
    """Ring buffer of fixed-size records kept in a named memory region.

    The region is obtained from ``GlobMem().GetMem(memname)``.  As used by
    this class, it holds a packed ``Head`` (fields ``size``, ``maxsize`` and
    ``next``) at the position the memory object reports through ``tell()``
    when the buffer is attached, immediately followed by the record slots,
    each ``step`` bytes wide.

    ``head.next`` is the slot the next ``push`` writes to, so the most recent
    record lives in slot ``next - 1`` (wrapping around at ``maxsize``).

    If the region cannot be obtained the instance stays detached: ``push``,
    ``clear`` and the record readers return ``None`` and the counters return
    ``0``.
    """

    # Defaults for a detached instance.  The header object itself is created
    # per instance in ``__init__``: it is mutated on every read, so sharing one
    # object between instances would let them overwrite each other's state.
    __head = None
    __ptr = None       # offset of slot 0 (None while detached)
    __getnext = None   # private read cursor used by getNext()
    __mem = None       # memory object returned by GlobMem.GetMem
    __pos = None       # offset of the packed header
    __step = None      # size of one record slot in bytes

    def __init__(self, memname: str, step: int, cache_size=0):
        """Attach to the region called *memname*.

        Args:
            memname: Name passed to ``GlobMem.GetMem``.
            step: Size in bytes of one record slot.
            cache_size: When greater than zero, the stored header is
                rewritten with ``maxsize = cache_size`` and ``size`` / ``next``
                are clamped into the new range.  When zero or negative the
                header is used exactly as found in memory.
        """
        self.__head = Head(0, 0, 0)
        self.__init(memname, step)

        # Only touch the stored header when the attach actually succeeded;
        # otherwise there is no memory to write to.
        if self.__ptr is not None and cache_size > 0:
            head = self.__head
            head.maxsize = cache_size
            head.size = min(head.size, head.maxsize)
            # A write position at or beyond the new capacity restarts at 0.
            head.next = min(head.next, head.maxsize) % head.maxsize
            self.__writeHead()

        # getNext() starts at the current write position, i.e. it only
        # returns records pushed after this instance was created.
        self.__getnext = self.__head.next

    def __init(self, memname: str, step: int):
        """Look up the memory region and load its header.

        Leaves ``__ptr`` as ``None`` (detached) when the region is missing.
        """
        self.__mem = GlobMem().GetMem(memname)
        if self.__mem is None:
            d.error("MemFix init failed")
            return

        self.__pos = self.__mem.tell()
        self.__readHead()
        d.debug(f"Head: {self.__head}")
        self.__ptr = self.__pos + self.__head.length()
        self.__step = step

    def __readHead(self):
        """Refresh the in-process header from the bytes stored in memory."""
        start = self.__pos
        self.__head.unpack(self.__mem[start: start + self.__head.length()])

    def __writeHead(self):
        """Store the in-process header back into memory."""
        start = self.__pos
        self.__mem[start: start + self.__head.length()] = self.__head.pack()

    def getNext(self) -> Optional[bytes]:
        """Return the next record this instance has not yet consumed.

        Returns:
            ``step`` bytes read from the slot at the private read cursor, or
            ``None`` when detached, when the cursor has caught up with the
            write position, or when the stored capacity is not positive.
        """
        if self.__ptr is None:
            return None
        self.__readHead()
        if self.__getnext == self.__head.next:
            return None
        if self.__head.maxsize <= 0:
            # Advancing the cursor would divide by zero.
            return None

        slot = self.__getnext
        self.__getnext = (slot + 1) % self.__head.maxsize
        self.__mem.seek(self.__ptr + slot * self.__step)
        return self.__mem.read(self.__step)

    def read(self, index: int) -> Optional[bytes]:
        """Return the record stored in slot *index*.

        Returns:
            ``step`` bytes, or ``None`` when detached or when *index* is not
            in ``0 .. maxsize - 1``.
        """
        if self.__ptr is None:
            return None
        self.__readHead()
        # Valid slots are 0 .. maxsize - 1; anything else would address bytes
        # outside the slot area.
        if index < 0 or index >= self.__head.maxsize:
            return None
        self.__mem.seek(self.__ptr + index * self.__step)
        return self.__mem.read(self.__step)

    def getCur(self) -> Optional[bytes]:
        """Return the record in the slot reported by ``index_cur()``.

        Returns ``None`` when detached.  The header is refreshed inside
        ``index_cur()``.
        """
        if self.__ptr is None:
            return None
        self.__mem.seek(self.__ptr + self.index_cur() * self.__step)
        return self.__mem.read(self.__step)

    def index_cur(self) -> int:
        """Return the slot index just before the write position.

        Returns ``0`` when detached.  When the write position is 0 the
        result wraps to ``maxsize - 1``.
        """
        if self.__ptr is None:
            return 0
        self.__readHead()
        if self.__head.next <= 0:
            return self.__head.maxsize - 1
        return (self.__head.next - 1) % self.__head.maxsize

    def max_size(self) -> int:
        """Return the capacity stored in the header (``0`` when detached)."""
        if self.__ptr is None:
            return 0
        self.__readHead()
        return self.__head.maxsize

    def size(self) -> int:
        """Return the record count stored in the header (``0`` when detached)."""
        if self.__ptr is None:
            return 0
        self.__readHead()
        return self.__head.size

    def push(self, struct_data: bytes):
        """Write one record at the write position and advance it.

        At most ``step`` bytes of *struct_data* are written; shorter input
        leaves the remainder of the slot untouched.  ``size`` grows until it
        reaches ``maxsize``; after that the oldest slot is overwritten.

        Returns ``None`` in all cases.  Nothing is written when the instance
        is detached or the stored capacity is not positive.
        """
        if self.__ptr is None:
            return None
        self.__readHead()
        head = self.__head
        if head.maxsize <= 0:
            # Without a capacity the write position cannot wrap (the modulo
            # below would raise ZeroDivisionError after the data was written).
            d.error("MemFix push failed: maxsize is not positive")
            return None

        if head.size < head.maxsize:
            head.size += 1
        self.__mem.seek(self.__ptr + head.next * self.__step)
        self.__mem.write(struct_data[: self.__step])
        head.next = (head.next + 1) % head.maxsize
        self.__writeHead()

    def clear(self):
        """Reset ``size`` and ``next`` to 0 and store the header.

        The private ``getNext`` cursor is reset as well.  Slot contents are
        not erased.  Does nothing when detached.
        """
        if self.__ptr is None:
            return None
        self.__head.size = 0
        self.__head.next = 0
        self.__getnext = self.__head.next
        self.__writeHead()
        d.info(f"init Head: {self.__head}")


# Test code.  These imports are only needed by the self-test below; they stay
# at module level, after the class, exactly where the original file had them.
from log.LogUtil import LogUtil  # noqa: E402
import struct  # noqa: E402

if __name__ == '__main__':
    LogUtil.init("app")
    mm = MemFix('TRKCOIL', 50, 100)  # attach with 50-byte slots, capacity 100
    # Call mm.clear() here to reset the buffer before the run.

    # Pack the values 1..25 as 25 native shorts (50 bytes, one full slot).
    payload = struct.pack('25h', *range(1, 26))

    for i in range(51):
        mm.push(payload)  # write the record

        packed_data = mm.getCur()
        if packed_data is None:
            # Attach failed; the error has already been logged.
            break
        d.debug(f"packed_data: {struct.unpack('25h', packed_data)}")
        for i in range(25):
            unpacked_data = struct.unpack_from('h', packed_data, i*2)
            d.debug(f"unpacked_data: {unpacked_data[0]}")