eis/eqpalg/threads/manager.cc

349 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <eqpalg/threads/manager.h>

#include <stdexcept>

#include <mix_cc/debug.h>
#include <zlib/MemQueue.hpp>
extern ProcessType glob_process_type;
namespace threads {
/**
 * @brief File-local alarm message queue.
 *
 * Read-only from this file's point of view: the code below only calls
 * empty() and pop() on it.
 */
namespace {
/// Maximum length of a single alarm message, in chars.
constexpr int AlarmLen = 4000;
/// Capacity of the alarm message queue (not referenced elsewhere in this file).
constexpr int AlarmMaxSize = 10000;
/// One queue element: a fixed-size character buffer holding one alarm message.
using AlarmJsonData = char[AlarmLen];
// Earlier variant of the queue, bound to the name "2012":
// CMemQueue<AlarmJsonData> alarm_json_data_ = CMemQueue<AlarmJsonData>("2012");
// NOTE(review): "ZONE0" is presumably the queue/zone identifier — confirm
// against the CMemQueue documentation.
CMemQueue<AlarmJsonData> alarm_json_data_("ZONE0");
}  // namespace
/**
 * @brief Construct the manager and its two loggers.
 *
 * The manager starts in the "running, not yet started" state. The alarm
 * handler is only created when the process type is ProcessType::kMon.
 */
Manager::Manager()
    : is_running_(true),
      is_start_(false),
      logger_(std::make_unique<LOG>("ThreadManager")),
      alarm_logger_(std::make_unique<LOG>("alarm_logger")) {
  const bool is_monitor_process = (glob_process_type == ProcessType::kMon);
  if (is_monitor_process) {
    alarm_handler_ptr_ = std::make_unique<AlarmHandler>();
  }
}
/**
 * @brief Stop the background thread (task and mon processes) and log shutdown.
 *
 * Clears is_running_ so the loop started by start() exits, then joins the
 * thread. r_thread_ is only created inside start(), so it may still be null
 * here if the manager is destroyed before start() was ever called.
 */
Manager::~Manager() {
  this->logger_->Info() << "Manager::~Manager()" << endl;
  if (glob_process_type == ProcessType::kTask ||
      glob_process_type == ProcessType::kMon) {
    this->is_running_ = false;
    // Guard against a null r_thread_: start() may never have run.
    if (this->r_thread_ && this->r_thread_->joinable()) {
      this->r_thread_->join();
    }
  }
  this->logger_->Info() << "--------程序关闭--------" << endl;
}
/**
 * @brief Store the configuration of a rule; an existing entry for the same
 *        ruleId is overwritten, otherwise a new entry is added.
 * @param ruleId      key of the rule inside stored_cfg_data_
 * @param alg_id      algorithm id; later passed to build_algorithm() and used
 *                    as the first component of the handler thread name
 * @param name        rule name; later passed to build_algorithm()
 * @param rule_json   rule parameters as JSON; later passed to build_algorithm()
 * @param usable      whether the rule is enabled
 * @param padding_low later passed to build_algorithm() (exact meaning is not
 *                    visible in this file)
 * @param padding_up  later passed to build_algorithm() (exact meaning is not
 *                    visible in this file)
 * @param task_seq    sequence number used as the last component of the handler
 *                    thread name
 * @return 0 on success, -1 if a std::exception was raised while storing
 */
int Manager::storage(const string &ruleId, int alg_id, const string &name,
                     const mix_cc::json &rule_json, bool usable,
                     double padding_low, double padding_up, int task_seq) {
  try {
    auto cfg = make_tuple(alg_id, name, rule_json, usable, padding_low,
                          padding_up, task_seq);
    auto pos = this->stored_cfg_data_.find(ruleId);
    if (pos == this->stored_cfg_data_.end()) {
      // Unknown rule: add a new entry.
      this->stored_cfg_data_.emplace(ruleId, std::move(cfg));
    } else {
      // Known rule: replace the stored configuration.
      pos->second = std::move(cfg);
    }
  } catch (const std::exception &err) {
    logger_->Error() << "Manager::storage:" << err.what() << endl;
    return -1;
  }
  return 0;
}
int Manager::load(const string &ruleId) {
try {
this->build_alg_to_handle(ruleId);
logger_->Info() << "load complete" << endl;
} catch (const std::exception &e) {
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
}
return 0;
}
/**
 * @brief Start the background thread for this process type and, for every
 *        process type other than kTask, build and run all stored algorithms.
 *
 * - ProcessType::kTask: spawns a housekeeping thread that clears handles_
 *   once no handler reports get_is_running().
 * - ProcessType::kMon: spawns a thread that pops alarm messages from the
 *   alarm queue and passes them to the alarm handler.
 * - Every type except kTask: builds a handler for each entry of
 *   stored_cfg_data_ and starts each handler's thread.
 *
 * @return always 0
 */
int Manager::start() {
  int res = 0;
  // Task process: reclaim handler threads that are no longer in use.
  if (glob_process_type == ProcessType::kTask) {
    this->r_thread_ = std::make_unique<std::thread>([this]() {
      LOG::doConfigure("TaskManager", THREAD_CONFIG);
      alarm_logger_.reset(new LOG("TaskManager", AUTO_CATCH_PID));
      while (this->is_running_) {
        {
          // Exclusive lock: handles_ may be cleared below, and exec_task()
          // inserts into handles_ under the same mutex. The lock is scoped so
          // that it is released before sleeping; otherwise exec_task() could
          // only acquire it in the tiny gap between two iterations.
          std::unique_lock write_lock(handles_mutex);
          if (!this->handles_.empty()) {
            bool all_idle = true;
            for (auto &x : this->handles_) {
              if (x.second->get_is_running()) {
                all_idle = false;
                break;
              }
            }
            if (all_idle) {
              this->handles_.clear();
              this->alarm_logger_->Debug()
                  << "task this->handles_.clear()!" << std::endl;
            }
          }
        }
        this_thread::sleep_for(10ms);
      }
    });
  }
  /*------- cron and mon -------*/
  else {
    if (glob_process_type == ProcessType::kMon) {
      this->r_thread_ = std::make_unique<std::thread>([this]() {
        LOG::doConfigure("alarmInfo", THREAD_CONFIG);
        alarm_logger_.reset(new LOG("alarmInfo", AUTO_CATCH_PID));
        while (this->is_running_) {
          // NOTE(review): at most one alarm is consumed per 500ms cycle —
          // confirm this throughput is intended.
          if (!alarm_json_data_.empty()) {
            auto alarm_now = alarm_json_data_.pop();
            alarm_handler_ptr_->store_alarm(*alarm_now);
            this->alarm_logger_->Debug()
                << "CMemQueue<AlarmJsonData>:" << *alarm_now << endl;
          }
          this_thread::sleep_for(500ms);
        }
      });
    }
    // Build an algorithm instance (and its handler) for every stored rule.
    for (auto &x : this->stored_cfg_data_) {
      build_alg_to_handle(x.first);
    }
    logger_->Debug() << "stored_cfg_data size:" << stored_cfg_data_.size()
                     << endl;
    for (auto &x : this->handles_) {
      // Run the concrete algorithm thread and remember its id with its name.
      tid_2_thread_name_.push_back(
          std::make_tuple(x.second->run_thread(), x.first));
    }
  }
  this->is_start_ = true;  ///< Marks that the algorithm threads were started.
  thread_num_ = this->handles_.size();
  return res;
}
int Manager::attach(const string &ruleId) {
try {
// 大致逻辑和attach相同只有一点不同
auto iter = this->build_alg_to_handle(ruleId);
logger_->Info() << "attach complete" << endl;
} catch (const std::exception &e) {
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
}
return 0;
}
/**
 * @brief Reset the algorithm instance of a rule inside its handler.
 *
 * Does nothing (apart from logging) when the rule has no handler mapping or
 * the mapped handler no longer exists. Any std::exception is logged and
 * swallowed.
 *
 * @param ruleId rule whose algorithm instance should be reset
 * @return always 0
 */
int Manager::reset(const string &ruleId) {
  try {
    // Look up the handler thread name for this rule. find() is used instead
    // of operator[] so that an unknown rule does not leave an empty mapping
    // behind, which would block the insert() in build_alg_to_handle() later.
    const auto mapping_iter = rule_alg_id_mapping_.find(ruleId);
    if (mapping_iter != rule_alg_id_mapping_.end()) {
      const auto handle_iter = handles_.find(mapping_iter->second);
      // Only reset when the handler actually exists.
      if (handle_iter != handles_.end()) {
        handle_iter->second->reset(ruleId);
      }
    }
    logger_->Debug() << "reset instance complete" << endl;
  } catch (const std::exception &e) {
    logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
  }
  return 0;
}
/**
 * @brief Detach the algorithm instance of a rule from its handler.
 *
 * When the handler has no instances left afterwards, the whole handler is
 * destroyed via detach_full_alg(). The rule -> handler mapping for ruleId is
 * removed. Any std::exception is logged and swallowed.
 *
 * @param ruleId rule whose algorithm instance should be detached
 * @return always 0
 */
int Manager::detach(const string &ruleId) {
  // std::lock_guard<std::mutex> guard(r_mutex_);  // modifying operation, 2022-1-18
  try {
    // Look up the handler thread name for this rule; find() avoids inserting
    // an empty mapping for an unknown rule.
    const auto mapping_iter = rule_alg_id_mapping_.find(ruleId);
    if (mapping_iter != rule_alg_id_mapping_.end()) {
      // Copy the name: the mapping entry it lives in is erased below.
      const auto alg_thread_name = mapping_iter->second;
      auto iter = handles_.find(alg_thread_name);
      if (iter != handles_.end()) {
        // Detach the algorithm instance of this rule.
        iter->second->detach(ruleId);
        if (iter->second->size() == 0) {
          // No instances left in this handler: end its thread.
          this->detach_full_alg(alg_thread_name);
        }
      }
      // Remove the rule -> handler mapping. The map is keyed by ruleId, so the
      // entry must be erased by rule (not by thread name).
      rule_alg_id_mapping_.erase(mapping_iter);
    }
    logger_->Debug() << "detach instance complete" << endl;
  } catch (const std::exception &e) {
    logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
  }
  return 0;
}
int Manager::exec_task(std::string ruleId, TimePoint time_start,
TimePoint time_end) {
auto[alg_id, rule_name, param_info, usable, padding_low, padding_up,
task_seq] = stored_cfg_data_.find(ruleId)->second;
auto ptr = build_algorithm(alg_id, ruleId, rule_name, param_info, padding_low,
padding_up);
ptr->init();
/*线程号算法号_数据源_线程序号*/
auto alg_thread_name = std::to_string(alg_id) + "_" +
std::to_string(ptr->get_data_source()) + "_" +
std::to_string(task_seq); ///<添加 task_seq
logger_->Debug() << "线程id:" << alg_thread_name << endl;
// 如果找不到对应算法的线程,则新建线程
std::unique_lock write_lock(handles_mutex);
if (handles_.find(alg_thread_name) == handles_.end()) {
logger_->Debug() << "线程id:" << alg_thread_name << " not found,to create!"
<< endl;
auto exec_handler = std::make_unique<HandlerExec>(alg_thread_name);
tid_2_thread_name_.push_back(
std::make_tuple(exec_handler->run_thread(), alg_thread_name));
handles_.insert(make_pair(alg_thread_name, std::move(exec_handler)));
}
handles_[alg_thread_name]->submit(std::move(ptr), time_start, time_end);
logger_->Info() << "exec submit done" << endl;
// logger_->Info() << "handles_.clear()" << endl;
return 0;
}
/**
 * @brief Enable or disable the algorithm instance of a rule and record the new
 *        state in the stored configuration.
 *
 * The running instance (if any) is updated through its handler. The stored
 * configuration is then rewritten with the new usable flag; the original
 * comment gives the reason as keeping the flag consistent across a stop and
 * restart.
 *
 * @param ruleId rule to enable or disable
 * @param usable new enabled state
 * @return always 0
 */
int Manager::enable(std::string ruleId, bool usable) {
  // std::lock_guard<std::mutex> guard(r_mutex_);  // modifying operation, 2022-1-18
  // Find the handler of this rule and switch the instance on/off.
  auto alg_id_iter = rule_alg_id_mapping_.find(ruleId);
  if (alg_id_iter != rule_alg_id_mapping_.end()) {
    auto handle_iter = this->handles_.find(alg_id_iter->second);
    if (handle_iter != this->handles_.end()) {
      handle_iter->second->set_usable(ruleId, usable);
    }
  }
  // Maintain stored_cfg_data_ so the usable flag stays consistent.
  try {
    // 1. Look up the stored parameters; an unknown rule yields end(), which
    //    must not be dereferenced.
    const auto cfg_iter = stored_cfg_data_.find(ruleId);
    if (cfg_iter == stored_cfg_data_.end()) {
      logger_->Error() << ruleId << ":维护stored_cfg_data_失败" << endl;
    } else {
      auto [alg_id, name, rule_json, usable_source, padding_low, padding_up,
            task_seq] = cfg_iter->second;
      // 2. Write them back through storage() with the new usable flag.
      storage(ruleId, alg_id, name, rule_json, usable, padding_low, padding_up,
              task_seq);
    }
  } catch (...) {
    logger_->Error() << ruleId << ":维护stored_cfg_data_失败" << endl;
  }
  logger_->Info() << "enable done" << endl;
  return 0;
}
int Manager::delete_instance(std::string ruleId) {
// 先停止对应的算法
this->detach(ruleId);
// 再将存储的算法配置信息删除
this->stored_cfg_data_.erase(ruleId);
return 0;
}
int Manager::build_alg_to_handle(const string &ruleId) {
// 构建算法
auto[algId, name, rule_json, usable, padding_low, padding_up, task_seq] =
stored_cfg_data_.find(ruleId)->second;
int data_source = 0;
if (rule_json.contains("datasource")) {
data_source =
std::stoi(rule_json.at("datasource").at("value").get<std::string>());
this->logger_->Debug() << "ruleid:" << ruleId
<< ",data_source:" << data_source << std::endl;
}
auto ptr =
build_algorithm(algId, ruleId, name, rule_json, padding_low, padding_up);
this->logger_->Debug() << "name:" << name << ",build_alg_to_handle --test--1"
<< std::endl;
if (usable) {
ptr->init();
} else {
ptr->AlgBase::init(); ///<提供页面数据(items)
}
ptr->set_usable(usable);
/*线程号算法号_数据源_线程序号*/
auto alg_thread_name = std::to_string(algId) + "_" +
std::to_string(data_source) + "_" +
std::to_string(task_seq); ///<添加task_seq
// 查找
auto iter = handles_.find(alg_thread_name);
if (iter == handles_.end()) {
// 如果找不到对应的算法句柄,新建
auto exec_handler = std::make_unique<HandlerExec>(alg_thread_name);
exec_handler->attach(std::move(ptr));
if (this->is_start_) {
tid_2_thread_name_.push_back(
std::make_tuple(exec_handler->run_thread(),
alg_thread_name)); //运行具体的算法线程
this->logger_->Debug() << "开启新线程:" << alg_thread_name << std::endl;
}
handles_.emplace(make_pair(alg_thread_name, std::move(exec_handler)));
} else {
// 否则向已有的算法句柄中插入
iter->second->attach(std::move(ptr));
}
// 将算法句柄的对应关系插入到对应关系map中
rule_alg_id_mapping_.insert(make_pair(ruleId, alg_thread_name));
// }
thread_num_ = this->handles_.size();
return 0;
}
int Manager::detach_full_alg(std::string alg_thread_name) {
try {
auto iter = handles_.find(alg_thread_name);
// 如果存在该线程
if (iter != handles_.end()) {
// 调用取消
iter->second->destroy();
// 设置active为false
handles_.erase(alg_thread_name);
}
logger_->Debug() << "detach full alg complete" << endl;
} catch (const std::exception &e) {
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
}
thread_num_ = this->handles_.size();
return 0;
}
/**
 * @brief Forward a limit-alarm update to the handler that serves a rule.
 *
 * Logs a debug message and returns when the rule has no handler mapping;
 * returns silently when the mapped handler does not exist.
 *
 * @param ruleid rule the update belongs to
 * @param lb     forwarded unchanged to the handler
 * @param ub     forwarded unchanged to the handler
 * @param va     forwarded unchanged to the handler
 * @param stime  forwarded unchanged to the handler
 * @param etime  forwarded unchanged to the handler
 */
void Manager::update_limit_alarm(std::string ruleid, double lb, double ub,
                                 double va, int64_t stime, int64_t etime) {
  const auto mapping_iter = rule_alg_id_mapping_.find(ruleid);
  if (mapping_iter == rule_alg_id_mapping_.end()) {
    logger_->Debug() << "|Manager::update_limit_alarm|" << ruleid << " not find"
                     << std::endl;
    return;
  }
  // The rule is mapped to a handler thread; look that handler up.
  const auto handle_iter = this->handles_.find(mapping_iter->second);
  if (handle_iter == this->handles_.end()) {
    return;
  }
  handle_iter->second->update_limit_alarm(ruleid, lb, ub, va, stime, etime);
}
/**
 * @brief Return the cached handler count (thread_num_), which is refreshed
 *        from handles_.size() whenever handlers are built, started or removed.
 */
int Manager::get_thread_size() {
  return thread_num_;
}
} // namespace threads