eis/eqpalg/threads/handler_exec.cc
Huamonarch 1ca922a4ef Fix task thread lifecycle: self-destruct when idle, synchronized cleanup
HandlerExec in task mode now sets is_running_=false when rule_pointers_
and once_exec_queue_ are both empty. Manager cleanup uses two-phase
lock (shared_lock scan + unique_lock destroy/erase) synchronized with
exec_task via handles_mutex. exec_task checks is_running_ before submit
and destroys dead handlers to prevent task loss. Also fix logReset
self-assignment no-op.
2026-05-12 17:11:07 +08:00

283 lines
10 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "mix_cc/exception.h"
#include <eqpalg/threads/handler_exec.h>
#include <regex>
#include <string>
#include <zlib/MemVar.h>
// Pacing interval of the monitor-mode worker loop in run_thread(): the loop
// wakes every MON_CALL_MAX_COST, and a pass whose measured cost exceeds this
// value restarts the schedule from "now" instead of sleeping.
#define MON_CALL_MAX_COST 20ms
// Process mode of the running program (compared against ProcessType::kTask,
// kCron and kMon below); only declared here, defined outside this file.
extern ProcessType glob_process_type;
namespace threads {
// Number of HandlerExec objects currently alive: incremented by the
// constructor, decremented by the destructor, and printed in the monitor-mode
// timing log.
int HandlerExec::instanceCount = 0;
/// Builds the executor for one algorithm.
///
/// Creates the logger, derives the worker thread name ("alg_<alg_name>" plus a
/// "_task" / "_cron" suffix depending on glob_process_type) and parses the
/// numeric algorithm id. The worker thread itself is started by run_thread().
///
/// @param alg_name Algorithm id as decimal text. It is parsed with stoi, so a
///                 value that is not a number (or does not fit in int) makes
///                 the constructor throw.
HandlerExec::HandlerExec(const string &alg_name) {
  this->logger_ = std::make_unique<LOG>(string("HandlerExec") + alg_name);
  this->thread_name_ = "alg_" + alg_name;
  if (glob_process_type == ProcessType::kTask) {
    this->thread_name_ += "_task";
  } else if (glob_process_type == ProcessType::kCron) {
    this->thread_name_ += "_cron";
  }
  this->alg_id_ = stoi(alg_name);
  logger_->Debug() << "alg_name:" << alg_name << ",alg_id:" << alg_id_ << endl;
  is_running_ = true;
  // Count the instance only after everything that may throw has succeeded:
  // the destructor (which decrements the counter) does not run for an object
  // whose constructor threw, so incrementing earlier would leave the counter
  // permanently one too high.
  ++instanceCount;
}
/// Stops the worker loop and waits for the worker thread to finish.
///
/// Clearing is_running_ makes the loop in run_thread() exit after its current
/// pass; the join then blocks until that pass (including its sleep) is over.
HandlerExec::~HandlerExec() {
  instanceCount--;
  this->is_running_ = false;
  // r_thread_ is only created by run_thread(); an executor that was built but
  // never started has a null pointer here and must not be dereferenced.
  if (r_thread_ && r_thread_->joinable()) {
    r_thread_->join();
  }
  logger_->Debug() << "HandlerExec::~HandlerExec()" << endl;
}
/// Starts the worker thread and returns its id.
///
/// Every pass of the worker loop first drains the pending requests through
/// event_handler() and then runs the rules according to glob_process_type:
///  - kTask: each rule that has an entry in time_ranges_ is executed once via
///    exec_task_call() and then removed; when no rules and no queued one-shot
///    submissions remain, the loop clears is_running_ and ends by itself.
///    Sleeps 1s per pass.
///  - kCron: exec_cron_call() on every rule, then sleeps 1s.
///  - kMon: exec_mon_call() on every rule, paced at MON_CALL_MAX_COST; a pass
///    that took longer than that restarts the schedule instead of sleeping.
/// Exceptions thrown inside a pass are logged and the loop continues.
/// rule_pointers_ is cleared when the loop exits.
///
/// @return Id of the newly created worker thread.
std::thread::id HandlerExec::run_thread() {
  r_thread_ = std::make_unique<std::thread>([&]() {
    int logResult = LOG::doConfigure(thread_name_, THREAD_CONFIG);
    int task_seq = HandlerExec::extractCTaskNumberRegexGeneric(thread_name_);
    this->logger_->Debug() << "thread_name_:" << thread_name_
                           << ",logResult[0-成功1-配置文件不存在2-"
                              "log输出目录不存在3-"
                              "相同的doConfigure不能执行多遍4-其他错误]:"
                           << logResult << ",task_seq:" << task_seq << endl;
    auto next_wake_time = std::chrono::steady_clock::now();
    while (is_running_) {
      try {
        this->event_handler();
        if (glob_process_type == ProcessType::kTask) {
          // Finished rules are removed while walking the container, so the
          // loop is iterator-based and continues from the iterator returned
          // by erase(); erasing inside a range-for would advance an
          // invalidated iterator.
          for (auto it = this->rule_pointers_.begin();
               it != this->rule_pointers_.end();) {
            auto &rule = it->second;
            rule->logReset(task_seq * 10 + rule->get_data_source());
            auto range_it = time_ranges_.find(rule->get_rule_id());
            if (range_it == time_ranges_.end()) {
              // No time range submitted for this rule yet; keep it.
              ++it;
              continue;
            }
            rule->exec_task_call(range_it->second);
            time_ranges_.erase(range_it);
            this->logger_->Debug() << "task:" << rule->get_rule_name() << endl;
            it = this->rule_pointers_.erase(it);
          }
          if (this->rule_pointers_.empty()) {
            // once_exec_queue_ is filled by submit() under mutex_, so the
            // emptiness check is made under the same lock.
            std::lock_guard<std::mutex> guard(mutex_);
            if (this->once_exec_queue_.empty()) {
              this->is_running_ = false;
            }
          }
        }
        if (glob_process_type == ProcessType::kCron) {
          for (auto &x : this->rule_pointers_) {
            x.second->exec_cron_call();
          }
        }
        if (glob_process_type == ProcessType::kMon) {
          run_t1 = std::chrono::system_clock::now();
          auto runs_t1 = std::chrono::steady_clock::now();
          for (auto &x : this->rule_pointers_) {
            x.second->exec_mon_call();
          }
          run_t2 = std::chrono::system_clock::now();
          auto runs_t2 = std::chrono::steady_clock::now();
          // cost_time drives the pacing decision below, so it is measured on
          // every pass and not only when the diagnostic print flag is set.
          // duration_cast keeps the value in milliseconds whatever the tick
          // period of system_clock is.
          cost_time = std::chrono::duration_cast<std::chrono::milliseconds>(
                          run_t2 - run_t1)
                          .count();
          if (CMemVar::Const()->is_print_eqpalg_mon_threading_info) {
            int64_t costs_time =
                std::chrono::duration_cast<std::chrono::milliseconds>(
                    runs_t2 - runs_t1)
                    .count();
            if (cost_time > 50) {
              this->logger_->Info()
                  << "instanceCount:" << instanceCount
                  << ",algID:" << this->alg_id_
                  << "thread_name_:" << thread_name_
                  << ",this->rule_pointers_ size:"
                  << this->rule_pointers_.size()
                  << "run cost time(system_clock):" << cost_time << " ms"
                  << ",run cost time(steady_clock):" << costs_time << " ms"
                  << std::endl;
            }
          }
        }
        if (glob_process_type == ProcessType::kMon) {
          if (std::chrono::milliseconds(cost_time) > MON_CALL_MAX_COST) {
            // The pass overran the interval: restart the schedule from now
            // and go straight into the next pass.
            next_wake_time = std::chrono::steady_clock::now();
          } else {
            next_wake_time += MON_CALL_MAX_COST;
            std::this_thread::sleep_until(next_wake_time);
          }
        }
        if (glob_process_type == ProcessType::kCron) {
          std::this_thread::sleep_for(1s);
        }
        if (glob_process_type == ProcessType::kTask) {
          std::this_thread::sleep_for(1s);
        }
      } catch (const std::exception &e) {
        this->logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
      } catch (...) {
        this->logger_->Error() << "未知异常" << std::endl;
      }
    }
    this->rule_pointers_.clear();
  });
  return r_thread_->get_id();
}
/// Returns the ids (map keys) of all rules currently held in rule_pointers_,
/// in the container's iteration order.
std::vector<std::string> HandlerExec::get_rule_ids() {
  std::vector<std::string> ids;
  ids.reserve(rule_pointers_.size());
  for (auto it = rule_pointers_.begin(); it != rule_pointers_.end(); ++it) {
    ids.push_back(it->first);
  }
  return ids;
}
/// Stores a rule instance directly in rule_pointers_, keyed by its rule id.
/// An already-present id is left untouched (emplace does not overwrite).
///
/// @param pointer Rule instance; ownership moves into the executor.
/// @return Always 0.
int HandlerExec::load(std::unique_ptr<AlgBase> &&pointer) {
  // Original author's note: the worker thread is not yet running while
  // instances are being loaded, so no lock is taken here.
  auto rule_id = pointer->get_rule_id();
  rule_pointers_.emplace(std::move(rule_id), std::move(pointer));
  return 0;
}
int HandlerExec::attach(std::unique_ptr<AlgBase> &&alg_pointer) {
std::lock_guard<std::mutex> guard(mutex_);
this->attach_queue_.emplace(std::move(alg_pointer));
return 0;
}
/// Queues a reset request for the given rule; the queue is drained by
/// event_handler().
///
/// @param ruleId Id of the rule to reset.
/// @return Always 0.
int HandlerExec::reset(string ruleId) {
  std::lock_guard<std::mutex> lock(mutex_);
  reset_queue_.push(ruleId);
  return 0;
}
/// Queues the removal of the given rule; event_handler() erases it from
/// rule_pointers_ when it drains the queue.
///
/// @param ruleId Id of the rule to remove.
/// @return Always 0.
int HandlerExec::detach(string ruleId) {
  std::lock_guard<std::mutex> lock(mutex_);
  detach_queue_.emplace(ruleId);
  return 0;
}
/// Queues an enable/disable request for the given rule; the queue is drained
/// by event_handler().
///
/// @param ruleId Id of the rule to switch.
/// @param usable true to enable the rule, false to disable it.
/// @return Always 0.
int HandlerExec::set_usable(string ruleId, bool usable) {
  std::lock_guard<std::mutex> lock(mutex_);
  auto request = std::make_tuple(ruleId, usable);
  usable_queue_.push(request);
  return 0;
}
int HandlerExec::submit(std::unique_ptr<AlgBase> &&instance_ptr,
TimePoint begin_time, TimePoint end_time) {
std::lock_guard<std::mutex> guard(mutex_);
this->once_exec_queue_.emplace(std::make_tuple(
std::move(instance_ptr), mix_cc::time_range_t(begin_time, end_time)));
return 0;
}
int HandlerExec::event_handler() {
std::lock_guard<std::mutex> guard(mutex_);
while (!reset_queue_.empty()) {
auto reset_rule_id = reset_queue_.front();
logger_->Info() << "alg_id:" << this->alg_id_
<< ",重置算法:" << reset_rule_id << std::endl;
reset_queue_.pop();
if (ProcessType::kMon == glob_process_type) {
if (6 == this->alg_id_ || 7 == this->alg_id_) {
rule_pointers_[reset_rule_id]->reset_dev_data();
this->logger_->Debug()
<< "alg_id:" << this->alg_id_ << ",清除历史数据" << std::endl;
} else if (2 == this->alg_id_ || 4 == this->alg_id_ ||
5 == this->alg_id_) {
rule_pointers_[reset_rule_id]->reset_dev_data();
this->logger_->Debug()
<< "alg_id:" << this->alg_id_ << ",清除统计数据" << std::endl;
}
}
}
while (!detach_queue_.empty()) {
auto detach_rule_id = detach_queue_.front();
logger_->Info() << "删除算法:" << detach_rule_id << std::endl;
detach_queue_.pop();
rule_pointers_.erase(detach_rule_id);
logger_->Info() << "删除算法完成" << detach_rule_id << std::endl;
}
while (!attach_queue_.empty()) {
auto pointer = std::move(attach_queue_.front());
attach_queue_.pop();
auto rule_id = pointer->get_rule_id();
if (rule_pointers_.find(rule_id) == rule_pointers_.end()) {
logger_->Info() << "添加算法:" << pointer->get_rule_name() << std::endl;
rule_pointers_.emplace(std::make_pair(rule_id, std::move(pointer)));
logger_->Info() << "添加算法完成" << std::endl;
} else {
logger_->Info() << "更新算法:" << pointer->get_rule_name() << std::endl;
rule_pointers_[rule_id] = std::move(pointer);
}
}
while (!usable_queue_.empty()) {
auto usable_q = usable_queue_.front();
usable_queue_.pop();
auto rule_id = std::get<0>(usable_q);
if (std::get<1>(usable_q)) {
rule_pointers_[std::get<0>(usable_q)]->init();
logger_->Info() << "启用算法:" << rule_id << std::endl;
} else {
logger_->Info() << "停用算法:" << rule_id << std::endl;
}
rule_pointers_[std::get<0>(usable_q)]->set_usable(std::get<1>(usable_q));
logger_->Info() << "algbase的is_usable_:"
<< rule_pointers_[std::get<0>(usable_q)]->get_usable()
<< std::endl;
}
if (!once_exec_queue_.empty()) {
auto[pointer, time_range] = std::move(once_exec_queue_.front());
logger_->Info() << "单次执行算法:" << pointer->get_rule_name() << std::endl;
once_exec_queue_.pop();
auto rule_id = (pointer->get_rule_id());
if (rule_pointers_.find(rule_id) != rule_pointers_.end()) {
logger_->Info() << "task更新算法" << std::endl;
rule_pointers_.erase(rule_id);
} else {
logger_->Info() << "task新增算法" << std::endl;
}
rule_pointers_.emplace(std::make_pair(rule_id, std::move(pointer)));
logger_->Info() << "update_rule_for_task 完成" << std::endl;
time_ranges_[rule_id] = time_range;
}
return 0;
}
void HandlerExec::update_limit_alarm(std::string ruleid, double lb, double ub,
double va, int64_t stime, int64_t etime) {
auto iter = this->rule_pointers_.find(ruleid);
if (iter != this->rule_pointers_.end()) {
iter->second->update_limit_alarm(lb, ub, va, stime, etime);
} else {
logger_->Debug() << "|HandlerExec::update_limit_alarm|" << ruleid
<< " not find" << std::endl;
}
}
/// Extracts the last run of decimal digits in `input` and returns it as int.
///
/// @param input Text to scan.
/// @return The parsed number, or 0 when `input` contains no digits or the
///         digits cannot be converted by std::stoi (e.g. the value does not
///         fit in int).
int HandlerExec::extractCTaskNumberRegexGeneric(std::string input) {
  const std::regex digit_run(R"((\d+))");
  const std::sregex_iterator no_more;
  std::string last_digits;
  // Walk every match and keep overwriting, so the final value is the last run.
  std::sregex_iterator cursor(input.begin(), input.end(), digit_run);
  while (cursor != no_more) {
    last_digits = cursor->str(1);
    ++cursor;
  }
  if (last_digits.empty()) {
    return 0;
  }
  try {
    return std::stoi(last_digits);
  } catch (...) {
    return 0;
  }
}
}