# -*- coding: utf-8 -*- ''' * @Author : zoufuzhou * @Date : 2024-08-23 16:34:37 * @Description : read and write memory-mapped files * @LastEditTime : 2024-08-26 16:34:37 ''' import pandas as pd import yaml import mmap import os from file.PathUtil import PathUtil from decor.singleton import singleton import logging as d @singleton class GlobMem: __df = None __mem_dict = {} def __init__(self): path = PathUtil().getPath() d.debug(path) # 读取 hstables.cfg 文件 self.__df = pd.read_csv(path + '/config/hstables.cfg', sep='\\s+') with open(path + '/config/config.yaml', 'r') as f: yamlcfg = yaml.safe_load(f) for key, value in yamlcfg['hsblock'].items(): d.debug(key, value) mm = self.__CreateMap( value['path'] + '/' + key + '.db', value['size']) self.__mem_dict.update({key: mm}) def __CreateMap(self, mapfile, filesize): if os.path.exists(mapfile): file_size = os.path.getsize(mapfile) if (file_size < filesize): d.error(f"{mapfile} 的大小小于 {filesize} 字节") with open(mapfile, "r+b") as f: f.truncate(filesize) f.close() else: with open(mapfile, 'wb') as f: f.write(b'\x00' * filesize) with open(mapfile, "r+b") as f: mm = mmap.mmap(f.fileno(), 0) return mm def GetMem(self, memname) -> mmap: filtered_df = self.__df[self.__df['table'] == memname] if filtered_df.empty: d.error(f"Mem {memname} not found in hstables.cfg") return None block = filtered_df.iloc[0]['block'] if block is None: d.error(f"Mem {memname} block {block} not found") return None mm = self.__mem_dict.get(block) if mm is None: d.error(f"Mem {memname} block {block} not found in config.yaml") return None start_addr = int(filtered_df.iloc[0]['start_addr'], 16) mm.seek(start_addr) # return mm, start_addr, filtered_df.iloc[0]['size'] return mm # # 测试代码 # from log.LogUtil import LogUtil # if __name__ == '__main__': # LogUtil.init("app") # gmem = GlobMem() # 初始化 # mm = gmem.GetMem('TRKCOIL') # 获取 TRKCOIL 内存 # print(mm) # 打印内存地址 # print(mm.read(11)) # 打印内存地址 # print(mm) # print(mm.tell())