Fix task thread lifecycle: self-destruct when idle, synchronized cleanup

HandlerExec in task mode now sets is_running_=false when rule_pointers_
and once_exec_queue_ are both empty. Manager cleanup uses two-phase
lock (shared_lock scan + unique_lock destroy/erase) synchronized with
exec_task via handles_mutex. exec_task checks is_running_ before submit
and destroys dead handlers to prevent task loss. Also fix logReset
self-assignment no-op.
This commit is contained in:
Huamonarch 2026-05-12 17:11:07 +08:00
parent 1e70af7a9d
commit 1ca922a4ef
3 changed files with 28 additions and 8 deletions

View File

@ -27,7 +27,7 @@ AlgBase::AlgBase(const string &name, const mix_cc::json &rule_json,
AlgBase::~AlgBase() { AlgBase::~AlgBase() {
} }
void AlgBase::logReset(int task_seq) { void AlgBase::logReset(int task_seq) {
task_seq = task_seq; this->task_seq = task_seq;
logger_.reset(new LOG("task:" + rule_name_, AUTO_CATCH_PID)); logger_.reset(new LOG("task:" + rule_name_, AUTO_CATCH_PID));
} }

View File

@ -57,6 +57,12 @@ std::thread::id HandlerExec::run_thread() {
this->rule_pointers_.erase(x.first); this->rule_pointers_.erase(x.first);
} }
} }
if (this->rule_pointers_.empty()) {
std::lock_guard<std::mutex> guard(mutex_);
if (this->once_exec_queue_.empty()) {
this->is_running_ = false;
}
}
} }
if (glob_process_type == ProcessType::kCron) { if (glob_process_type == ProcessType::kCron) {
for (auto &x : this->rule_pointers_) { for (auto &x : this->rule_pointers_) {

View File

@ -86,16 +86,23 @@ int Manager::start() {
LOG::doConfigure("TaskManager", THREAD_CONFIG); LOG::doConfigure("TaskManager", THREAD_CONFIG);
alarm_logger_.reset(new LOG("TaskManager", AUTO_CATCH_PID)); alarm_logger_.reset(new LOG("TaskManager", AUTO_CATCH_PID));
while (this->is_running_) { while (this->is_running_) {
std::vector<std::string> to_remove;
{
std::shared_lock read_lock(handles_mutex); std::shared_lock read_lock(handles_mutex);
if (!this->handles_.empty()) {
bool test_label = true;
for (auto &x : this->handles_) { for (auto &x : this->handles_) {
test_label = test_label && !(x.second->get_is_running()); if (!x.second->get_is_running()) {
to_remove.push_back(x.first);
} }
if (test_label) { }
this->handles_.clear(); }
for (auto &name : to_remove) {
std::unique_lock write_lock(handles_mutex);
auto it = handles_.find(name);
if (it != handles_.end() && !it->second->get_is_running()) {
it->second->destroy();
handles_.erase(it);
this->alarm_logger_->Debug() this->alarm_logger_->Debug()
<< "task this->handles_.clear()!" << std::endl; << "task cleanup: destroyed " << name << std::endl;
} }
} }
this_thread::sleep_for(10ms); this_thread::sleep_for(10ms);
@ -197,6 +204,13 @@ int Manager::exec_task(std::string ruleId, TimePoint time_start,
std::to_string(task_seq); std::to_string(task_seq);
logger_->Debug() << "线程id:" << alg_thread_name << endl; logger_->Debug() << "线程id:" << alg_thread_name << endl;
std::unique_lock write_lock(handles_mutex); std::unique_lock write_lock(handles_mutex);
{
auto it = handles_.find(alg_thread_name);
if (it != handles_.end() && !it->second->get_is_running()) {
it->second->destroy();
handles_.erase(it);
}
}
if (handles_.find(alg_thread_name) == handles_.end()) { if (handles_.find(alg_thread_name) == handles_.end()) {
logger_->Debug() << "线程id:" << alg_thread_name << " not found,to create!" logger_->Debug() << "线程id:" << alg_thread_name << " not found,to create!"
<< endl; << endl;