88 lines
2.5 KiB
Python
88 lines
2.5 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
||
|
|
'''
|
||
|
|
* @Author : zoufuzhou
|
||
|
|
* @Date : 2024-08-23 16:34:37
|
||
|
|
* @Description : read and write memory-mapped files
|
||
|
|
* @LastEditTime : 2024-08-26 16:34:37
|
||
|
|
'''
|
||
|
|
|
||
|
|
import pandas as pd
|
||
|
|
import yaml
|
||
|
|
import mmap
|
||
|
|
import os
|
||
|
|
from file.PathUtil import PathUtil
|
||
|
|
from decor.singleton import singleton
|
||
|
|
import logging as d
|
||
|
|
|
||
|
|
|
||
|
|
@singleton
|
||
|
|
class GlobMem:
|
||
|
|
|
||
|
|
__df = None
|
||
|
|
__mem_dict = {}
|
||
|
|
|
||
|
|
def __init__(self):
|
||
|
|
path = PathUtil().getPath()
|
||
|
|
d.debug(path)
|
||
|
|
# 读取 hstables.cfg 文件
|
||
|
|
self.__df = pd.read_csv(path + '/config/hstables.cfg', sep='\\s+')
|
||
|
|
|
||
|
|
with open(path + '/config/config.yaml', 'r') as f:
|
||
|
|
yamlcfg = yaml.safe_load(f)
|
||
|
|
|
||
|
|
for key, value in yamlcfg['hsblock'].items():
|
||
|
|
d.debug(key, value)
|
||
|
|
mm = self.__CreateMap(
|
||
|
|
value['path'] + '/' + key + '.db', value['size'])
|
||
|
|
self.__mem_dict.update({key: mm})
|
||
|
|
|
||
|
|
def __CreateMap(self, mapfile, filesize):
|
||
|
|
|
||
|
|
if os.path.exists(mapfile):
|
||
|
|
file_size = os.path.getsize(mapfile)
|
||
|
|
if (file_size < filesize):
|
||
|
|
d.error(f"{mapfile} 的大小小于 {filesize} 字节")
|
||
|
|
with open(mapfile, "r+b") as f:
|
||
|
|
f.truncate(filesize)
|
||
|
|
f.close()
|
||
|
|
else:
|
||
|
|
with open(mapfile, 'wb') as f:
|
||
|
|
f.write(b'\x00' * filesize)
|
||
|
|
|
||
|
|
with open(mapfile, "r+b") as f:
|
||
|
|
mm = mmap.mmap(f.fileno(), 0)
|
||
|
|
return mm
|
||
|
|
|
||
|
|
def GetMem(self, memname) -> mmap:
|
||
|
|
|
||
|
|
filtered_df = self.__df[self.__df['table'] == memname]
|
||
|
|
if filtered_df.empty:
|
||
|
|
d.error(f"Mem {memname} not found in hstables.cfg")
|
||
|
|
return None
|
||
|
|
|
||
|
|
block = filtered_df.iloc[0]['block']
|
||
|
|
if block is None:
|
||
|
|
d.error(f"Mem {memname} block {block} not found")
|
||
|
|
return None
|
||
|
|
mm = self.__mem_dict.get(block)
|
||
|
|
if mm is None:
|
||
|
|
d.error(f"Mem {memname} block {block} not found in config.yaml")
|
||
|
|
return None
|
||
|
|
|
||
|
|
start_addr = int(filtered_df.iloc[0]['start_addr'], 16)
|
||
|
|
mm.seek(start_addr)
|
||
|
|
# return mm, start_addr, filtered_df.iloc[0]['size']
|
||
|
|
return mm
|
||
|
|
|
||
|
|
|
||
|
|
# # 测试代码
|
||
|
|
# from log.LogUtil import LogUtil
|
||
|
|
# if __name__ == '__main__':
|
||
|
|
# LogUtil.init("app")
|
||
|
|
# gmem = GlobMem() # 初始化
|
||
|
|
# mm = gmem.GetMem('TRKCOIL') # 获取 TRKCOIL 内存
|
||
|
|
# print(mm) # 打印内存地址
|
||
|
|
# print(mm.read(11)) # 打印内存地址
|
||
|
|
# print(mm)
|
||
|
|
# print(mm.tell())
|