eis/py/comlib/shm/MemTable.py

243 lines
7.1 KiB
Python

# -*- coding: utf-8 -*-
'''
* @Author : zoufuzhou
* @Date : 2024-09-02 16:34:37
* @Description : memory table storage; element addresses are not fixed,
because every push shifts the stored elements and writes the newest one first
* @LastEditTime : 2024-09-05 16:34:37
'''
import struct
from shm.GlobMem import GlobMem
import logging as d
class MemTable:
    """Fixed-capacity table of equally sized records kept in a memory region.

    The region is obtained from ``GlobMem().GetMem(memname)``.  Starting at
    the position that memory object reports when the table is opened, the
    layout is a header (``maxsize``, ``size``, ``count`` packed with the
    struct format ``'III'``) followed by record slots of ``step`` bytes each.

    ``push`` stores the newest record in slot 0 and shifts the older records
    one slot towards the end, so slot ``i`` holds the record that is ``i``
    pushes old and the oldest record is dropped once the table is full.
    ``count`` is the running number of pushes; every instance keeps a private
    read counter so that ``getNext`` can hand out records it has not returned
    yet.

    All public methods return ``None`` (or ``0`` for the numeric getters)
    when the memory region could not be opened.
    """

    class Head:
        """Table header stored in front of the record slots."""

        # struct layout of the header: three native unsigned ints.
        __format_string = 'III'
        # __slots__ keeps the per-instance memory footprint small.
        __slots__ = ('maxsize', 'size', 'count')

        def __init__(self, maxsize: int, size: int, count: int):
            self.maxsize = maxsize  # capacity of the table, in records
            self.size = size        # number of records currently stored
            self.count = count      # running number of pushes

        def length(self) -> int:
            """Return the length of the packed header in bytes."""
            return struct.calcsize(self.__format_string)

        def pack(self) -> bytes:
            """Pack the three fields into their in-memory byte layout."""
            return struct.pack(self.__format_string,
                               self.maxsize, self.size, self.count)

        def unpack(self, data: bytes) -> None:
            """Load the three fields from ``data``.

            Raises:
                ValueError: if ``data`` is not exactly ``length()`` bytes.
            """
            if len(data) != self.length():
                raise ValueError(
                    f"数据长度不匹配: 期望 {self.length()}, 实际 {len(data)}.")
            self.maxsize, self.size, self.count = struct.unpack(
                self.__format_string, data)

        def __str__(self) -> str:
            return f"maxsize: {self.maxsize}, size: {self.size}, count: {self.count}"

        def __repr__(self) -> str:
            return self.__str__()

    # Largest value the unsigned ``count`` header field can hold.
    __COUNT_MASK = (1 << (8 * struct.calcsize('I'))) - 1

    # Class-level defaults are immutable placeholders only.  The header object
    # is created per instance in __init so that two tables never share (and
    # overwrite) each other's header state.
    __head = None
    __ptr = None     # offset of slot 0; None while the table is not opened
    __getcnt = None  # number of records this instance has consumed so far
    __mem = None     # memory object returned by GlobMem.GetMem
    __pos = None     # offset of the header inside the memory object
    __step = None    # size of one record slot in bytes

    def __init(self, memname: str, step: int):
        """Open the memory region and load the header stored in it."""
        self.__head = self.Head(0, 0, 0)
        self.__mem = GlobMem().GetMem(memname)
        if self.__mem is None:
            d.error("MemTable init failed")
            return
        self.__pos = self.__mem.tell()
        self.__readHead()
        self.__ptr = self.__pos + self.__head.length()
        self.__step = step
        d.debug(f"Head: {self.__head},step: {self.__step}")

    def __readHead(self):
        """Refresh the local header copy from memory."""
        self.__head.unpack(
            self.__mem[self.__pos: self.__pos + self.__head.length()])

    def __writeHead(self):
        """Store the local header copy back into memory."""
        packed_data = self.__head.pack()
        self.__mem[self.__pos: self.__pos + self.__head.length()] = packed_data

    def __init__(self, memname: str, step: int, cache_size=0):
        """Attach to the table stored in the memory region ``memname``.

        Args:
            memname: name passed to ``GlobMem().GetMem``.
            step: size of one record in bytes.
            cache_size: when positive, becomes the table capacity and is
                written to the header (the stored size is clipped to it);
                otherwise the capacity already stored in the header is used.
        """
        self.__init(memname, step)
        if self.__ptr is None:
            # Memory unavailable: stay inert instead of touching the header.
            self.__getcnt = 0
            return
        if cache_size > 0:
            self.__head.maxsize = cache_size
            if self.__head.size > cache_size:
                self.__head.size = cache_size
            self.__writeHead()
        # Start reading from "now": only records pushed later are new.
        self.__getcnt = self.__head.count

    def __countMemory(self):
        """Advance the push counter kept in the header."""
        if self.__ptr is None:
            return None
        # Python ints do not wrap, the unsigned header field does; mask so
        # that packing the header can never fail on overflow.
        self.__head.count = (self.__head.count + 1) & self.__COUNT_MASK
        if self.__head.count <= 1:
            self.__head.count = 1
            self.__getcnt = 1

    def __countLocal(self):
        """Advance the read counter and return the slot of the next record.

        Returns ``None`` when there is nothing to read.  Expects the local
        header copy to be up to date.
        """
        if self.__ptr is None:
            return None
        head = self.__head
        self.__getcnt = max(self.__getcnt + 1, 1)
        lag = head.count - self.__getcnt
        if lag < 0:
            # The reader is ahead of the writer: resynchronise.
            self.__getcnt = head.count
            return None
        stored = min(head.size, head.maxsize)
        if stored <= 0:
            self.__getcnt = head.count
            return None
        if lag >= stored:
            # The wanted record was already pushed out of the table; continue
            # with the oldest record that is still stored.
            lag = stored - 1
            self.__getcnt = head.count - lag
        return lag

    def __memmove(self, dest_offset, src_offset, count):
        """Copy ``count`` bytes inside the memory object.

        The source is read completely before anything is written, so
        overlapping ranges are handled correctly.
        """
        self.__mem.seek(src_offset)
        data = self.__mem.read(count)
        self.__mem.seek(dest_offset)
        self.__mem.write(data)

    def getNext(self) -> bytes:
        """Return the oldest record this instance has not returned yet.

        Returns ``None`` when no new record is available.  If more records
        were pushed than the table can hold since the last call, reading
        continues with the oldest record still stored.
        """
        if self.__ptr is None:
            return None
        if not self.__isNew():  # also refreshes the header
            return None
        slot = self.__countLocal()
        if slot is None:
            return None
        return self.read(slot)

    def read(self, index: int) -> bytes:
        """Return the ``step`` bytes stored in slot ``index`` (0 = newest).

        Returns ``None`` when ``index`` lies outside ``0 .. max_size() - 1``.
        Slots at or beyond ``size()`` have not been filled by a push.
        """
        if self.__ptr is None:
            return None
        self.__readHead()
        if index < 0 or index >= self.__head.maxsize:
            return None
        self.__mem.seek(self.__ptr + index * self.__step)
        return self.__mem.read(self.__step)

    def getCur(self) -> bytes:
        """Return the most recently pushed record, or ``None`` if empty."""
        if self.__ptr is None:
            return None
        self.__readHead()
        if self.__head.size <= 0 or self.__head.maxsize <= 0:
            return None
        self.__mem.seek(self.__ptr + self.index_cur() * self.__step)
        return self.__mem.read(self.__step)

    def index_cur(self) -> int:
        """Return the slot index of the newest record.

        ``push`` always writes the newest record to the first slot and shifts
        the older ones, so this index is always 0.
        """
        return 0

    def max_size(self) -> int:
        """Return the table capacity stored in the header."""
        if self.__ptr is None:
            return 0
        self.__readHead()
        return self.__head.maxsize

    def count(self) -> int:
        """Return the number of stored records (same value as ``size()``)."""
        return self.size()

    def size(self) -> int:
        """Return the number of records currently stored."""
        if self.__ptr is None:
            return 0
        self.__readHead()
        return self.__head.size

    def push(self, struct_data: bytes):
        """Insert ``struct_data`` as the newest record.

        Existing records are shifted one slot towards the end; when the table
        is full the oldest record is dropped.  Data longer than ``step`` bytes
        is truncated, shorter data is zero-padded so that no bytes of the
        previous record remain in the slot.
        """
        if self.__ptr is None:
            return None
        self.__readHead()
        head = self.__head
        if head.maxsize <= 0:
            # A zero capacity would turn the shift length negative.
            d.error("MemTable push failed: maxsize is 0")
            return None
        # Number of existing records that survive this push.
        kept = min(head.size, head.maxsize - 1)
        if kept > 0:
            self.__memmove(self.__ptr + self.__step, self.__ptr,
                           self.__step * kept)
        head.size = kept + 1
        record = bytes(struct_data[: self.__step]).ljust(self.__step, b'\x00')
        self.__mem.seek(self.__ptr)
        self.__mem.write(record)
        self.__countMemory()
        self.__writeHead()

    def __isNew(self) -> bool:
        """Refresh the header and report whether unread records exist."""
        if self.__ptr is None:
            return False
        self.__readHead()
        if self.__getcnt > self.__head.count:
            # The push counter went backwards (table cleared or counter
            # wrapped); resynchronise so later pushes are seen again.
            self.__getcnt = self.__head.count
        return self.__getcnt < self.__head.count

    def clear(self):
        """Empty the table and reset the push counter."""
        if self.__ptr is None:
            return None
        # Re-read first so the capacity written back is the stored one.
        self.__readHead()
        self.__head.size = 0
        self.__head.count = 0
        self.__getcnt = self.__head.count
        self.__writeHead()
        d.info(f"init Head: {self.__head}")
# Test code
# from log.LogUtil import LogUtil
# if __name__ == '__main__':
# LogUtil.init("app")
# mm = MemTable('TRKCOIL', 50, 100) # initialise the table
# # mm.clear() # clear the memory
# for i in range(50):
# # pack the data
# packed_data = b'' # start from an empty byte string
# for data in range(25):
# packed_data += struct.pack('h', data)
# mm.push(packed_data) # write the data
# packed_data = mm.getCur()
# d.debug(f"packed_data: {struct.unpack('25h', packed_data)}")
# for i in range(25):
# unpacked_data = struct.unpack_from('h', packed_data, i*2)
# d.debug(f"unpacked_data: {unpacked_data[0]}")