#include #include #include extern ProcessType glob_process_type; namespace threads { /** * @brief 报警信息队列 * 只读 */ namespace { const int AlarmLen = 4000; const int AlarmMaxSize = 10000; typedef char AlarmJsonData[AlarmLen]; CMemQueue alarm_json_data_ = CMemQueue("ZONE0"); } Manager::Manager() : is_running_(true), is_start_(false), logger_(std::make_unique("ThreadManager")), alarm_logger_(std::make_unique("alarm_logger")) { if (glob_process_type == ProcessType::kMon) { alarm_handler_ptr_ = std::make_unique(); } } Manager::~Manager() { this->logger_->Info() << "Manager::~Manager()" << endl; if (glob_process_type == ProcessType::kTask || glob_process_type == ProcessType::kMon) { this->is_running_ = false; if (this->r_thread_->joinable()) { this->r_thread_->join(); } } this->logger_->Info() << "--------程序关闭--------" << endl; } /** * @brief 如果ruleid不存在就添加,否则就update * @param ruleId My Param doc * @param alg_id My Param doc * @param name My Param doc * @param rule_json My Param doc * @param usable My Param doc * @param padding_low My Param doc * @param padding_up My Param doc * @return int */ int Manager::storage(const string &ruleId, int alg_id, const string &name, const mix_cc::json &rule_json, bool usable, double padding_low, double padding_up, int task_seq) { try { if (this->stored_cfg_data_.contains(ruleId)) { this->stored_cfg_data_[ruleId] = make_tuple( alg_id, name, rule_json, usable, padding_low, padding_up, task_seq); } else { this->stored_cfg_data_.emplace( make_pair(ruleId, make_tuple(alg_id, name, rule_json, usable, padding_low, padding_up, task_seq))); } } catch (const std::exception &e) { logger_->Error() << "Manager::storage:" << e.what() << endl; return -1; } return 0; } int Manager::load(const string &ruleId) { try { this->build_alg_to_handle(ruleId); logger_->Info() << "load complete" << endl; } catch (const std::exception &e) { logger_->Error() << mix_cc::get_nested_exception(e) << std::endl; } return 0; } int Manager::start() { int res = 0; // task 训练 析构不用的线程 if (glob_process_type == ProcessType::kTask) { this->r_thread_ = std::make_unique([&]() { LOG::doConfigure("TaskManager", THREAD_CONFIG); alarm_logger_.reset(new LOG("TaskManager", AUTO_CATCH_PID)); while (this->is_running_) { std::shared_lock read_lock(handles_mutex); if (!this->handles_.empty()) { bool test_label = true; for (auto &x : this->handles_) { test_label = test_label && !(x.second->get_is_running()); } if (test_label) { this->handles_.clear(); this->alarm_logger_->Debug() << "task this->handles_.clear()!" << std::endl; } } this_thread::sleep_for(10ms); } }); } /*-------cron和mon-------*/ else { if (glob_process_type == ProcessType::kMon) { this->r_thread_ = std::make_unique([&]() { LOG::doConfigure("alarmInfo", THREAD_CONFIG); alarm_logger_.reset(new LOG("alarmInfo", AUTO_CATCH_PID)); while (this->is_running_) { if (!alarm_json_data_.empty()) { auto alarm_now = alarm_json_data_.pop(); alarm_handler_ptr_->store_alarm(*alarm_now); this->alarm_logger_->Debug() << "CMemQueue:" << *alarm_now << endl; } this_thread::sleep_for(500ms); } }); } for (auto &x : this->stored_cfg_data_) { auto[algId, name, rule_json, usable, padding_low, padding_up, task_seq] = stored_cfg_data_.find(x.first)->second; build_alg_to_handle(x.first); } logger_->Debug() << "stored_cfg_data size:" << stored_cfg_data_.size() << endl; for (auto &x : this->handles_) { tid_2_thread_name_.push_back( std::make_tuple(x.second->run_thread(), x.first)); } } this->is_start_ = true; thread_num_ = this->handles_.size(); return res; } int Manager::attach(const string &ruleId) { try { // 大致逻辑和attach相同,只有一点不同 auto iter = this->build_alg_to_handle(ruleId); logger_->Info() << "attach complete" << endl; } catch (const std::exception &e) { logger_->Error() << mix_cc::get_nested_exception(e) << std::endl; } return 0; } int Manager::reset(const string &ruleId) { try { // 找到算法id对应的线程名 auto alg_thread_name = rule_alg_id_mapping_[ruleId]; auto iter = handles_.find(alg_thread_name); // 如果找的到对应的算法 if (iter != handles_.end()) { // detach对应的算法 iter->second->reset(ruleId); } logger_->Debug() << "reset instance complete" << endl; } catch (const std::exception &e) { logger_->Error() << mix_cc::get_nested_exception(e) << std::endl; } return 0; } int Manager::detach(const string &ruleId) { try { auto alg_thread_name = rule_alg_id_mapping_[ruleId]; auto iter = handles_.find(alg_thread_name); if (iter != handles_.end()) { iter->second->detach(ruleId); if (iter->second->size() == 0) { this->detach_full_alg(alg_thread_name); } } rule_alg_id_mapping_.erase(alg_thread_name); logger_->Debug() << "detach instance complete" << endl; } catch (const std::exception &e) { logger_->Error() << mix_cc::get_nested_exception(e) << std::endl; } return 0; } int Manager::exec_task(std::string ruleId, TimePoint time_start, TimePoint time_end) { auto[alg_id, rule_name, param_info, usable, padding_low, padding_up, task_seq] = stored_cfg_data_.find(ruleId)->second; auto ptr = build_algorithm(alg_id, ruleId, rule_name, param_info, padding_low, padding_up); ptr->init(); auto alg_thread_name = std::to_string(alg_id) + "_" + std::to_string(ptr->get_data_source()) + "_" + std::to_string(task_seq); logger_->Debug() << "线程id:" << alg_thread_name << endl; std::unique_lock write_lock(handles_mutex); if (handles_.find(alg_thread_name) == handles_.end()) { logger_->Debug() << "线程id:" << alg_thread_name << " not found,to create!" << endl; auto exec_handler = std::make_unique(alg_thread_name); tid_2_thread_name_.push_back( std::make_tuple(exec_handler->run_thread(), alg_thread_name)); handles_.insert(make_pair(alg_thread_name, std::move(exec_handler))); } handles_[alg_thread_name]->submit(std::move(ptr), time_start, time_end); logger_->Info() << "exec submit done" << endl; return 0; } int Manager::enable(std::string ruleId, bool usable) { auto alg_id_iter = rule_alg_id_mapping_.find(ruleId); if (alg_id_iter != rule_alg_id_mapping_.end()) { auto handle_iter = this->handles_.find(alg_id_iter->second); if (handle_iter != this->handles_.end()) { handle_iter->second->set_usable(ruleId, usable); } } // 维护 stored_cfg_data_ 保证停机开机后,usable_保持一致 try { auto[alg_id, name, rule_json, usable_source, padding_low, padding_up, task_seq] = stored_cfg_data_.find(ruleId)->second; storage(ruleId, alg_id, name, rule_json, usable, padding_low, padding_up, task_seq); } catch (...) { logger_->Error() << ruleId << ":维护stored_cfg_data_失败" << endl; } logger_->Info() << "enable done" << endl; return 0; } int Manager::delete_instance(std::string ruleId) { // 先停止对应的算法 this->detach(ruleId); // 再将存储的算法配置信息删除 this->stored_cfg_data_.erase(ruleId); return 0; } int Manager::build_alg_to_handle(const string &ruleId) { auto[algId, name, rule_json, usable, padding_low, padding_up, task_seq] = stored_cfg_data_.find(ruleId)->second; int data_source = 0; if (rule_json.contains("datasource")) { data_source = std::stoi(rule_json.at("datasource").at("value").get()); this->logger_->Debug() << "ruleid:" << ruleId << ",data_source:" << data_source << std::endl; } auto ptr = build_algorithm(algId, ruleId, name, rule_json, padding_low, padding_up); this->logger_->Debug() << "name:" << name << ",build_alg_to_handle --test--1" << std::endl; if (usable) { ptr->init(); } else { ptr->AlgBase::init(); } ptr->set_usable(usable); auto alg_thread_name = std::to_string(algId) + "_" + std::to_string(data_source) + "_" + std::to_string(task_seq); auto iter = handles_.find(alg_thread_name); if (iter == handles_.end()) { auto exec_handler = std::make_unique(alg_thread_name); exec_handler->attach(std::move(ptr)); if (this->is_start_) { tid_2_thread_name_.push_back( std::make_tuple(exec_handler->run_thread(), alg_thread_name)); this->logger_->Debug() << "开启新线程:" << alg_thread_name << std::endl; } handles_.emplace(make_pair(alg_thread_name, std::move(exec_handler))); } else { iter->second->attach(std::move(ptr)); } rule_alg_id_mapping_.insert(make_pair(ruleId, alg_thread_name)); thread_num_ = this->handles_.size(); return 0; } int Manager::detach_full_alg(std::string alg_thread_name) { try { auto iter = handles_.find(alg_thread_name); if (iter != handles_.end()) { iter->second->destroy(); handles_.erase(alg_thread_name); } logger_->Debug() << "detach full alg complete" << endl; } catch (const std::exception &e) { logger_->Error() << mix_cc::get_nested_exception(e) << std::endl; } thread_num_ = this->handles_.size(); return 0; } void Manager::update_limit_alarm(std::string ruleid, double lb, double ub, double va, int64_t stime, int64_t etime) { auto alg_id_iter = rule_alg_id_mapping_.find(ruleid); if (alg_id_iter == rule_alg_id_mapping_.end()) { logger_->Debug() << "|Manager::update_limit_alarm|" << ruleid << " not find" << std::endl; } // 如果找得到对应的算法线程,在线程中寻找对应的实例 if (alg_id_iter != rule_alg_id_mapping_.end()) { auto handle_iter = this->handles_.find(alg_id_iter->second); // 如果找得到对应的实例 if (handle_iter != this->handles_.end()) { handle_iter->second->update_limit_alarm(ruleid, lb, ub, va, stime, etime); } } } int Manager::get_thread_size() { return thread_num_; } }