162 lines
4.7 KiB
Python
162 lines
4.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
'''
|
|
* @Author : zoufuzhou
|
|
* @Date : 2024-09-02 16:34:37
|
|
* @Description : Fixed element pointer position.
|
|
The current pointer points to the latest element
|
|
* @LastEditTime : 2024-09-05 16:34:37
|
|
'''
|
|
|
|
from shm.Head import Head
|
|
from shm.GlobMem import GlobMem
|
|
import logging as d
|
|
|
|
|
|
class MemFix:
|
|
|
|
__head = Head(0, 0, 0)
|
|
__ptr = None
|
|
__getnext = None
|
|
__mem = None
|
|
__pos = None
|
|
__step = None
|
|
|
|
def __init(self, memname: str, step: int):
|
|
gmem = GlobMem()
|
|
self.__mem = gmem.GetMem(memname)
|
|
if (self.__mem is None):
|
|
d.error("MemFix init failed")
|
|
return
|
|
self.__pos = self.__mem.tell()
|
|
|
|
self.__readHead()
|
|
d.debug(f"Head: {self.__head}")
|
|
self.__ptr = self.__pos + self.__head.length()
|
|
self.__step = step
|
|
|
|
def __readHead(self):
|
|
|
|
self.__head.unpack(
|
|
self.__mem[self.__pos: self.__pos + self.__head.length()])
|
|
# d.debug(f"Head: {self.__head}")
|
|
|
|
def __writeHead(self):
|
|
packed_data = self.__head.pack()
|
|
self.__mem[self.__pos: self.__pos + self.__head.length()] = packed_data
|
|
|
|
def __init__(self, memname: str, step: int, cache_size=0):
|
|
if (cache_size <= 0):
|
|
self.__init(memname, step)
|
|
self.__getnext = self.__head.next
|
|
return
|
|
|
|
self.__init(memname, step)
|
|
self.__head.maxsize = cache_size
|
|
if (self.__head.size > self.__head.maxsize):
|
|
self.__head.size = self.__head.maxsize
|
|
|
|
if (self.__head.next > self.__head.maxsize):
|
|
self.__head.next = self.__head.maxsize
|
|
|
|
self.__head.next = self.__head.next % self.__head.maxsize
|
|
self.__getnext = self.__head.next
|
|
self.__writeHead()
|
|
|
|
def getNext(self) -> bytes:
|
|
if (self.__ptr is None):
|
|
return None
|
|
self.__readHead()
|
|
if (self.__getnext == self.__head.next):
|
|
return None
|
|
else:
|
|
tmpnext = self.__getnext
|
|
self.__getnext = (self.__getnext + 1) % self.__head.maxsize
|
|
self.__mem.seek(self.__ptr + tmpnext * self.__step)
|
|
return self.__mem.read(self.__step)
|
|
|
|
def read(self, index: int) -> bytes:
|
|
if (self.__ptr is None):
|
|
return None
|
|
self.__readHead()
|
|
if (index > self.__head.maxsize):
|
|
return None
|
|
self.__mem.seek(self.__ptr + index * self.__step)
|
|
return self.__mem.read(self.__step)
|
|
|
|
def getCur(self) -> bytes:
|
|
if (self.__ptr is None):
|
|
return None
|
|
self.__readHead()
|
|
self.__mem.seek(self.__ptr + self.index_cur() * self.__step)
|
|
return self.__mem.read(self.__step)
|
|
|
|
def index_cur(self) -> int:
|
|
if (self.__ptr is None):
|
|
return 0
|
|
self.__readHead()
|
|
if (self.__head.next <= 0):
|
|
return self.__head.maxsize - 1
|
|
else:
|
|
return (self.__head.next-1) % self.__head.maxsize
|
|
|
|
def max_size(self) -> int:
|
|
if (self.__ptr is None):
|
|
return 0
|
|
self.__readHead()
|
|
return self.__head.maxsize
|
|
|
|
def size(self) -> int:
|
|
if (self.__ptr is None):
|
|
return 0
|
|
self.__readHead()
|
|
return self.__head.size
|
|
|
|
def push(self, struct_data: bytes):
|
|
if (self.__ptr is None):
|
|
return None
|
|
self.__readHead()
|
|
if (self.__head.size < self.__head.maxsize):
|
|
self.__head.size += 1
|
|
|
|
# d.debug(f"next:{self.__head.next} {self.__ptr + self.__head.next * self.__step}: {self.__ptr + (self.__head.next + 1) * self.__step}")
|
|
|
|
self.__mem.seek(self.__ptr + self.__head.next * self.__step)
|
|
self.__mem.write(struct_data[: self.__step])
|
|
|
|
self.__head.next = (self.__head.next + 1) % self.__head.maxsize
|
|
self.__writeHead()
|
|
|
|
def clear(self):
|
|
if (self.__ptr is None):
|
|
return None
|
|
self.__head.size = 0
|
|
self.__head.next = 0
|
|
self.__getnext = self.__head.next
|
|
self.__writeHead()
|
|
d.info(f"init Head: {self.__head}")
|
|
|
|
|
|
# 测试代码
|
|
from log.LogUtil import LogUtil
|
|
import struct
|
|
if __name__ == '__main__':
|
|
LogUtil.init("app")
|
|
mm = MemFix('TRKCOIL', 50, 100) # 初始化
|
|
# mm.clear() # 清空内存
|
|
for i in range(51):
|
|
|
|
# 打包数据
|
|
packed_data = b'' # 初始化空字节串
|
|
|
|
for data in range(1, 26):
|
|
# 使用 struct.pack() 打包每个数据项
|
|
packed_data += struct.pack('h', data)
|
|
mm.push(packed_data) # 写入数据
|
|
|
|
packed_data = mm.getCur()
|
|
d.debug(f"packed_data: {struct.unpack('25h', packed_data)}")
|
|
|
|
for i in range(25):
|
|
unpacked_data = struct.unpack_from('h', packed_data, i*2)
|
|
d.debug(f"unpacked_data: {unpacked_data[0]}")
|