eis/eqpalg/threads/manager.cc
2026-05-09 13:32:38 +08:00

325 lines
11 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <eqpalg/threads/manager.h>
#include <mix_cc/debug.h>
#include <zlib/MemQueue.hpp>
extern ProcessType glob_process_type;
namespace threads {
/**
* @brief 报警信息队列
* 只读
*/
namespace {
const int AlarmLen = 4000;
const int AlarmMaxSize = 10000;
typedef char AlarmJsonData[AlarmLen];
CMemQueue<AlarmJsonData> alarm_json_data_ = CMemQueue<AlarmJsonData>("ZONE0");
}
Manager::Manager()
: is_running_(true), is_start_(false),
logger_(std::make_unique<LOG>("ThreadManager")),
alarm_logger_(std::make_unique<LOG>("alarm_logger")) {
if (glob_process_type == ProcessType::kMon) {
alarm_handler_ptr_ = std::make_unique<AlarmHandler>();
}
}
Manager::~Manager() {
this->logger_->Info() << "Manager::~Manager()" << endl;
if (glob_process_type == ProcessType::kTask ||
glob_process_type == ProcessType::kMon) {
this->is_running_ = false;
if (this->r_thread_->joinable()) {
this->r_thread_->join();
}
}
this->logger_->Info() << "--------程序关闭--------" << endl;
}
/**
* @brief 如果ruleid不存在就添加否则就update
* @param ruleId My Param doc
* @param alg_id My Param doc
* @param name My Param doc
* @param rule_json My Param doc
* @param usable My Param doc
* @param padding_low My Param doc
* @param padding_up My Param doc
* @return int
*/
int Manager::storage(const string &ruleId, int alg_id, const string &name,
const mix_cc::json &rule_json, bool usable,
double padding_low, double padding_up, int task_seq) {
try {
if (this->stored_cfg_data_.contains(ruleId)) {
this->stored_cfg_data_[ruleId] = make_tuple(
alg_id, name, rule_json, usable, padding_low, padding_up, task_seq);
} else {
this->stored_cfg_data_.emplace(
make_pair(ruleId, make_tuple(alg_id, name, rule_json, usable,
padding_low, padding_up, task_seq)));
}
} catch (const std::exception &e) {
logger_->Error() << "Manager::storage:" << e.what() << endl;
return -1;
}
return 0;
}
int Manager::load(const string &ruleId) {
try {
this->build_alg_to_handle(ruleId);
logger_->Info() << "load complete" << endl;
} catch (const std::exception &e) {
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
}
return 0;
}
int Manager::start() {
int res = 0;
// task 训练 析构不用的线程
if (glob_process_type == ProcessType::kTask) {
this->r_thread_ = std::make_unique<std::thread>([&]() {
LOG::doConfigure("TaskManager", THREAD_CONFIG);
alarm_logger_.reset(new LOG("TaskManager", AUTO_CATCH_PID));
while (this->is_running_) {
std::shared_lock read_lock(handles_mutex);
if (!this->handles_.empty()) {
bool test_label = true;
for (auto &x : this->handles_) {
test_label = test_label && !(x.second->get_is_running());
}
if (test_label) {
this->handles_.clear();
this->alarm_logger_->Debug()
<< "task this->handles_.clear()!" << std::endl;
}
}
this_thread::sleep_for(10ms);
}
});
}
/*-------cron和mon-------*/
else {
if (glob_process_type == ProcessType::kMon) {
this->r_thread_ = std::make_unique<std::thread>([&]() {
LOG::doConfigure("alarmInfo", THREAD_CONFIG);
alarm_logger_.reset(new LOG("alarmInfo", AUTO_CATCH_PID));
while (this->is_running_) {
if (!alarm_json_data_.empty()) {
auto alarm_now = alarm_json_data_.pop();
alarm_handler_ptr_->store_alarm(*alarm_now);
this->alarm_logger_->Debug()
<< "CMemQueue<AlarmJsonData>:" << *alarm_now << endl;
}
this_thread::sleep_for(500ms);
}
});
}
for (auto &x : this->stored_cfg_data_) {
auto[algId, name, rule_json, usable, padding_low, padding_up, task_seq] =
stored_cfg_data_.find(x.first)->second;
build_alg_to_handle(x.first);
}
logger_->Debug() << "stored_cfg_data size:" << stored_cfg_data_.size()
<< endl;
for (auto &x : this->handles_) {
tid_2_thread_name_.push_back(
std::make_tuple(x.second->run_thread(),
x.first));
}
}
this->is_start_ = true;
thread_num_ = this->handles_.size();
return res;
}
int Manager::attach(const string &ruleId) {
try {
// 大致逻辑和attach相同只有一点不同
auto iter = this->build_alg_to_handle(ruleId);
logger_->Info() << "attach complete" << endl;
} catch (const std::exception &e) {
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
}
return 0;
}
int Manager::reset(const string &ruleId) {
try {
// 找到算法id对应的线程名
auto alg_thread_name = rule_alg_id_mapping_[ruleId];
auto iter = handles_.find(alg_thread_name);
// 如果找的到对应的算法
if (iter != handles_.end()) {
// detach对应的算法
iter->second->reset(ruleId);
}
logger_->Debug() << "reset instance complete" << endl;
} catch (const std::exception &e) {
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
}
return 0;
}
int Manager::detach(const string &ruleId) {
try {
auto alg_thread_name = rule_alg_id_mapping_[ruleId];
auto iter = handles_.find(alg_thread_name);
if (iter != handles_.end()) {
iter->second->detach(ruleId);
if (iter->second->size() == 0) {
this->detach_full_alg(alg_thread_name);
}
}
rule_alg_id_mapping_.erase(alg_thread_name);
logger_->Debug() << "detach instance complete" << endl;
} catch (const std::exception &e) {
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
}
return 0;
}
int Manager::exec_task(std::string ruleId, TimePoint time_start,
TimePoint time_end) {
auto[alg_id, rule_name, param_info, usable, padding_low, padding_up,
task_seq] = stored_cfg_data_.find(ruleId)->second;
auto ptr = build_algorithm(alg_id, ruleId, rule_name, param_info, padding_low,
padding_up);
ptr->init();
auto alg_thread_name = std::to_string(alg_id) + "_" +
std::to_string(ptr->get_data_source()) + "_" +
std::to_string(task_seq);
logger_->Debug() << "线程id:" << alg_thread_name << endl;
std::unique_lock write_lock(handles_mutex);
if (handles_.find(alg_thread_name) == handles_.end()) {
logger_->Debug() << "线程id:" << alg_thread_name << " not found,to create!"
<< endl;
auto exec_handler = std::make_unique<HandlerExec>(alg_thread_name);
tid_2_thread_name_.push_back(
std::make_tuple(exec_handler->run_thread(), alg_thread_name));
handles_.insert(make_pair(alg_thread_name, std::move(exec_handler)));
}
handles_[alg_thread_name]->submit(std::move(ptr), time_start, time_end);
logger_->Info() << "exec submit done" << endl;
return 0;
}
int Manager::enable(std::string ruleId, bool usable) {
auto alg_id_iter = rule_alg_id_mapping_.find(ruleId);
if (alg_id_iter != rule_alg_id_mapping_.end()) {
auto handle_iter = this->handles_.find(alg_id_iter->second);
if (handle_iter != this->handles_.end()) {
handle_iter->second->set_usable(ruleId, usable);
}
}
// 维护 stored_cfg_data_ 保证停机开机后usable_保持一致
try {
// 1.查询参数
auto[alg_id, name, rule_json, usable_source, padding_low, padding_up,
task_seq] = stored_cfg_data_.find(ruleId)->second;
// 2.调用storage 没有就添加,存在就修改
storage(ruleId, alg_id, name, rule_json, usable, padding_low, padding_up,
task_seq);
} catch (...) {
logger_->Error() << ruleId << ":维护stored_cfg_data_失败" << endl;
}
logger_->Info() << "enable done" << endl;
return 0;
}
int Manager::delete_instance(std::string ruleId) {
// 先停止对应的算法
this->detach(ruleId);
// 再将存储的算法配置信息删除
this->stored_cfg_data_.erase(ruleId);
return 0;
}
int Manager::build_alg_to_handle(const string &ruleId) {
auto[algId, name, rule_json, usable, padding_low, padding_up, task_seq] =
stored_cfg_data_.find(ruleId)->second;
int data_source = 0;
if (rule_json.contains("datasource")) {
data_source =
std::stoi(rule_json.at("datasource").at("value").get<std::string>());
this->logger_->Debug() << "ruleid:" << ruleId
<< ",data_source:" << data_source << std::endl;
}
auto ptr =
build_algorithm(algId, ruleId, name, rule_json, padding_low, padding_up);
this->logger_->Debug() << "name:" << name << ",build_alg_to_handle --test--1"
<< std::endl;
if (usable) {
ptr->init();
} else {
ptr->AlgBase::init();
}
ptr->set_usable(usable);
auto alg_thread_name = std::to_string(algId) + "_" +
std::to_string(data_source) + "_" +
std::to_string(task_seq);
auto iter = handles_.find(alg_thread_name);
if (iter == handles_.end()) {
auto exec_handler = std::make_unique<HandlerExec>(alg_thread_name);
exec_handler->attach(std::move(ptr));
if (this->is_start_) {
tid_2_thread_name_.push_back(
std::make_tuple(exec_handler->run_thread(),
alg_thread_name));
this->logger_->Debug() << "开启新线程:" << alg_thread_name << std::endl;
}
handles_.emplace(make_pair(alg_thread_name, std::move(exec_handler)));
} else {
iter->second->attach(std::move(ptr));
}
rule_alg_id_mapping_.insert(make_pair(ruleId, alg_thread_name));
thread_num_ = this->handles_.size();
return 0;
}
int Manager::detach_full_alg(std::string alg_thread_name) {
try {
auto iter = handles_.find(alg_thread_name);
if (iter != handles_.end()) {
iter->second->destroy();
handles_.erase(alg_thread_name);
}
logger_->Debug() << "detach full alg complete" << endl;
} catch (const std::exception &e) {
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
}
thread_num_ = this->handles_.size();
return 0;
}
void Manager::update_limit_alarm(std::string ruleid, double lb, double ub,
double va, int64_t stime, int64_t etime) {
auto alg_id_iter = rule_alg_id_mapping_.find(ruleid);
if (alg_id_iter == rule_alg_id_mapping_.end()) {
logger_->Debug() << "|Manager::update_limit_alarm|" << ruleid << " not find"
<< std::endl;
}
// 如果找得到对应的算法线程,在线程中寻找对应的实例
if (alg_id_iter != rule_alg_id_mapping_.end()) {
auto handle_iter = this->handles_.find(alg_id_iter->second);
// 如果找得到对应的实例
if (handle_iter != this->handles_.end()) {
handle_iter->second->update_limit_alarm(ruleid, lb, ub, va, stime, etime);
}
}
}
int Manager::get_thread_size() { return thread_num_; }
}