eis/eqpalg/threads/handler_exec.cc

358 lines
13 KiB
C++
Raw Normal View History

// #include <eqpalg/gb_item_memory.h>
#include "mix_cc/exception.h"
#include <eqpalg/threads/handler_exec.h>
#include <regex>
#include <string>
#include <zlib/MemVar.h>
#define MON_CALL_MAX_COST 20ms
extern ProcessType glob_process_type;
// const std::vector<int> do_not_stop_alg = {1111, 1110}; ///<不停机算法号
namespace threads {
// 在类外定义并初始化静态成员变量(必须)
int HandlerExec::instanceCount = 0;
HandlerExec::HandlerExec(const string &alg_name) {
++instanceCount;
// 创建logger
this->logger_ = std::make_unique<LOG>(string("HandlerExec") + alg_name);
this->thread_name_ = "alg_" + alg_name;
// 如果是单次执行,
if (glob_process_type == ProcessType::kTask) {
this->thread_name_ += "_task";
} else if (glob_process_type == ProcessType::kCron) {
this->thread_name_ += "_cron";
}
this->alg_id_ = stoi(alg_name);
logger_->Debug() << "alg_name:" << alg_name << ",alg_id:" << alg_id_ << endl;
is_running_ = true;
// is_line_stop_ = true; ///< 初始化机组状态为停机状态
// if (this->alg_id_ > 1110) {
// is_line_stop_ = false; ///< 初始化机组状态为开机机状态
// }
}
HandlerExec::~HandlerExec() {
instanceCount--;
this->is_running_ = false;
if (r_thread_->joinable()) {
r_thread_->join();
}
logger_->Debug() << "HandlerExec::~HandlerExec()" << endl;
// if (glob_process_type == ProcessType::kCron) {
// if (this->alg_id_ == 221 || this->alg_id_ == 220 || this->alg_id_ == 251
// ||
// this->alg_id_ == 250 || this->alg_id_ == 331 || this->alg_id_ == 330)
// {
// if (r_thread_->joinable()) {
// r_thread_->join();
// }
// }
// } else {
// if (r_thread_->joinable()) {
// r_thread_->join();
// }
// }
}
std::thread::id HandlerExec::run_thread() {
r_thread_ = std::make_unique<std::thread>([&]() {
int logResult = LOG::doConfigure(thread_name_, THREAD_CONFIG);
int task_seq = HandlerExec::extractCTaskNumberRegexGeneric(thread_name_);
this->logger_->Debug() << "thread_name_:" << thread_name_
<< ",logResult[0-成功1-配置文件不存在2-"
"log输出目录不存在3-"
"相同的doConfigure不能执行多遍4-其他错误]:"
<< logResult << ",task_seq:" << task_seq << endl;
auto next_wake_time = std::chrono::steady_clock::now();
while (is_running_) {
try {
this->event_handler();
{
// task 进程
if (glob_process_type == ProcessType::kTask) {
for (auto &x : this->rule_pointers_) {
x.second->logReset(task_seq * 10 + x.second->get_data_source());
auto it = time_ranges_.find(x.second->get_rule_id());
if (it != time_ranges_.end()) {
x.second->exec_task_call(time_ranges_[x.second->get_rule_id()]);
time_ranges_.erase(it);
this->logger_->Debug()
<< "task:" << x.second->get_rule_name() << endl;
this->rule_pointers_.erase(x.first);
}
}
// this->is_running_ = false;
}
if (glob_process_type == ProcessType::kCron) {
for (auto &x : this->rule_pointers_) {
x.second->exec_cron_call();
}
}
if (glob_process_type == ProcessType::kMon) {
run_t1 = std::chrono::system_clock::now();
auto runs_t1 = std::chrono::steady_clock::now();
for (auto &x : this->rule_pointers_) {
x.second->exec_mon_call();
}
run_t2 = std::chrono::system_clock::now();
auto runs_t2 = std::chrono::steady_clock::now();
if (CMemVar::Const()->is_print_eqpalg_mon_threading_info) {
cost_time = (run_t2 - run_t1).count() / 1000000;
int64_t costs_time =
std::chrono::duration_cast<std::chrono::milliseconds>(
runs_t2 - runs_t1)
.count();
if (cost_time > 50) {
this->logger_->Info()
<< "instanceCount:" << instanceCount
<< ",algID:" << this->alg_id_
<< "thread_name_:" << thread_name_
<< ",this->rule_pointers_ size:"
<< this->rule_pointers_.size()
<< "run cost time(system_clock):" << cost_time << " ms"
<< ",run cost time(steady_clock):" << costs_time << " ms"
<< std::endl;
}
}
}
}
if (glob_process_type == ProcessType::kMon) {
// std::this_thread::sleep_for(1ms);//
// 极短的sleep兼顾响应性和CPU占用
if (std::chrono::milliseconds(cost_time) > MON_CALL_MAX_COST) {
next_wake_time = std::chrono::steady_clock::now();
} else {
next_wake_time += MON_CALL_MAX_COST;
std::this_thread::sleep_until(next_wake_time);
}
}
if (glob_process_type == ProcessType::kCron) {
std::this_thread::sleep_for(1s);
}
if (glob_process_type == ProcessType::kTask) {
std::this_thread::sleep_for(1s);
}
} catch (const std::exception &e) {
this->logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
} catch (...) {
this->logger_->Error() << "未知异常" << std::endl;
}
}
this->rule_pointers_.clear(); // is_running_ = false 跳出循环,准备析构
});
return r_thread_->get_id();
}
// bool HandlerExec::look_is_cron_run() { return this->is_cron_run_; }
// void HandlerExec::set_is_cron_run(bool is_cron_run) {
// this->is_cron_run_ = is_cron_run;
// }
// bool HandlerExec::look_is_cron_run() {
// bool all_is_run = true;
// if (!this->rule_pointers_.empty()) {
// for (auto& x : this->rule_pointers_) {
// all_is_run = all_is_run && (x.second->get_is_cron_run());
// }
// return all_is_run;
// } else {
// return true;
// }
// }
// bool HandlerExec::look_is_pause() { return this->is_line_stop_; }
// int HandlerExec::pause() {
// // if (std::find(do_not_stop_alg.cbegin(), do_not_stop_alg.cend(),
// // this->alg_id_) == do_not_stop_alg.cend()) {
// // this->is_line_stop_ = true;
// // }
// this->is_line_stop_ = true;
// return 0;
// }
// int HandlerExec::unpause() {
// // 重置上次报警时间 屏蔽开机前几分钟的报警
// std::lock_guard<std::mutex> guard(mutex_);
// if (!this->rule_pointers_.empty() && this->is_line_stop_) {
// for (auto& x : this->rule_pointers_) {
// x.second->set_last_alarm_time(system_clock::now());
// }
// }
// this->is_line_stop_ = false;
// return 0;
// }
std::vector<std::string> HandlerExec::get_rule_ids() {
std::vector<std::string> ret;
for (auto &x : rule_pointers_) {
ret.push_back(x.first);
}
return ret;
}
int HandlerExec::load(std::unique_ptr<AlgBase> &&pointer) {
// 由于实例载入时,执行线程暂未运行,所以无需加锁
rule_pointers_.emplace(
std::make_pair(pointer->get_rule_id(), std::move(pointer)));
return 0;
}
int HandlerExec::attach(std::unique_ptr<AlgBase> &&alg_pointer) {
std::lock_guard<std::mutex> guard(mutex_);
this->attach_queue_.emplace(std::move(alg_pointer));
return 0;
}
int HandlerExec::reset(string ruleId) {
std::lock_guard<std::mutex> guard(mutex_);
this->reset_queue_.emplace(ruleId);
return 0;
}
int HandlerExec::detach(string ruleId) {
std::lock_guard<std::mutex> guard(mutex_);
this->detach_queue_.push(ruleId);
return 0;
}
int HandlerExec::set_usable(string ruleId, bool usable) {
std::lock_guard<std::mutex> guard(mutex_);
this->usable_queue_.push(std::make_tuple(ruleId, usable));
return 0;
}
int HandlerExec::submit(std::unique_ptr<AlgBase> &&instance_ptr,
TimePoint begin_time, TimePoint end_time) {
std::lock_guard<std::mutex> guard(mutex_);
this->once_exec_queue_.emplace(std::make_tuple(
std::move(instance_ptr), mix_cc::time_range_t(begin_time, end_time)));
return 0;
}
int HandlerExec::event_handler() {
std::lock_guard<std::mutex> guard(mutex_);
while (!reset_queue_.empty()) {
auto reset_rule_id = reset_queue_.front();
logger_->Info() << "alg_id:" << this->alg_id_
<< ",重置算法:" << reset_rule_id << std::endl;
reset_queue_.pop();
if (ProcessType::kMon == glob_process_type) {
if (6 == this->alg_id_ || 7 == this->alg_id_) {
rule_pointers_[reset_rule_id]->reset_dev_data();
this->logger_->Debug()
<< "alg_id:" << this->alg_id_ << ",清除历史数据" << std::endl;
} else if (2 == this->alg_id_ || 4 == this->alg_id_ ||
5 == this->alg_id_) {
rule_pointers_[reset_rule_id]->reset_dev_data();
this->logger_->Debug()
<< "alg_id:" << this->alg_id_ << ",清除统计数据" << std::endl;
}
}
}
while (!detach_queue_.empty()) {
auto detach_rule_id = detach_queue_.front();
logger_->Info() << "删除算法:" << detach_rule_id << std::endl;
detach_queue_.pop();
rule_pointers_.erase(detach_rule_id);
// 删除统计的样本rs数字特征包括均值方差标准差等--已弃用
// try {
// string ss0 = "rm /users/dsc/stat_data/" + detach_rule_id + "_0.dat";
// system(ss0.c_str());
// string ss1 = "rm /users/dsc/stat_data/" + detach_rule_id + "_1.dat";
// system(ss1.c_str());
// string ss2 = "rm /users/dsc/stat_data/" + detach_rule_id + "_2.dat";
// system(ss2.c_str());
// } catch (...) {
// this->logger_->Debug() << "system(ss.c_str()) 失败!" << std::endl;
// }
logger_->Info() << "删除算法完成" << detach_rule_id << std::endl;
}
while (!attach_queue_.empty()) {
// if (attach_queue_.front()->get_usable()) {
// attach_queue_.front()->init();
// }
auto pointer = std::move(attach_queue_.front());
attach_queue_.pop();
auto rule_id = pointer->get_rule_id();
if (rule_pointers_.find(rule_id) == rule_pointers_.end()) {
logger_->Info() << "添加算法:" << pointer->get_rule_name() << std::endl;
rule_pointers_.emplace(std::make_pair(rule_id, std::move(pointer)));
logger_->Info() << "添加算法完成" << std::endl;
} else {
// rule_pointers_.erase(rule_id);
logger_->Info() << "更新算法:" << pointer->get_rule_name() << std::endl;
rule_pointers_[rule_id] = std::move(pointer);
}
}
while (!usable_queue_.empty()) {
auto usable_q = usable_queue_.front();
usable_queue_.pop();
auto rule_id = std::get<0>(usable_q);
if (std::get<1>(usable_q)) {
rule_pointers_[std::get<0>(usable_q)]->init();
logger_->Info() << "启用算法:" << rule_id << std::endl;
} else {
logger_->Info() << "停用算法:" << rule_id << std::endl;
}
// logger_->Info() << "启用算法:" << rule_id << std::endl;
rule_pointers_[std::get<0>(usable_q)]->set_usable(std::get<1>(usable_q));
logger_->Info() << "algbase的is_usable_:"
<< rule_pointers_[std::get<0>(usable_q)]->get_usable()
<< std::endl;
}
if (!once_exec_queue_.empty()) {
auto[pointer, time_range] = std::move(once_exec_queue_.front());
logger_->Info() << "单次执行算法:" << pointer->get_rule_name() << std::endl;
once_exec_queue_.pop();
auto rule_id = (pointer->get_rule_id());
if (rule_pointers_.find(rule_id) != rule_pointers_.end()) {
logger_->Info() << "task更新算法" << std::endl;
rule_pointers_.erase(rule_id);
} else {
logger_->Info() << "task新增算法" << std::endl;
}
rule_pointers_.emplace(std::make_pair(rule_id, std::move(pointer)));
logger_->Info() << "update_rule_for_task 完成" << std::endl;
time_ranges_[rule_id] = time_range;
}
return 0;
}
void HandlerExec::update_limit_alarm(std::string ruleid, double lb, double ub,
double va, int64_t stime, int64_t etime) {
auto iter = this->rule_pointers_.find(ruleid);
if (iter != this->rule_pointers_.end()) {
iter->second->update_limit_alarm(lb, ub, va, stime, etime);
} else {
logger_->Debug() << "|HandlerExec::update_limit_alarm|" << ruleid
<< " not find" << std::endl;
}
}
int HandlerExec::extractCTaskNumberRegexGeneric(std::string input) {
// 匹配任意位置的数字部分,但通过贪婪匹配获取最后一个
std::regex pattern(R"((\d+))");
std::sregex_iterator begin(input.begin(), input.end(), pattern);
std::sregex_iterator end;
std::string lastMatch;
for (auto it = begin; it != end; ++it) {
lastMatch = (*it)[1].str();
}
if (!lastMatch.empty()) {
try {
return std::stoi(lastMatch);
} catch (...) {
return 0;
}
}
return 0;
}
} // namespace threads