195 lines
6.1 KiB
Python
195 lines
6.1 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
||
|
|
'''
* @Author         : zoufuzhou
* @Date           : 2024-09-02 16:34:37
* @Description    : The current pointer points to the latest element
                    by means of a primary key value
* @LastEditTime   : 2024-09-05 16:34:37
'''
|
||
|
|
|
||
|
|
from shm.GlobMem import GlobMem
|
||
|
|
from shm.MakePair import MakePair
|
||
|
|
from shm.Head import Head
|
||
|
|
import logging as d
|
||
|
|
|
||
|
|
|
||
|
|
class MemMap:
    """Keyed ring buffer of fixed-size records inside a named memory block.

    The block is obtained from ``GlobMem().GetMem(memname)`` and is used
    through ``tell``/``seek``/``read``/``write`` and slice access
    (presumably an mmap-like shared-memory object -- confirm against
    ``shm.GlobMem``).

    Layout as used by this class:

    * a ``Head`` record at the block's initial position, carrying
      ``size`` (number of stored records), ``next`` (slot that the next
      new key is written to) and ``maxsize`` (number of slots);
    * directly after it, slots of ``MakePair('', bytearray(step)).length()``
      bytes each.

    New keys are written at ``next``, which then advances modulo
    ``maxsize``; an existing key is updated in place.

    NOTE: ``find`` and ``getCur`` hand out the single internal ``MakePair``
    buffer, which is overwritten by the next ``find``/``getCur``/``insert``
    call. Copy what you need before calling again.
    """

    # Class-level defaults so that an instance whose attach failed still
    # answers every query with the "empty" result instead of raising.
    # The Head itself is created per instance (see __init): a class-level
    # Head object would be shared and mutated by every MemMap instance.
    __head = None
    __mappair = None
    __ptr = None
    __mem = None
    __pos = None
    __step = None
    __len_value = 0
    __last_index = 0

    def __init(self, memname: str, step: int):
        """Attach to the memory block *memname* and read its header.

        On failure (``GetMem`` returns ``None``) an error is logged and
        ``__ptr`` stays ``None``, which makes every public method a no-op.
        """
        self.__head = Head(0, 0, 0)
        gmem = GlobMem()
        self.__mem = gmem.GetMem(memname)
        if self.__mem is None:
            d.error("MemMap init failed")
            return
        self.__pos = self.__mem.tell()

        self.__readHead()
        d.debug(f"Head: {self.__head}")
        # Slots start right after the header.
        self.__ptr = self.__pos + self.__head.length()
        self.__mappair = MakePair('', bytearray(step))
        self.__step = self.__mappair.length()
        self.__len_value = step
        # Prime the record buffer with slot 0 and remember that index so
        # the cached-slot check in find() starts out consistent.
        self.__load_slot(0)
        self.__last_index = 0

    def __readHead(self):
        """Refresh the in-memory header from the memory block."""
        end = self.__pos + self.__head.length()
        self.__head.unpack(self.__mem[self.__pos:end])

    def __writeHead(self):
        """Write the in-memory header back to the memory block."""
        packed_data = self.__head.pack()
        end = self.__pos + self.__head.length()
        self.__mem[self.__pos:end] = packed_data

    def __load_slot(self, index: int):
        """Read slot *index* into the internal record buffer and return it."""
        self.__mem.seek(self.__ptr + index * self.__step)
        self.__mappair.unpack(self.__mem.read(self.__step))
        return self.__mappair

    def __latest_index(self) -> int:
        """Index of the most recently written slot, from the current header.

        Does not re-read the header; callers refresh it first.
        """
        maxsize = self.__head.maxsize
        if maxsize <= 0:
            # Header not initialised: avoid a modulo-by-zero / negative slot.
            return 0
        if self.__head.next <= 0:
            # ``next`` has wrapped to 0, so the latest slot is the last one.
            return maxsize - 1
        return (self.__head.next - 1) % maxsize

    def __init__(self, memname: str, step: int, cache_size=0):
        """Attach to *memname*, storing values of *step* bytes.

        Args:
            memname: name passed to ``GlobMem().GetMem``.
            step: value length in bytes; longer values are truncated on
                insert.
            cache_size: when > 0, the header's ``maxsize`` is set to this
                value, ``size``/``next`` are clamped to it and the header
                is written back. When <= 0 the header is used as found.
        """
        self.__init(memname, step)
        if cache_size <= 0:
            self.__getnext = self.__head.next
            return

        if self.__ptr is None:
            # Attach failed (already logged); there is no memory to write
            # the adjusted header to.
            return

        self.__head.maxsize = cache_size
        if self.__head.size > self.__head.maxsize:
            self.__head.size = self.__head.maxsize

        if self.__head.next > self.__head.maxsize:
            self.__head.next = self.__head.maxsize

        self.__head.next = self.__head.next % self.__head.maxsize
        self.__writeHead()

    def getCur(self):
        """Return the result of unpacking the most recently written slot.

        Returns ``None`` when the map is not attached.

        NOTE(review): the header's ``size`` is not checked here, so on an
        empty buffer this reads slot ``maxsize - 1``, which may never have
        been written.
        """
        if self.__ptr is None:
            return None
        self.__readHead()
        self.__mem.seek(self.__ptr + self.__latest_index() * self.__step)
        return self.__mappair.unpack(self.__mem.read(self.__step))

    def __setitem__(self, key: int, value: bytes):
        """``mm[key] = value`` is shorthand for ``insert(key, value)``."""
        return self.insert(key, value)

    def __getitem__(self, key):
        """``mm[key]`` is shorthand for ``find(key)`` (``None`` if absent)."""
        return self.find(key)

    def index_cur(self) -> int:
        """Return the index of the most recently written slot (0 if detached)."""
        if self.__ptr is None:
            return 0
        self.__readHead()
        return self.__latest_index()

    def max_size(self) -> int:
        """Return the header's ``maxsize`` (0 if detached)."""
        if self.__ptr is None:
            return 0
        self.__readHead()
        return self.__head.maxsize

    def size(self) -> int:
        """Return the header's ``size`` (0 if detached)."""
        if self.__ptr is None:
            return 0
        self.__readHead()
        return self.__head.size

    def empty(self) -> bool:
        """Return ``True`` when detached or when the header's ``size`` <= 0."""
        if self.__ptr is None:
            return True
        self.__readHead()
        return self.__head.size <= 0

    def find(self, key):
        """Look up *key* (compared as ``str(key)``) among the stored slots.

        Returns the internal record buffer loaded with the matching slot,
        or ``None`` when the map is detached, empty, or the key is absent.
        On a hit the slot index is remembered for ``insert`` to update
        in place.
        """
        tmpkey = str(key)
        if self.__ptr is None:
            return None
        self.__readHead()
        size = self.__head.size
        if size <= 0:
            return None

        # Fast path: re-read the slot of the previous hit. The record
        # buffer alone must not be trusted -- it is overwritten by failed
        # scans, getCur() and insert(), and the memory may have been
        # changed by another writer since it was loaded.
        cached = self.__last_index
        if 0 <= cached < size and self.__load_slot(cached).key == tmpkey:
            return self.__mappair

        for i in range(size):
            if self.__load_slot(i).key == tmpkey:
                self.__last_index = i
                return self.__mappair
        return None

    def insert(self, key, struct_data: bytes):
        """Store *struct_data* under *key*.

        An existing key is overwritten in place; a new key goes to the
        header's ``next`` slot, which then advances modulo ``maxsize``
        (so the oldest slot is reused once the buffer is full).
        *struct_data* is truncated to the configured value length.
        Always returns ``None``.
        """
        if self.__ptr is None:
            return None
        payload = struct_data[:self.__len_value]

        # find() refreshes the header and, on a hit, leaves the slot index
        # in __last_index.
        if self.find(key) is not None:
            slot = self.__last_index
        else:
            maxsize = self.__head.maxsize
            if maxsize <= 0:
                # Header was never sized; there is no slot to write to.
                d.error("MemMap insert failed: maxsize is 0")
                return None
            slot = self.__head.next
            if self.__head.size < maxsize:
                self.__head.size += 1
            self.__head.next = (slot + 1) % maxsize
            # Keep the cached index pointing at the record now in the buffer.
            self.__last_index = slot

        self.__mem.seek(self.__ptr + slot * self.__step)
        self.__mem.write(self.__mappair.make_pair(key, payload).pack())
        self.__writeHead()

    def clear(self):
        """Reset ``size`` and ``next`` to 0 and write the header back.

        Slot contents are not erased. The header is not re-read first, so
        the in-memory ``maxsize`` is what gets written.
        """
        if self.__ptr is None:
            return None
        self.__head.size = 0
        self.__head.next = 0
        self.__getnext = self.__head.next
        self.__last_index = 0
        self.__writeHead()
        d.info(f"init Head: {self.__head}")
