eis/py/comlib/shm/GlobMem.py

88 lines
2.5 KiB
Python

# -*- coding: utf-8 -*-
'''
* @Author : zoufuzhou
* @Date : 2024-08-23 16:34:37
* @Description : read and write memory-mapped files
* @LastEditTime : 2024-08-26 16:34:37
'''
import pandas as pd
import yaml
import mmap
import os
from file.PathUtil import PathUtil
from decor.singleton import singleton
import logging as d
@singleton
class GlobMem:
__df = None
__mem_dict = {}
def __init__(self):
path = PathUtil().getPath()
d.debug(path)
# 读取 hstables.cfg 文件
self.__df = pd.read_csv(path + '/config/hstables.cfg', sep='\\s+')
with open(path + '/config/config.yaml', 'r') as f:
yamlcfg = yaml.safe_load(f)
for key, value in yamlcfg['hsblock'].items():
d.debug(key, value)
mm = self.__CreateMap(
value['path'] + '/' + key + '.db', value['size'])
self.__mem_dict.update({key: mm})
def __CreateMap(self, mapfile, filesize):
if os.path.exists(mapfile):
file_size = os.path.getsize(mapfile)
if (file_size < filesize):
d.error(f"{mapfile} 的大小小于 {filesize} 字节")
with open(mapfile, "r+b") as f:
f.truncate(filesize)
f.close()
else:
with open(mapfile, 'wb') as f:
f.write(b'\x00' * filesize)
with open(mapfile, "r+b") as f:
mm = mmap.mmap(f.fileno(), 0)
return mm
def GetMem(self, memname) -> mmap:
filtered_df = self.__df[self.__df['table'] == memname]
if filtered_df.empty:
d.error(f"Mem {memname} not found in hstables.cfg")
return None
block = filtered_df.iloc[0]['block']
if block is None:
d.error(f"Mem {memname} block {block} not found")
return None
mm = self.__mem_dict.get(block)
if mm is None:
d.error(f"Mem {memname} block {block} not found in config.yaml")
return None
start_addr = int(filtered_df.iloc[0]['start_addr'], 16)
mm.seek(start_addr)
# return mm, start_addr, filtered_df.iloc[0]['size']
return mm
# # 测试代码
# from log.LogUtil import LogUtil
# if __name__ == '__main__':
# LogUtil.init("app")
# gmem = GlobMem() # 初始化
# mm = gmem.GetMem('TRKCOIL') # 获取 TRKCOIL 内存
# print(mm) # 打印内存地址
# print(mm.read(11)) # 打印内存地址
# print(mm)
# print(mm.tell())