eis/py/comlib/shm/MemMap.py

195 lines
6.1 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
'''
* @Author : zoufuzhou
* @Date : 2024-09-02 16:34:37
* @Description :The current pointer points to the latest element
by means of a primary key value
* @LastEditTime : 2024-09-05 16:34:37
'''
from shm.GlobMem import GlobMem
from shm.MakePair import MakePair
from shm.Head import Head
import logging as d
class MemMap:
__head = Head(0, 0, 0)
__mappair = None
__ptr = None
__mem = None
__pos = None
__step = None
__len_value = 0
__last_index = 0
def __init(self, memname: str, step: int):
gmem = GlobMem()
self.__mem = gmem.GetMem(memname)
if (self.__mem is None):
d.error("MemMap init failed")
return
self.__pos = self.__mem.tell()
self.__readHead()
d.debug(f"Head: {self.__head}")
self.__ptr = self.__pos + self.__head.length()
self.__mappair = MakePair('', bytearray(step))
self.__step = self.__mappair.length()
self.__len_value = step
self.__mem.seek(self.__ptr)
self.__mappair.unpack(self.__mem.read(self.__step))
def __readHead(self):
self.__head.unpack(
self.__mem[self.__pos: self.__pos + self.__head.length()])
# d.debug(f"Head: {self.__head}")
def __writeHead(self):
packed_data = self.__head.pack()
self.__mem[self.__pos: self.__pos + self.__head.length()] = packed_data
def __init__(self, memname: str, step: int, cache_size=0):
if (cache_size <= 0):
self.__init(memname, step)
self.__getnext = self.__head.next
return
self.__init(memname, step)
self.__head.maxsize = cache_size
if (self.__head.size > self.__head.maxsize):
self.__head.size = self.__head.maxsize
if (self.__head.next > self.__head.maxsize):
self.__head.next = self.__head.maxsize
self.__head.next = self.__head.next % self.__head.maxsize
self.__writeHead()
def getCur(self):
if (self.__ptr is None):
return None
self.__readHead()
self.__mem.seek(self.__ptr + self.index_cur() * self.__step)
return self.__mappair.unpack(self.__mem.read(self.__step))
def __setitem__(self, key: int, value: bytes):
return self.insert(key, value)
def __getitem__(self, key):
return self.find(key)
def index_cur(self) -> int:
if (self.__ptr is None):
return 0
self.__readHead()
# d.debug(f"ptr: {self.__ptr}, index: {self.__head.next}")
if (self.__head.next <= 0):
return self.__head.maxsize - 1
else:
return (self.__head.next-1) % self.__head.maxsize
def max_size(self) -> int:
if (self.__ptr is None):
return 0
self.__readHead()
return self.__head.maxsize
def size(self) -> int:
if (self.__ptr is None):
return 0
self.__readHead()
return self.__head.size
def empty(self) -> bool:
if (self.__ptr is None):
return True
self.__readHead()
return self.__head.size <= 0
def find(self, key):
# d.debug(f"find key: {key}")
tmpkey = str(key)
# d.debug(f"key: {key}, tmpkey: {tmpkey}")
if (self.__ptr is None):
return None
self.__readHead()
if (self.__head.size <= 0):
return None
# d.debug(f"find key: {tmpkey}, index: {self.__mappair.get_key()}")
if (self.__mappair.key == str(tmpkey)):
return self.__mappair
for i in range(self.__head.size):
self.__mem.seek(self.__ptr + i * self.__step)
self.__mappair.unpack(self.__mem.read(self.__step))
if (self.__mappair.key == tmpkey):
self.__last_index = i
return self.__mappair
return None
def insert(self, key, struct_data: bytes):
if (self.__ptr is None):
return None
self.__readHead()
if (self.find(key) is not None):
# d.debug(f"key: {key} already exists, update it")
self.__mem.seek(self.__ptr + self.__last_index * self.__step)
self.__mem.write(self.__mappair.make_pair(
key, struct_data[:self.__len_value]).pack())
else:
if (self.__head.size < self.__head.maxsize):
self.__head.size += 1
self.__mem.seek(self.__ptr + self.__head.next * self.__step)
self.__mem.write(self.__mappair.make_pair(
key, struct_data[:self.__len_value]).pack())
self.__head.next = (self.__head.next + 1) % self.__head.maxsize
self.__writeHead()
def clear(self):
if (self.__ptr is None):
return None
self.__head.size = 0
self.__head.next = 0
self.__getnext = self.__head.next
self.__writeHead()
d.info(f"init Head: {self.__head}")
# 测试代码
from log.LogUtil import LogUtil
import struct
if __name__ == '__main__':
LogUtil.init("app")
#mm = MemMap('TRKCOIL', 40, 100) # 初始化
mm = MemMap('ZONE0', 48) # 初始化
# mm.clear() # 清空内存
# for i in range(20):
#
# # 打包数据
# packed_data = b'hello world' # 初始化空字节串
#
# # for data in range(25):
# # # 使用 struct.pack() 打包每个数据项
# # packed_data += struct.pack('h', data)
#
# # 写入数据
# mm['key'+str(i)] = packed_data # 写入数据
# # packed_data = mm.getCur()
# # d.debug(f"packed_data: {packed_data}")
# # 读取数据
# for i in range(10):
# unpacked_data = mm['key'+str(i)]
# d.debug(f"unpacked_data: {unpacked_data}")
# # d.debug(f"unpacked_data: {unpacked_data.get_value().strip()}")
# mm['test1'] = struct.pack('fii', 1.234, 111,222)
#d.debug(f"{struct.unpack('fii',unpacked_data.value[:12])}")
for i in range(10):
key = 'maptest'+ str(i)
unpacked_data = mm[key]
#d.debug(f"key:{key},unpacked_data: {unpacked_data}")
d.debug(f"key:{key},unpacked_data: {unpacked_data.value}")