From 1ca922a4ef3fd27162eb27554b0572e2a86ead3b Mon Sep 17 00:00:00 2001 From: Huamonarch Date: Tue, 12 May 2026 17:11:07 +0800 Subject: [PATCH] Fix task thread lifecycle: self-destruct when idle, synchronized cleanup HandlerExec in task mode now sets is_running_=false when rule_pointers_ and once_exec_queue_ are both empty. Manager cleanup uses two-phase lock (shared_lock scan + unique_lock destroy/erase) synchronized with exec_task via handles_mutex. exec_task checks is_running_ before submit and destroys dead handlers to prevent task loss. Also fix logReset self-assignment no-op. --- eqpalg/alg_base.cpp | 2 +- eqpalg/threads/handler_exec.cc | 6 ++++++ eqpalg/threads/manager.cc | 28 +++++++++++++++++++++------- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/eqpalg/alg_base.cpp b/eqpalg/alg_base.cpp index 925b31a..cb4cafa 100644 --- a/eqpalg/alg_base.cpp +++ b/eqpalg/alg_base.cpp @@ -27,7 +27,7 @@ AlgBase::AlgBase(const string &name, const mix_cc::json &rule_json, AlgBase::~AlgBase() { } void AlgBase::logReset(int task_seq) { - task_seq = task_seq; + this->task_seq = task_seq; logger_.reset(new LOG("task:" + rule_name_, AUTO_CATCH_PID)); } diff --git a/eqpalg/threads/handler_exec.cc b/eqpalg/threads/handler_exec.cc index fe9d8be..22bd8e9 100644 --- a/eqpalg/threads/handler_exec.cc +++ b/eqpalg/threads/handler_exec.cc @@ -57,6 +57,12 @@ std::thread::id HandlerExec::run_thread() { this->rule_pointers_.erase(x.first); } } + if (this->rule_pointers_.empty()) { + std::lock_guard guard(mutex_); + if (this->once_exec_queue_.empty()) { + this->is_running_ = false; + } + } } if (glob_process_type == ProcessType::kCron) { for (auto &x : this->rule_pointers_) { diff --git a/eqpalg/threads/manager.cc b/eqpalg/threads/manager.cc index 262da4e..0f10f3a 100644 --- a/eqpalg/threads/manager.cc +++ b/eqpalg/threads/manager.cc @@ -86,16 +86,23 @@ int Manager::start() { LOG::doConfigure("TaskManager", THREAD_CONFIG); alarm_logger_.reset(new LOG("TaskManager", AUTO_CATCH_PID)); while (this->is_running_) { - std::shared_lock read_lock(handles_mutex); - if (!this->handles_.empty()) { - bool test_label = true; + std::vector to_remove; + { + std::shared_lock read_lock(handles_mutex); for (auto &x : this->handles_) { - test_label = test_label && !(x.second->get_is_running()); + if (!x.second->get_is_running()) { + to_remove.push_back(x.first); + } } - if (test_label) { - this->handles_.clear(); + } + for (auto &name : to_remove) { + std::unique_lock write_lock(handles_mutex); + auto it = handles_.find(name); + if (it != handles_.end() && !it->second->get_is_running()) { + it->second->destroy(); + handles_.erase(it); this->alarm_logger_->Debug() - << "task this->handles_.clear()!" << std::endl; + << "task cleanup: destroyed " << name << std::endl; } } this_thread::sleep_for(10ms); @@ -197,6 +204,13 @@ int Manager::exec_task(std::string ruleId, TimePoint time_start, std::to_string(task_seq); logger_->Debug() << "线程id:" << alg_thread_name << endl; std::unique_lock write_lock(handles_mutex); + { + auto it = handles_.find(alg_thread_name); + if (it != handles_.end() && !it->second->get_is_running()) { + it->second->destroy(); + handles_.erase(it); + } + } if (handles_.find(alg_thread_name) == handles_.end()) { logger_->Debug() << "线程id:" << alg_thread_name << " not found,to create!" << endl;