2026-05-09 11:23:45 +08:00
|
|
|
|
#include "mix_cc/exception.h"
|
|
|
|
|
|
#include <eqpalg/threads/handler_exec.h>
|
|
|
|
|
|
#include <regex>
|
|
|
|
|
|
#include <string>
|
|
|
|
|
|
#include <zlib/MemVar.h>
|
|
|
|
|
|
#define MON_CALL_MAX_COST 20ms
|
|
|
|
|
|
extern ProcessType glob_process_type;
|
|
|
|
|
|
namespace threads {
|
|
|
|
|
|
int HandlerExec::instanceCount = 0;
|
|
|
|
|
|
HandlerExec::HandlerExec(const string &alg_name) {
|
|
|
|
|
|
++instanceCount;
|
|
|
|
|
|
this->logger_ = std::make_unique<LOG>(string("HandlerExec") + alg_name);
|
|
|
|
|
|
this->thread_name_ = "alg_" + alg_name;
|
|
|
|
|
|
if (glob_process_type == ProcessType::kTask) {
|
|
|
|
|
|
this->thread_name_ += "_task";
|
|
|
|
|
|
} else if (glob_process_type == ProcessType::kCron) {
|
|
|
|
|
|
this->thread_name_ += "_cron";
|
|
|
|
|
|
}
|
|
|
|
|
|
this->alg_id_ = stoi(alg_name);
|
|
|
|
|
|
logger_->Debug() << "alg_name:" << alg_name << ",alg_id:" << alg_id_ << endl;
|
|
|
|
|
|
is_running_ = true;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
HandlerExec::~HandlerExec() {
|
|
|
|
|
|
instanceCount--;
|
|
|
|
|
|
this->is_running_ = false;
|
|
|
|
|
|
if (r_thread_->joinable()) {
|
|
|
|
|
|
r_thread_->join();
|
|
|
|
|
|
}
|
|
|
|
|
|
logger_->Debug() << "HandlerExec::~HandlerExec()" << endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
std::thread::id HandlerExec::run_thread() {
|
|
|
|
|
|
r_thread_ = std::make_unique<std::thread>([&]() {
|
|
|
|
|
|
int logResult = LOG::doConfigure(thread_name_, THREAD_CONFIG);
|
|
|
|
|
|
|
|
|
|
|
|
int task_seq = HandlerExec::extractCTaskNumberRegexGeneric(thread_name_);
|
|
|
|
|
|
this->logger_->Debug() << "thread_name_:" << thread_name_
|
|
|
|
|
|
<< ",logResult[0-成功;1-配置文件不存在;2-"
|
|
|
|
|
|
"log输出目录不存在;3-"
|
|
|
|
|
|
"相同的doConfigure不能执行多遍;4-其他错误]:"
|
|
|
|
|
|
<< logResult << ",task_seq:" << task_seq << endl;
|
|
|
|
|
|
auto next_wake_time = std::chrono::steady_clock::now();
|
|
|
|
|
|
while (is_running_) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
this->event_handler();
|
|
|
|
|
|
{
|
|
|
|
|
|
if (glob_process_type == ProcessType::kTask) {
|
|
|
|
|
|
for (auto &x : this->rule_pointers_) {
|
|
|
|
|
|
x.second->logReset(task_seq * 10 + x.second->get_data_source());
|
|
|
|
|
|
auto it = time_ranges_.find(x.second->get_rule_id());
|
|
|
|
|
|
if (it != time_ranges_.end()) {
|
|
|
|
|
|
x.second->exec_task_call(time_ranges_[x.second->get_rule_id()]);
|
|
|
|
|
|
time_ranges_.erase(it);
|
|
|
|
|
|
this->logger_->Debug()
|
|
|
|
|
|
<< "task:" << x.second->get_rule_name() << endl;
|
|
|
|
|
|
this->rule_pointers_.erase(x.first);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
if (glob_process_type == ProcessType::kCron) {
|
|
|
|
|
|
for (auto &x : this->rule_pointers_) {
|
|
|
|
|
|
x.second->exec_cron_call();
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
if (glob_process_type == ProcessType::kMon) {
|
|
|
|
|
|
run_t1 = std::chrono::system_clock::now();
|
|
|
|
|
|
auto runs_t1 = std::chrono::steady_clock::now();
|
|
|
|
|
|
for (auto &x : this->rule_pointers_) {
|
|
|
|
|
|
x.second->exec_mon_call();
|
|
|
|
|
|
}
|
|
|
|
|
|
run_t2 = std::chrono::system_clock::now();
|
|
|
|
|
|
auto runs_t2 = std::chrono::steady_clock::now();
|
|
|
|
|
|
if (CMemVar::Const()->is_print_eqpalg_mon_threading_info) {
|
|
|
|
|
|
cost_time = (run_t2 - run_t1).count() / 1000000;
|
|
|
|
|
|
int64_t costs_time =
|
|
|
|
|
|
std::chrono::duration_cast<std::chrono::milliseconds>(
|
|
|
|
|
|
runs_t2 - runs_t1)
|
|
|
|
|
|
.count();
|
|
|
|
|
|
if (cost_time > 50) {
|
|
|
|
|
|
this->logger_->Info()
|
|
|
|
|
|
<< "instanceCount:" << instanceCount
|
|
|
|
|
|
<< ",algID:" << this->alg_id_
|
|
|
|
|
|
<< ",thread_name_:" << thread_name_
|
|
|
|
|
|
<< ",this->rule_pointers_ size:"
|
|
|
|
|
|
<< this->rule_pointers_.size()
|
|
|
|
|
|
<< ",run cost time(system_clock):" << cost_time << " ms"
|
|
|
|
|
|
<< ",run cost time(steady_clock):" << costs_time << " ms"
|
|
|
|
|
|
<< std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
if (glob_process_type == ProcessType::kMon) {
|
|
|
|
|
|
if (std::chrono::milliseconds(cost_time) > MON_CALL_MAX_COST) {
|
|
|
|
|
|
next_wake_time = std::chrono::steady_clock::now();
|
|
|
|
|
|
} else {
|
|
|
|
|
|
next_wake_time += MON_CALL_MAX_COST;
|
|
|
|
|
|
std::this_thread::sleep_until(next_wake_time);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
if (glob_process_type == ProcessType::kCron) {
|
|
|
|
|
|
std::this_thread::sleep_for(1s);
|
|
|
|
|
|
}
|
|
|
|
|
|
if (glob_process_type == ProcessType::kTask) {
|
|
|
|
|
|
std::this_thread::sleep_for(1s);
|
|
|
|
|
|
}
|
|
|
|
|
|
} catch (const std::exception &e) {
|
|
|
|
|
|
this->logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
|
|
|
|
|
|
} catch (...) {
|
|
|
|
|
|
this->logger_->Error() << "未知异常" << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-05-09 13:30:09 +08:00
|
|
|
|
this->rule_pointers_.clear();
|
2026-05-09 11:23:45 +08:00
|
|
|
|
});
|
|
|
|
|
|
return r_thread_->get_id();
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
std::vector<std::string> HandlerExec::get_rule_ids() {
|
|
|
|
|
|
std::vector<std::string> ret;
|
|
|
|
|
|
for (auto &x : rule_pointers_) {
|
|
|
|
|
|
ret.push_back(x.first);
|
|
|
|
|
|
}
|
|
|
|
|
|
return ret;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int HandlerExec::load(std::unique_ptr<AlgBase> &&pointer) {
|
|
|
|
|
|
// 由于实例载入时,执行线程暂未运行,所以无需加锁
|
|
|
|
|
|
rule_pointers_.emplace(
|
|
|
|
|
|
std::make_pair(pointer->get_rule_id(), std::move(pointer)));
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int HandlerExec::attach(std::unique_ptr<AlgBase> &&alg_pointer) {
|
|
|
|
|
|
std::lock_guard<std::mutex> guard(mutex_);
|
|
|
|
|
|
this->attach_queue_.emplace(std::move(alg_pointer));
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int HandlerExec::reset(string ruleId) {
|
|
|
|
|
|
std::lock_guard<std::mutex> guard(mutex_);
|
|
|
|
|
|
this->reset_queue_.emplace(ruleId);
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int HandlerExec::detach(string ruleId) {
|
|
|
|
|
|
std::lock_guard<std::mutex> guard(mutex_);
|
|
|
|
|
|
this->detach_queue_.push(ruleId);
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int HandlerExec::set_usable(string ruleId, bool usable) {
|
|
|
|
|
|
std::lock_guard<std::mutex> guard(mutex_);
|
|
|
|
|
|
this->usable_queue_.push(std::make_tuple(ruleId, usable));
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int HandlerExec::submit(std::unique_ptr<AlgBase> &&instance_ptr,
|
|
|
|
|
|
TimePoint begin_time, TimePoint end_time) {
|
|
|
|
|
|
std::lock_guard<std::mutex> guard(mutex_);
|
|
|
|
|
|
this->once_exec_queue_.emplace(std::make_tuple(
|
|
|
|
|
|
std::move(instance_ptr), mix_cc::time_range_t(begin_time, end_time)));
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int HandlerExec::event_handler() {
|
|
|
|
|
|
std::lock_guard<std::mutex> guard(mutex_);
|
|
|
|
|
|
while (!reset_queue_.empty()) {
|
|
|
|
|
|
auto reset_rule_id = reset_queue_.front();
|
|
|
|
|
|
logger_->Info() << "alg_id:" << this->alg_id_
|
|
|
|
|
|
<< ",重置算法:" << reset_rule_id << std::endl;
|
|
|
|
|
|
reset_queue_.pop();
|
|
|
|
|
|
if (ProcessType::kMon == glob_process_type) {
|
|
|
|
|
|
if (6 == this->alg_id_ || 7 == this->alg_id_) {
|
|
|
|
|
|
rule_pointers_[reset_rule_id]->reset_dev_data();
|
|
|
|
|
|
this->logger_->Debug()
|
|
|
|
|
|
<< "alg_id:" << this->alg_id_ << ",清除历史数据" << std::endl;
|
|
|
|
|
|
} else if (2 == this->alg_id_ || 4 == this->alg_id_ ||
|
|
|
|
|
|
5 == this->alg_id_) {
|
|
|
|
|
|
rule_pointers_[reset_rule_id]->reset_dev_data();
|
|
|
|
|
|
this->logger_->Debug()
|
|
|
|
|
|
<< "alg_id:" << this->alg_id_ << ",清除统计数据" << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
while (!detach_queue_.empty()) {
|
|
|
|
|
|
auto detach_rule_id = detach_queue_.front();
|
|
|
|
|
|
logger_->Info() << "删除算法:" << detach_rule_id << std::endl;
|
|
|
|
|
|
detach_queue_.pop();
|
|
|
|
|
|
rule_pointers_.erase(detach_rule_id);
|
|
|
|
|
|
|
|
|
|
|
|
logger_->Info() << "删除算法完成" << detach_rule_id << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
while (!attach_queue_.empty()) {
|
|
|
|
|
|
auto pointer = std::move(attach_queue_.front());
|
|
|
|
|
|
attach_queue_.pop();
|
|
|
|
|
|
auto rule_id = pointer->get_rule_id();
|
|
|
|
|
|
if (rule_pointers_.find(rule_id) == rule_pointers_.end()) {
|
|
|
|
|
|
logger_->Info() << "添加算法:" << pointer->get_rule_name() << std::endl;
|
|
|
|
|
|
rule_pointers_.emplace(std::make_pair(rule_id, std::move(pointer)));
|
|
|
|
|
|
logger_->Info() << "添加算法完成" << std::endl;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
logger_->Info() << "更新算法:" << pointer->get_rule_name() << std::endl;
|
|
|
|
|
|
rule_pointers_[rule_id] = std::move(pointer);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
while (!usable_queue_.empty()) {
|
|
|
|
|
|
auto usable_q = usable_queue_.front();
|
|
|
|
|
|
usable_queue_.pop();
|
|
|
|
|
|
auto rule_id = std::get<0>(usable_q);
|
|
|
|
|
|
if (std::get<1>(usable_q)) {
|
|
|
|
|
|
rule_pointers_[std::get<0>(usable_q)]->init();
|
|
|
|
|
|
logger_->Info() << "启用算法:" << rule_id << std::endl;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
logger_->Info() << "停用算法:" << rule_id << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
rule_pointers_[std::get<0>(usable_q)]->set_usable(std::get<1>(usable_q));
|
|
|
|
|
|
logger_->Info() << "algbase的is_usable_:"
|
|
|
|
|
|
<< rule_pointers_[std::get<0>(usable_q)]->get_usable()
|
|
|
|
|
|
<< std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (!once_exec_queue_.empty()) {
|
|
|
|
|
|
auto[pointer, time_range] = std::move(once_exec_queue_.front());
|
|
|
|
|
|
logger_->Info() << "单次执行算法:" << pointer->get_rule_name() << std::endl;
|
|
|
|
|
|
once_exec_queue_.pop();
|
|
|
|
|
|
auto rule_id = (pointer->get_rule_id());
|
|
|
|
|
|
|
|
|
|
|
|
if (rule_pointers_.find(rule_id) != rule_pointers_.end()) {
|
|
|
|
|
|
logger_->Info() << "task更新算法" << std::endl;
|
|
|
|
|
|
rule_pointers_.erase(rule_id);
|
|
|
|
|
|
} else {
|
|
|
|
|
|
logger_->Info() << "task新增算法" << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
rule_pointers_.emplace(std::make_pair(rule_id, std::move(pointer)));
|
|
|
|
|
|
logger_->Info() << "update_rule_for_task 完成" << std::endl;
|
|
|
|
|
|
time_ranges_[rule_id] = time_range;
|
|
|
|
|
|
}
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void HandlerExec::update_limit_alarm(std::string ruleid, double lb, double ub,
|
|
|
|
|
|
double va, int64_t stime, int64_t etime) {
|
|
|
|
|
|
auto iter = this->rule_pointers_.find(ruleid);
|
|
|
|
|
|
if (iter != this->rule_pointers_.end()) {
|
|
|
|
|
|
iter->second->update_limit_alarm(lb, ub, va, stime, etime);
|
|
|
|
|
|
} else {
|
|
|
|
|
|
logger_->Debug() << "|HandlerExec::update_limit_alarm|" << ruleid
|
|
|
|
|
|
<< " not find" << std::endl;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int HandlerExec::extractCTaskNumberRegexGeneric(std::string input) {
|
|
|
|
|
|
std::regex pattern(R"((\d+))");
|
|
|
|
|
|
std::sregex_iterator begin(input.begin(), input.end(), pattern);
|
|
|
|
|
|
std::sregex_iterator end;
|
|
|
|
|
|
|
|
|
|
|
|
std::string lastMatch;
|
|
|
|
|
|
for (auto it = begin; it != end; ++it) {
|
|
|
|
|
|
lastMatch = (*it)[1].str();
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (!lastMatch.empty()) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
return std::stoi(lastMatch);
|
|
|
|
|
|
} catch (...) {
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-05-09 13:30:09 +08:00
|
|
|
|
}
|
|
|
|
|
|
|