|
||
|
|
|
||
|
|
|
||
|
|
# Test code
|
||
|
|
from log.LogUtil import LogUtil
|
||
|
|
import struct
|
||
|
|
if __name__ == '__main__':
    # Manual smoke test: attach to the 'ZONE0' block with a value length
    # of 48 and log the values stored under 'maptest0' .. 'maptest9'.
    LogUtil.init("app")
    # mm = MemMap('TRKCOIL', 40, 100)  # initialise
    mm = MemMap('ZONE0', 48)  # initialise
    # mm.clear()  # clear the memory
    # for i in range(20):
    #
    #     # pack the data
    #     packed_data = b'hello world'  # initialise the byte string
    #
    #     # for data in range(25):
    #     #     # pack each item with struct.pack()
    #     #     packed_data += struct.pack('h', data)
    #
    #     # write the data
    #     mm['key'+str(i)] = packed_data  # write the data
    #     # packed_data = mm.getCur()
    #     # d.debug(f"packed_data: {packed_data}")
    # # read the data
    # for i in range(10):
    #     unpacked_data = mm['key'+str(i)]
    #     d.debug(f"unpacked_data: {unpacked_data}")
    #     # d.debug(f"unpacked_data: {unpacked_data.get_value().strip()}")
    # mm['test1'] = struct.pack('fii', 1.234, 111,222)
    # d.debug(f"{struct.unpack('fii',unpacked_data.value[:12])}")
    for i in range(10):
        key = 'maptest' + str(i)
        unpacked_data = mm[key]
        # d.debug(f"key:{key},unpacked_data: {unpacked_data}")
        if unpacked_data is None:
            # mm[key] returns None for a missing key (or a failed attach);
            # dereferencing .value would raise AttributeError.
            d.warning(f"key:{key} not found")
            continue
        d.debug(f"key:{key},unpacked_data: {unpacked_data.value}")
|
||
|
|
|