eis/py/comlib/shm/MemFix.py

162 lines
4.7 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
'''
* @Author : zoufuzhou
* @Date : 2024-09-02 16:34:37
* @Description : Fixed element pointer position.
The current pointer points to the latest element
* @LastEditTime : 2024-09-05 16:34:37
'''
from shm.Head import Head
from shm.GlobMem import GlobMem
import logging as d
class MemFix:
__head = Head(0, 0, 0)
__ptr = None
__getnext = None
__mem = None
__pos = None
__step = None
def __init(self, memname: str, step: int):
gmem = GlobMem()
self.__mem = gmem.GetMem(memname)
if (self.__mem is None):
d.error("MemFix init failed")
return
self.__pos = self.__mem.tell()
self.__readHead()
d.debug(f"Head: {self.__head}")
self.__ptr = self.__pos + self.__head.length()
self.__step = step
def __readHead(self):
self.__head.unpack(
self.__mem[self.__pos: self.__pos + self.__head.length()])
# d.debug(f"Head: {self.__head}")
def __writeHead(self):
packed_data = self.__head.pack()
self.__mem[self.__pos: self.__pos + self.__head.length()] = packed_data
def __init__(self, memname: str, step: int, cache_size=0):
if (cache_size <= 0):
self.__init(memname, step)
self.__getnext = self.__head.next
return
self.__init(memname, step)
self.__head.maxsize = cache_size
if (self.__head.size > self.__head.maxsize):
self.__head.size = self.__head.maxsize
if (self.__head.next > self.__head.maxsize):
self.__head.next = self.__head.maxsize
self.__head.next = self.__head.next % self.__head.maxsize
self.__getnext = self.__head.next
self.__writeHead()
def getNext(self) -> bytes:
if (self.__ptr is None):
return None
self.__readHead()
if (self.__getnext == self.__head.next):
return None
else:
tmpnext = self.__getnext
self.__getnext = (self.__getnext + 1) % self.__head.maxsize
self.__mem.seek(self.__ptr + tmpnext * self.__step)
return self.__mem.read(self.__step)
def read(self, index: int) -> bytes:
if (self.__ptr is None):
return None
self.__readHead()
if (index > self.__head.maxsize):
return None
self.__mem.seek(self.__ptr + index * self.__step)
return self.__mem.read(self.__step)
def getCur(self) -> bytes:
if (self.__ptr is None):
return None
self.__readHead()
self.__mem.seek(self.__ptr + self.index_cur() * self.__step)
return self.__mem.read(self.__step)
def index_cur(self) -> int:
if (self.__ptr is None):
return 0
self.__readHead()
if (self.__head.next <= 0):
return self.__head.maxsize - 1
else:
return (self.__head.next-1) % self.__head.maxsize
def max_size(self) -> int:
if (self.__ptr is None):
return 0
self.__readHead()
return self.__head.maxsize
def size(self) -> int:
if (self.__ptr is None):
return 0
self.__readHead()
return self.__head.size
def push(self, struct_data: bytes):
if (self.__ptr is None):
return None
self.__readHead()
if (self.__head.size < self.__head.maxsize):
self.__head.size += 1
# d.debug(f"next:{self.__head.next} {self.__ptr + self.__head.next * self.__step}: {self.__ptr + (self.__head.next + 1) * self.__step}")
self.__mem.seek(self.__ptr + self.__head.next * self.__step)
self.__mem.write(struct_data[: self.__step])
self.__head.next = (self.__head.next + 1) % self.__head.maxsize
self.__writeHead()
def clear(self):
if (self.__ptr is None):
return None
self.__head.size = 0
self.__head.next = 0
self.__getnext = self.__head.next
self.__writeHead()
d.info(f"init Head: {self.__head}")
# 测试代码
from log.LogUtil import LogUtil
import struct
if __name__ == '__main__':
LogUtil.init("app")
mm = MemFix('TRKCOIL', 50, 100) # 初始化
# mm.clear() # 清空内存
for i in range(51):
# 打包数据
packed_data = b'' # 初始化空字节串
for data in range(1, 26):
# 使用 struct.pack() 打包每个数据项
packed_data += struct.pack('h', data)
mm.push(packed_data) # 写入数据
packed_data = mm.getCur()
d.debug(f"packed_data: {struct.unpack('25h', packed_data)}")
for i in range(25):
unpacked_data = struct.unpack_from('h', packed_data, i*2)
d.debug(f"unpacked_data: {unpacked_data[0]}")