# -*- coding: utf-8 -*- ''' * @Author : zoufuzhou * @Date : 2024-09-02 16:34:37 * @Description :The current pointer points to the latest element by means of a primary key value * @LastEditTime : 2024-09-05 16:34:37 ''' from shm.GlobMem import GlobMem from shm.MakePair import MakePair from shm.Head import Head import logging as d class MemMap: __head = Head(0, 0, 0) __mappair = None __ptr = None __mem = None __pos = None __step = None __len_value = 0 __last_index = 0 def __init(self, memname: str, step: int): gmem = GlobMem() self.__mem = gmem.GetMem(memname) if (self.__mem is None): d.error("MemMap init failed") return self.__pos = self.__mem.tell() self.__readHead() d.debug(f"Head: {self.__head}") self.__ptr = self.__pos + self.__head.length() self.__mappair = MakePair('', bytearray(step)) self.__step = self.__mappair.length() self.__len_value = step self.__mem.seek(self.__ptr) self.__mappair.unpack(self.__mem.read(self.__step)) def __readHead(self): self.__head.unpack( self.__mem[self.__pos: self.__pos + self.__head.length()]) # d.debug(f"Head: {self.__head}") def __writeHead(self): packed_data = self.__head.pack() self.__mem[self.__pos: self.__pos + self.__head.length()] = packed_data def __init__(self, memname: str, step: int, cache_size=0): if (cache_size <= 0): self.__init(memname, step) self.__getnext = self.__head.next return self.__init(memname, step) self.__head.maxsize = cache_size if (self.__head.size > self.__head.maxsize): self.__head.size = self.__head.maxsize if (self.__head.next > self.__head.maxsize): self.__head.next = self.__head.maxsize self.__head.next = self.__head.next % self.__head.maxsize self.__writeHead() def getCur(self): if (self.__ptr is None): return None self.__readHead() self.__mem.seek(self.__ptr + self.index_cur() * self.__step) return self.__mappair.unpack(self.__mem.read(self.__step)) def __setitem__(self, key: int, value: bytes): return self.insert(key, value) def __getitem__(self, key): return self.find(key) def index_cur(self) -> int: if (self.__ptr is None): return 0 self.__readHead() # d.debug(f"ptr: {self.__ptr}, index: {self.__head.next}") if (self.__head.next <= 0): return self.__head.maxsize - 1 else: return (self.__head.next-1) % self.__head.maxsize def max_size(self) -> int: if (self.__ptr is None): return 0 self.__readHead() return self.__head.maxsize def size(self) -> int: if (self.__ptr is None): return 0 self.__readHead() return self.__head.size def empty(self) -> bool: if (self.__ptr is None): return True self.__readHead() return self.__head.size <= 0 def find(self, key): # d.debug(f"find key: {key}") tmpkey = str(key) # d.debug(f"key: {key}, tmpkey: {tmpkey}") if (self.__ptr is None): return None self.__readHead() if (self.__head.size <= 0): return None # d.debug(f"find key: {tmpkey}, index: {self.__mappair.get_key()}") if (self.__mappair.key == str(tmpkey)): return self.__mappair for i in range(self.__head.size): self.__mem.seek(self.__ptr + i * self.__step) self.__mappair.unpack(self.__mem.read(self.__step)) if (self.__mappair.key == tmpkey): self.__last_index = i return self.__mappair return None def insert(self, key, struct_data: bytes): if (self.__ptr is None): return None self.__readHead() if (self.find(key) is not None): # d.debug(f"key: {key} already exists, update it") self.__mem.seek(self.__ptr + self.__last_index * self.__step) self.__mem.write(self.__mappair.make_pair( key, struct_data[:self.__len_value]).pack()) else: if (self.__head.size < self.__head.maxsize): self.__head.size += 1 self.__mem.seek(self.__ptr + self.__head.next * self.__step) self.__mem.write(self.__mappair.make_pair( key, struct_data[:self.__len_value]).pack()) self.__head.next = (self.__head.next + 1) % self.__head.maxsize self.__writeHead() def clear(self): if (self.__ptr is None): return None self.__head.size = 0 self.__head.next = 0 self.__getnext = self.__head.next self.__writeHead() d.info(f"init Head: {self.__head}") # 测试代码 from log.LogUtil import LogUtil import struct if __name__ == '__main__': LogUtil.init("app") #mm = MemMap('TRKCOIL', 40, 100) # 初始化 mm = MemMap('ZONE0', 48) # 初始化 # mm.clear() # 清空内存 # for i in range(20): # # # 打包数据 # packed_data = b'hello world' # 初始化空字节串 # # # for data in range(25): # # # 使用 struct.pack() 打包每个数据项 # # packed_data += struct.pack('h', data) # # # 写入数据 # mm['key'+str(i)] = packed_data # 写入数据 # # packed_data = mm.getCur() # # d.debug(f"packed_data: {packed_data}") # # 读取数据 # for i in range(10): # unpacked_data = mm['key'+str(i)] # d.debug(f"unpacked_data: {unpacked_data}") # # d.debug(f"unpacked_data: {unpacked_data.get_value().strip()}") # mm['test1'] = struct.pack('fii', 1.234, 111,222) #d.debug(f"{struct.unpack('fii',unpacked_data.value[:12])}") for i in range(10): key = 'maptest'+ str(i) unpacked_data = mm[key] #d.debug(f"key:{key},unpacked_data: {unpacked_data}") d.debug(f"key:{key},unpacked_data: {unpacked_data.value}")