201 lines
6.1 KiB
Python
201 lines
6.1 KiB
Python
|
|
# -*- coding: utf-8 -*-
'''
 * @Author       : zoufuzhou
 * @Date         : 2024-09-02 16:34:37
 * @Description  : Memory Queue is a special linear table; it is a
                   first-in, first-out (FIFO) data structure.
 * @LastEditTime : 2024-09-05 16:34:37
'''
|
||
|
|
|
||
|
|
import logging as d
import struct
from typing import Optional

from shm.GlobMem import GlobMem
|
||
|
|
|
||
|
|
|
||
|
|
class MemQueue:
    """Fixed-capacity FIFO ring buffer kept inside a GlobMem memory object.

    Layout, starting at the position the memory object reports through
    ``tell()`` when the queue attaches to it::

        [ Head: maxsize, size, head, tail ][ slot 0 ][ slot 1 ] ...

    Every slot is ``step`` bytes wide.  ``head`` is the slot index of the
    oldest element and ``tail`` is the slot index the next ``push`` writes
    to.  The header is re-read from memory before every operation and
    written back after every mutation.

    The memory object must offer ``tell``/``seek``/``read``/``write`` and
    slice read/assignment (the interface of ``mmap.mmap``).
    """

    class Head:
        """Queue header: four unsigned ints packed with format ``'IIII'``."""

        __format_string = 'IIII'

        # __slots__ keeps the per-instance memory footprint small.
        __slots__ = ('maxsize', 'size', 'head', 'tail')

        def __init__(self: 'Head', maxsize: int, size: int, head: int, tail: int):
            """Store capacity, element count and the two ring indexes."""
            self.maxsize = maxsize
            self.size = size
            self.head = head
            self.tail = tail

        def length(self) -> int:
            """Return the length in bytes of the packed header."""
            return struct.calcsize(self.__format_string)

        def pack(self) -> bytes:
            """Pack the four attributes into bytes."""
            return struct.pack(
                self.__format_string,
                self.maxsize, self.size, self.head, self.tail)

        def unpack(self, data: bytes) -> None:
            """Load the four attributes from packed bytes.

            Raises:
                ValueError: if ``data`` is not exactly ``length()`` bytes.
            """
            expected = self.length()
            if len(data) != expected:
                raise ValueError(
                    f"数据长度不匹配: 期望 {expected}, 实际 {len(data)}.")
            self.maxsize, self.size, self.head, self.tail = struct.unpack(
                self.__format_string, data)

        def __str__(self) -> str:
            return f"maxsize: {self.maxsize}, size: {self.size}, head: {self.head}, tail: {self.tail}"

        def __repr__(self) -> str:
            return self.__str__()

    # Class-level defaults only; the real values are assigned per instance.
    # ``__ptr`` staying None marks a queue that failed to attach.
    __head = None
    __ptr = None
    __mem = None
    __pos = None
    __step = None

    def __init(self, memname: str, step: int):
        """Attach to the memory named ``memname`` and load its header.

        On failure an error is logged and ``__ptr`` stays None, which turns
        every public method into a no-op.
        """
        # One header per queue: a header object stored on the class would be
        # shared (and mutated) by every MemQueue instance.
        self.__head = MemQueue.Head(0, 0, 0, 0)

        gmem = GlobMem()
        self.__mem = gmem.GetMem(memname)
        if self.__mem is None:
            d.error("MemQueue init failed")
            return
        self.__pos = self.__mem.tell()

        self.__readHead()
        d.debug(f"Head: {self.__head}")
        self.__step = step
        # Data slots start right after the packed header.
        self.__ptr = self.__pos + self.__head.length()

    def __readHead(self):
        """Refresh the in-process header from memory."""
        end = self.__pos + self.__head.length()
        self.__head.unpack(self.__mem[self.__pos: end])

    def __writeHead(self):
        """Write the in-process header back to memory."""
        end = self.__pos + self.__head.length()
        self.__mem[self.__pos: end] = self.__head.pack()

    def __init__(self, memname: str, step: int, cache_size=0):
        """Attach to an existing queue or (re)configure its capacity.

        Args:
            memname: name handed to ``GlobMem().GetMem``.
            step: width of one slot in bytes.
            cache_size: capacity in slots.  ``<= 0`` keeps the header found
                in memory untouched.  ``> 0`` stores the value as
                ``maxsize``; the stored ``size``/``head``/``tail`` are kept
                only when they describe a valid ring for that capacity,
                otherwise the queue is reset to empty.
        """
        self.__init(memname, step)
        # Nothing more to do when only attaching, or when attaching failed
        # (writing a header through a missing memory object would raise).
        if cache_size <= 0 or self.__ptr is None:
            return

        hdr = self.__head
        hdr.maxsize = cache_size
        consistent = (
            hdr.size <= hdr.maxsize
            and hdr.head < hdr.maxsize
            and hdr.tail < hdr.maxsize
            and hdr.tail == (hdr.head + hdr.size) % hdr.maxsize
        )
        if not consistent:
            d.warning(f"MemQueue header inconsistent, resetting: {hdr}")
            hdr.size = 0
            hdr.head = 0
            hdr.tail = 0

        self.__writeHead()

    def read(self, index: int) -> Optional[bytes]:
        """Return the raw bytes of slot ``index`` (absolute slot number).

        Returns None when the queue is not attached or ``index`` is outside
        ``0 <= index < maxsize``.
        """
        if self.__ptr is None:
            return None
        self.__readHead()
        if index < 0 or index >= self.__head.maxsize:
            return None
        self.__mem.seek(self.__ptr + index * self.__step)
        return self.__mem.read(self.__step)

    def max_size(self) -> int:
        """Return the capacity in slots (0 when not attached)."""
        if self.__ptr is None:
            return 0
        self.__readHead()
        return self.__head.maxsize

    def size(self) -> int:
        """Return the number of stored elements (0 when not attached)."""
        if self.__ptr is None:
            return 0
        self.__readHead()
        return self.__head.size

    def push(self, struct_data: bytes) -> None:
        """Append ``struct_data``; when full, the oldest element is dropped.

        At most ``step`` bytes of ``struct_data`` are written.  Does nothing
        when the queue is not attached or its capacity is 0.
        """
        if self.__ptr is None:
            return None
        self.__readHead()
        hdr = self.__head
        if hdr.maxsize == 0:
            d.error("MemQueue push ignored: capacity is 0")
            return None

        if hdr.size < hdr.maxsize:
            hdr.size += 1
        else:
            # Full: the slot at tail holds the oldest element.  Overwrite it
            # and move head forward; the element count stays at maxsize.
            hdr.head = (hdr.head + 1) % hdr.maxsize

        self.__mem.seek(self.__ptr + hdr.tail * self.__step)
        self.__mem.write(struct_data[: self.__step])
        hdr.tail = (hdr.tail + 1) % hdr.maxsize
        self.__writeHead()
        return None

    def pop(self) -> Optional[bytes]:
        """Remove and return the oldest element, or None when empty."""
        if self.__ptr is None:
            return None
        self.__readHead()
        hdr = self.__head
        if hdr.size == 0 or hdr.maxsize == 0:
            return None
        self.__mem.seek(self.__ptr + hdr.head * self.__step)
        item = self.__mem.read(self.__step)
        hdr.head = (hdr.head + 1) % hdr.maxsize
        hdr.size -= 1
        self.__writeHead()
        return item

    def back(self) -> Optional[bytes]:
        """Return the newest element without removing it, or None when empty."""
        if self.__ptr is None:
            return None
        self.__readHead()
        hdr = self.__head
        if hdr.size == 0 or hdr.maxsize == 0:
            return None
        # tail is the next write position; the newest element sits one slot
        # before it (wrapping around the ring).
        newest = (hdr.tail - 1) % hdr.maxsize
        self.__mem.seek(self.__ptr + newest * self.__step)
        return self.__mem.read(self.__step)

    def front(self) -> Optional[bytes]:
        """Return the oldest element without removing it, or None when empty."""
        if self.__ptr is None:
            return None
        self.__readHead()
        hdr = self.__head
        if hdr.size == 0:
            return None
        self.__mem.seek(self.__ptr + hdr.head * self.__step)
        return self.__mem.read(self.__step)

    def empty(self) -> bool:
        """Return True when the queue holds no element or is not attached."""
        if self.__ptr is None:
            return True
        self.__readHead()
        return self.__head.size == 0

    def clear(self):
        """Reset the queue to empty while keeping its capacity."""
        if self.__ptr is None:
            return None
        # Re-read first so the maxsize written back is the one in memory.
        self.__readHead()
        self.__head.size = 0
        self.__head.head = 0
        self.__head.tail = 0
        self.__writeHead()
        d.info(f"init Head: {self.__head}")
|
||
|
|
|
||
|
|
|
||
|
|
# Test code
# from log.LogUtil import LogUtil
# if __name__ == '__main__':
#     LogUtil.init("app")
#     mm = MemQueue('TRKCOIL', 50)  # initialise the queue
#     # mm.clear()  # clear the memory
#     for i in range(20):
#
#         # Pack the data
#         packed_data = b''  # start from an empty byte string
#
#         for data in range(1, 26):
#             # Pack each item with struct.pack()
#             packed_data += struct.pack('h', data)
#         mm.push(packed_data)  # write the data
#         d.debug(f"mm size: {mm.size()}")
#
#     for i in range(20):
#         d.debug(f"mm size: {mm.size()}, unpacked_data: {mm.pop()}")
|