Cleaned 66 files across all eqpalg subdirectories: - Removed commented-out dead code - Removed redundant Chinese inline comments that restate variable/function names - Removed trailing ///< annotations on self-explanatory fields - Removed namespace closing comments - Preserved all file headers, Doxygen documentation, and logic explanations - No code changes — only comment removal
325 lines
11 KiB
C++
325 lines
11 KiB
C++
#include <eqpalg/threads/manager.h>
|
||
#include <mix_cc/debug.h>
|
||
#include <zlib/MemQueue.hpp>
|
||
|
||
extern ProcessType glob_process_type;
|
||
namespace threads {
|
||
|
||
/**
|
||
* @brief 报警信息队列
|
||
* 只读
|
||
*/
|
||
namespace {
|
||
const int AlarmLen = 4000;
|
||
const int AlarmMaxSize = 10000;
|
||
typedef char AlarmJsonData[AlarmLen];
|
||
CMemQueue<AlarmJsonData> alarm_json_data_ = CMemQueue<AlarmJsonData>("ZONE0");
|
||
|
||
}
|
||
|
||
Manager::Manager()
|
||
: is_running_(true), is_start_(false),
|
||
logger_(std::make_unique<LOG>("ThreadManager")),
|
||
alarm_logger_(std::make_unique<LOG>("alarm_logger")) {
|
||
if (glob_process_type == ProcessType::kMon) {
|
||
alarm_handler_ptr_ = std::make_unique<AlarmHandler>();
|
||
}
|
||
}
|
||
|
||
Manager::~Manager() {
|
||
this->logger_->Info() << "Manager::~Manager()" << endl;
|
||
if (glob_process_type == ProcessType::kTask ||
|
||
glob_process_type == ProcessType::kMon) {
|
||
this->is_running_ = false;
|
||
if (this->r_thread_->joinable()) {
|
||
this->r_thread_->join();
|
||
}
|
||
}
|
||
this->logger_->Info() << "--------程序关闭--------" << endl;
|
||
}
|
||
/**
|
||
* @brief 如果ruleid不存在就添加,否则就update
|
||
* @param ruleId My Param doc
|
||
* @param alg_id My Param doc
|
||
* @param name My Param doc
|
||
* @param rule_json My Param doc
|
||
* @param usable My Param doc
|
||
* @param padding_low My Param doc
|
||
* @param padding_up My Param doc
|
||
* @return int
|
||
*/
|
||
int Manager::storage(const string &ruleId, int alg_id, const string &name,
|
||
const mix_cc::json &rule_json, bool usable,
|
||
double padding_low, double padding_up, int task_seq) {
|
||
try {
|
||
if (this->stored_cfg_data_.contains(ruleId)) {
|
||
this->stored_cfg_data_[ruleId] = make_tuple(
|
||
alg_id, name, rule_json, usable, padding_low, padding_up, task_seq);
|
||
} else {
|
||
this->stored_cfg_data_.emplace(
|
||
make_pair(ruleId, make_tuple(alg_id, name, rule_json, usable,
|
||
padding_low, padding_up, task_seq)));
|
||
}
|
||
} catch (const std::exception &e) {
|
||
logger_->Error() << "Manager::storage:" << e.what() << endl;
|
||
return -1;
|
||
}
|
||
|
||
return 0;
|
||
}
|
||
|
||
int Manager::load(const string &ruleId) {
|
||
try {
|
||
this->build_alg_to_handle(ruleId);
|
||
logger_->Info() << "load complete" << endl;
|
||
} catch (const std::exception &e) {
|
||
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
|
||
}
|
||
return 0;
|
||
}
|
||
|
||
int Manager::start() {
|
||
int res = 0;
|
||
// task 训练 析构不用的线程
|
||
if (glob_process_type == ProcessType::kTask) {
|
||
this->r_thread_ = std::make_unique<std::thread>([&]() {
|
||
LOG::doConfigure("TaskManager", THREAD_CONFIG);
|
||
alarm_logger_.reset(new LOG("TaskManager", AUTO_CATCH_PID));
|
||
while (this->is_running_) {
|
||
std::shared_lock read_lock(handles_mutex);
|
||
if (!this->handles_.empty()) {
|
||
bool test_label = true;
|
||
for (auto &x : this->handles_) {
|
||
test_label = test_label && !(x.second->get_is_running());
|
||
}
|
||
if (test_label) {
|
||
this->handles_.clear();
|
||
this->alarm_logger_->Debug()
|
||
<< "task this->handles_.clear()!" << std::endl;
|
||
}
|
||
}
|
||
this_thread::sleep_for(10ms);
|
||
}
|
||
});
|
||
}
|
||
/*-------cron和mon-------*/
|
||
else {
|
||
if (glob_process_type == ProcessType::kMon) {
|
||
this->r_thread_ = std::make_unique<std::thread>([&]() {
|
||
LOG::doConfigure("alarmInfo", THREAD_CONFIG);
|
||
alarm_logger_.reset(new LOG("alarmInfo", AUTO_CATCH_PID));
|
||
while (this->is_running_) {
|
||
if (!alarm_json_data_.empty()) {
|
||
auto alarm_now = alarm_json_data_.pop();
|
||
alarm_handler_ptr_->store_alarm(*alarm_now);
|
||
this->alarm_logger_->Debug()
|
||
<< "CMemQueue<AlarmJsonData>:" << *alarm_now << endl;
|
||
}
|
||
this_thread::sleep_for(500ms);
|
||
}
|
||
});
|
||
}
|
||
for (auto &x : this->stored_cfg_data_) {
|
||
auto[algId, name, rule_json, usable, padding_low, padding_up, task_seq] =
|
||
stored_cfg_data_.find(x.first)->second;
|
||
build_alg_to_handle(x.first);
|
||
}
|
||
logger_->Debug() << "stored_cfg_data size:" << stored_cfg_data_.size()
|
||
<< endl;
|
||
for (auto &x : this->handles_) {
|
||
tid_2_thread_name_.push_back(
|
||
std::make_tuple(x.second->run_thread(),
|
||
x.first)); //运行具体的算法线程
|
||
}
|
||
}
|
||
this->is_start_ = true;
|
||
thread_num_ = this->handles_.size();
|
||
return res;
|
||
}
|
||
|
||
int Manager::attach(const string &ruleId) {
|
||
try {
|
||
// 大致逻辑和attach相同,只有一点不同
|
||
auto iter = this->build_alg_to_handle(ruleId);
|
||
logger_->Info() << "attach complete" << endl;
|
||
} catch (const std::exception &e) {
|
||
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
|
||
}
|
||
return 0;
|
||
}
|
||
|
||
int Manager::reset(const string &ruleId) {
|
||
try {
|
||
// 找到算法id对应的线程名
|
||
auto alg_thread_name = rule_alg_id_mapping_[ruleId];
|
||
auto iter = handles_.find(alg_thread_name);
|
||
// 如果找的到对应的算法
|
||
if (iter != handles_.end()) {
|
||
// detach对应的算法
|
||
iter->second->reset(ruleId);
|
||
}
|
||
logger_->Debug() << "reset instance complete" << endl;
|
||
} catch (const std::exception &e) {
|
||
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
|
||
}
|
||
return 0;
|
||
}
|
||
|
||
int Manager::detach(const string &ruleId) {
|
||
try {
|
||
auto alg_thread_name = rule_alg_id_mapping_[ruleId];
|
||
auto iter = handles_.find(alg_thread_name);
|
||
if (iter != handles_.end()) {
|
||
iter->second->detach(ruleId);
|
||
if (iter->second->size() == 0) {
|
||
this->detach_full_alg(alg_thread_name);
|
||
}
|
||
}
|
||
rule_alg_id_mapping_.erase(alg_thread_name);
|
||
logger_->Debug() << "detach instance complete" << endl;
|
||
} catch (const std::exception &e) {
|
||
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
|
||
}
|
||
return 0;
|
||
}
|
||
|
||
int Manager::exec_task(std::string ruleId, TimePoint time_start,
|
||
TimePoint time_end) {
|
||
auto[alg_id, rule_name, param_info, usable, padding_low, padding_up,
|
||
task_seq] = stored_cfg_data_.find(ruleId)->second;
|
||
|
||
auto ptr = build_algorithm(alg_id, ruleId, rule_name, param_info, padding_low,
|
||
padding_up);
|
||
ptr->init();
|
||
|
||
auto alg_thread_name = std::to_string(alg_id) + "_" +
|
||
std::to_string(ptr->get_data_source()) + "_" +
|
||
std::to_string(task_seq);
|
||
logger_->Debug() << "线程id:" << alg_thread_name << endl;
|
||
std::unique_lock write_lock(handles_mutex);
|
||
if (handles_.find(alg_thread_name) == handles_.end()) {
|
||
logger_->Debug() << "线程id:" << alg_thread_name << " not found,to create!"
|
||
<< endl;
|
||
auto exec_handler = std::make_unique<HandlerExec>(alg_thread_name);
|
||
tid_2_thread_name_.push_back(
|
||
std::make_tuple(exec_handler->run_thread(), alg_thread_name));
|
||
handles_.insert(make_pair(alg_thread_name, std::move(exec_handler)));
|
||
}
|
||
handles_[alg_thread_name]->submit(std::move(ptr), time_start, time_end);
|
||
logger_->Info() << "exec submit done" << endl;
|
||
return 0;
|
||
}
|
||
|
||
int Manager::enable(std::string ruleId, bool usable) {
|
||
auto alg_id_iter = rule_alg_id_mapping_.find(ruleId);
|
||
if (alg_id_iter != rule_alg_id_mapping_.end()) {
|
||
auto handle_iter = this->handles_.find(alg_id_iter->second);
|
||
if (handle_iter != this->handles_.end()) {
|
||
handle_iter->second->set_usable(ruleId, usable);
|
||
}
|
||
}
|
||
// 维护 stored_cfg_data_ 保证停机开机后,usable_保持一致
|
||
try {
|
||
// 1.查询参数
|
||
auto[alg_id, name, rule_json, usable_source, padding_low, padding_up,
|
||
task_seq] = stored_cfg_data_.find(ruleId)->second;
|
||
// 2.调用storage 没有就添加,存在就修改
|
||
storage(ruleId, alg_id, name, rule_json, usable, padding_low, padding_up,
|
||
task_seq);
|
||
} catch (...) {
|
||
logger_->Error() << ruleId << ":维护stored_cfg_data_失败" << endl;
|
||
}
|
||
|
||
logger_->Info() << "enable done" << endl;
|
||
return 0;
|
||
}
|
||
|
||
int Manager::delete_instance(std::string ruleId) {
|
||
// 先停止对应的算法
|
||
this->detach(ruleId);
|
||
// 再将存储的算法配置信息删除
|
||
this->stored_cfg_data_.erase(ruleId);
|
||
return 0;
|
||
}
|
||
|
||
int Manager::build_alg_to_handle(const string &ruleId) {
|
||
auto[algId, name, rule_json, usable, padding_low, padding_up, task_seq] =
|
||
stored_cfg_data_.find(ruleId)->second;
|
||
int data_source = 0;
|
||
if (rule_json.contains("datasource")) {
|
||
data_source =
|
||
std::stoi(rule_json.at("datasource").at("value").get<std::string>());
|
||
this->logger_->Debug() << "ruleid:" << ruleId
|
||
<< ",data_source:" << data_source << std::endl;
|
||
}
|
||
auto ptr =
|
||
build_algorithm(algId, ruleId, name, rule_json, padding_low, padding_up);
|
||
this->logger_->Debug() << "name:" << name << ",build_alg_to_handle --test--1"
|
||
<< std::endl;
|
||
|
||
if (usable) {
|
||
ptr->init();
|
||
} else {
|
||
ptr->AlgBase::init();
|
||
}
|
||
ptr->set_usable(usable);
|
||
|
||
auto alg_thread_name = std::to_string(algId) + "_" +
|
||
std::to_string(data_source) + "_" +
|
||
std::to_string(task_seq);
|
||
auto iter = handles_.find(alg_thread_name);
|
||
if (iter == handles_.end()) {
|
||
auto exec_handler = std::make_unique<HandlerExec>(alg_thread_name);
|
||
exec_handler->attach(std::move(ptr));
|
||
if (this->is_start_) {
|
||
tid_2_thread_name_.push_back(
|
||
std::make_tuple(exec_handler->run_thread(),
|
||
alg_thread_name));
|
||
this->logger_->Debug() << "开启新线程:" << alg_thread_name << std::endl;
|
||
}
|
||
handles_.emplace(make_pair(alg_thread_name, std::move(exec_handler)));
|
||
} else {
|
||
iter->second->attach(std::move(ptr));
|
||
}
|
||
rule_alg_id_mapping_.insert(make_pair(ruleId, alg_thread_name));
|
||
thread_num_ = this->handles_.size();
|
||
return 0;
|
||
}
|
||
|
||
int Manager::detach_full_alg(std::string alg_thread_name) {
|
||
try {
|
||
auto iter = handles_.find(alg_thread_name);
|
||
if (iter != handles_.end()) {
|
||
iter->second->destroy();
|
||
handles_.erase(alg_thread_name);
|
||
}
|
||
logger_->Debug() << "detach full alg complete" << endl;
|
||
} catch (const std::exception &e) {
|
||
logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
|
||
}
|
||
thread_num_ = this->handles_.size();
|
||
return 0;
|
||
}
|
||
|
||
void Manager::update_limit_alarm(std::string ruleid, double lb, double ub,
|
||
double va, int64_t stime, int64_t etime) {
|
||
|
||
auto alg_id_iter = rule_alg_id_mapping_.find(ruleid);
|
||
if (alg_id_iter == rule_alg_id_mapping_.end()) {
|
||
logger_->Debug() << "|Manager::update_limit_alarm|" << ruleid << " not find"
|
||
<< std::endl;
|
||
}
|
||
// 如果找得到对应的算法线程,在线程中寻找对应的实例
|
||
if (alg_id_iter != rule_alg_id_mapping_.end()) {
|
||
auto handle_iter = this->handles_.find(alg_id_iter->second);
|
||
// 如果找得到对应的实例
|
||
if (handle_iter != this->handles_.end()) {
|
||
handle_iter->second->update_limit_alarm(ruleid, lb, ub, va, stime, etime);
|
||
}
|
||
}
|
||
}
|
||
|
||
int Manager::get_thread_size() { return thread_num_; }
|
||
|
||
}
|