From 9bb810d9ed5238ff4ab75067729fd9f6a15541e0 Mon Sep 17 00:00:00 2001 From: Huamonarch Date: Wed, 13 May 2026 10:21:55 +0800 Subject: [PATCH] Replace broken ColdMutex with dual-layer ShmSpinLock + std::mutex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The old ColdMutex (interprocess_mutex) was in process-local anonymous namespace storage, so each process had its own copy — no actual cross-process exclusion. Even if moved to SHM, interprocess_mutex is not robust: a crash while holding the lock would deadlock on restart. New design: - ShmSpinLock: atomic in shared memory, kill(pid,0) detects dead owners (ESRCH → takeover), crash-safe by construction - std::mutex: process-local, handles intra-process thread contention without burning CPU on the SHM spinlock - DualLock: locks local first, then shm; unlocks in reverse 9 lock sites in MapRuleStat upgraded to std::lock_guard. --- shm/RuleStatShm.h | 87 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 74 insertions(+), 13 deletions(-) diff --git a/shm/RuleStatShm.h b/shm/RuleStatShm.h index 9ae0941..d1ddb2f 100644 --- a/shm/RuleStatShm.h +++ b/shm/RuleStatShm.h @@ -5,7 +5,7 @@ * * 热数据(display)改走本地缓存 → Memcached,不经过共享内存。 * 冷数据(stat_values/running_time/shear_times 等)保留在共享内存中, - * 使用 boost::interprocess::interprocess_mutex 做真正的进程间同步。 + * 使用 ShmSpinLock(kill(pid,0) 崩溃接管)+ std::mutex(进程内)双层锁。 * * @author your name (you@domain.com) * @version 0.2 @@ -16,16 +16,73 @@ */ #include #include +#include +#include +#include +#include +#include +#include #include "shm_header.h" namespace RuleStatShm { using namespace ShmHeader; -namespace { +/// 跨进程自旋锁——崩溃安全(kill(pid,0) 检测持有者存活,ESRCH 则接管) +struct ShmSpinLock { + std::atomic owner{0}; -using ColdMutex = bipc::interprocess_mutex; -ColdMutex llmtx{}; ///< 进程间互斥锁 + ShmSpinLock() = default; + ShmSpinLock(const ShmSpinLock &) = delete; + ShmSpinLock &operator=(const ShmSpinLock &) = delete; + + void lock() { + pid_t my_pid = ::getpid(); + while (true) { + pid_t expected = 0; + if (owner.compare_exchange_strong(expected, my_pid, + std::memory_order_acquire)) { + return; + } + // 持有者进程已死 → 接管 + if (::kill(expected, 0) != 0 && errno == ESRCH) { + if (owner.compare_exchange_strong(expected, my_pid, + std::memory_order_acquire)) { + return; + } + continue; + } + // 持有者存活,短暂退避 + struct timespec ts = {0, 100000}; // 100µs + ::nanosleep(&ts, nullptr); + } + } + + void unlock() { + pid_t my_pid = ::getpid(); + pid_t expected = my_pid; + owner.compare_exchange_strong(expected, 0, std::memory_order_release); + } +}; + +/// 双层锁:进程内 std::mutex + 跨进程 ShmSpinLock +struct DualLock { + std::mutex local; + ShmSpinLock &shm; + + DualLock(ShmSpinLock &s) : shm(s) {} + + void lock() { + local.lock(); + shm.lock(); + } + void unlock() { + shm.unlock(); + local.unlock(); + } +}; + +namespace { const static std::string dir_path = "/users/dsc/shm"; const static std::string shm_file = "MapRuleStat"; ///<映射文件名 @@ -43,6 +100,9 @@ static void_allocator static vec_allocator_s items_allocator(obj_mapped_file.get_segment_manager()); ///< vector_s分配器 +static ShmSpinLock *shm_lock = + obj_mapped_file.find_or_construct("RuleStatLock")(); + thread_local static char_string tl_key_object("", default_allocator); thread_local static char_string tl_key_delete("", default_allocator); @@ -132,13 +192,14 @@ struct MapRuleStat { private: MapRuleStat_s *p_msg_map = obj_mapped_file.find_or_construct( shm_file.c_str())(less_s(), obj_mapped_file.get_segment_manager()); + DualLock lock_{*shm_lock}; public: // ── mon 高频调用 ── bool add_stat_value(const std::string &key, const double &value) { try { - bipc::scoped_lock guard(llmtx); + std::lock_guard guard(lock_); if (p_msg_map->operator[](get_thread_local_key(key)).fetch_mark) { p_msg_map->operator[](get_thread_local_key(key)).stat_values.clear(); p_msg_map->operator[](get_thread_local_key(key)).fetch_mark = false; @@ -156,7 +217,7 @@ public: /// mon 写入累积的冷字段(stat_values, running_time, shear_times) bool update_cold_fields(const std::string &key, const RuleStatCold &value) { - bipc::scoped_lock guard(llmtx); + std::lock_guard guard(lock_); try { if (p_msg_map->find(get_thread_local_key(key)) != p_msg_map->end()) { p_msg_map->operator[](get_thread_local_key(key)).update_cold(value); @@ -171,7 +232,7 @@ public: /// cron 写入静态字段(running_time, alarm_times, last_alarm_time, dev_coder 等) bool update_static_fields(const std::string &key, const RuleStatCold &value) { - bipc::scoped_lock guard(llmtx); + std::lock_guard guard(lock_); try { if (p_msg_map->find(get_thread_local_key(key)) != p_msg_map->end()) { p_msg_map->operator[](get_thread_local_key(key)).update_static(value); @@ -188,7 +249,7 @@ public: bool get_stat_value(const std::string &key, RuleStatCold &value) { try { - bipc::scoped_lock guard(llmtx); + std::lock_guard guard(lock_); if (!p_msg_map->operator[](get_thread_local_key(key)).fetch_mark && p_msg_map->operator[](get_thread_local_key(key)).stat_values.size() > stat_size_min) { @@ -206,7 +267,7 @@ public: // ── 管理操作 ── bool delete_data(const std::string &key) { - bipc::scoped_lock guard(llmtx); + std::lock_guard guard(lock_); try { if (p_msg_map->find(get_thread_local_delete_key(key)) != p_msg_map->end()) { @@ -219,18 +280,18 @@ public: } size_t size() { - bipc::scoped_lock guard(llmtx); + std::lock_guard guard(lock_); return p_msg_map->size(); } bool empty() { - bipc::scoped_lock guard(llmtx); + std::lock_guard guard(lock_); return p_msg_map->empty(); } std::vector find_rule_id(const std::vector &ruleid) { - bipc::scoped_lock guard(llmtx); + std::lock_guard guard(lock_); std::vector res; if (p_msg_map->empty()) { return {}; @@ -246,7 +307,7 @@ public: std::vector find_no_rule_id(const std::vector &ruleid) { - bipc::scoped_lock guard(llmtx); + std::lock_guard guard(lock_); std::vector res; if (p_msg_map->empty()) { return {};