// #include #include "mix_cc/exception.h" #include #include #include #include #define MON_CALL_MAX_COST 20ms extern ProcessType glob_process_type; // const std::vector do_not_stop_alg = {1111, 1110}; ///<不停机算法号 namespace threads { // 在类外定义并初始化静态成员变量(必须) int HandlerExec::instanceCount = 0; HandlerExec::HandlerExec(const string &alg_name) { ++instanceCount; // 创建logger this->logger_ = std::make_unique(string("HandlerExec") + alg_name); this->thread_name_ = "alg_" + alg_name; // 如果是单次执行, if (glob_process_type == ProcessType::kTask) { this->thread_name_ += "_task"; } else if (glob_process_type == ProcessType::kCron) { this->thread_name_ += "_cron"; } this->alg_id_ = stoi(alg_name); logger_->Debug() << "alg_name:" << alg_name << ",alg_id:" << alg_id_ << endl; is_running_ = true; // is_line_stop_ = true; ///< 初始化机组状态为停机状态 // if (this->alg_id_ > 1110) { // is_line_stop_ = false; ///< 初始化机组状态为开机机状态 // } } HandlerExec::~HandlerExec() { instanceCount--; this->is_running_ = false; if (r_thread_->joinable()) { r_thread_->join(); } logger_->Debug() << "HandlerExec::~HandlerExec()" << endl; // if (glob_process_type == ProcessType::kCron) { // if (this->alg_id_ == 221 || this->alg_id_ == 220 || this->alg_id_ == 251 // || // this->alg_id_ == 250 || this->alg_id_ == 331 || this->alg_id_ == 330) // { // if (r_thread_->joinable()) { // r_thread_->join(); // } // } // } else { // if (r_thread_->joinable()) { // r_thread_->join(); // } // } } std::thread::id HandlerExec::run_thread() { r_thread_ = std::make_unique([&]() { int logResult = LOG::doConfigure(thread_name_, THREAD_CONFIG); int task_seq = HandlerExec::extractCTaskNumberRegexGeneric(thread_name_); this->logger_->Debug() << "thread_name_:" << thread_name_ << ",logResult[0-成功;1-配置文件不存在;2-" "log输出目录不存在;3-" "相同的doConfigure不能执行多遍;4-其他错误]:" << logResult << ",task_seq:" << task_seq << endl; auto next_wake_time = std::chrono::steady_clock::now(); while (is_running_) { try { this->event_handler(); { // task 进程 if (glob_process_type == ProcessType::kTask) { for (auto &x : this->rule_pointers_) { x.second->logReset(task_seq * 10 + x.second->get_data_source()); auto it = time_ranges_.find(x.second->get_rule_id()); if (it != time_ranges_.end()) { x.second->exec_task_call(time_ranges_[x.second->get_rule_id()]); time_ranges_.erase(it); this->logger_->Debug() << "task:" << x.second->get_rule_name() << endl; this->rule_pointers_.erase(x.first); } } // this->is_running_ = false; } if (glob_process_type == ProcessType::kCron) { for (auto &x : this->rule_pointers_) { x.second->exec_cron_call(); } } if (glob_process_type == ProcessType::kMon) { run_t1 = std::chrono::system_clock::now(); auto runs_t1 = std::chrono::steady_clock::now(); for (auto &x : this->rule_pointers_) { x.second->exec_mon_call(); } run_t2 = std::chrono::system_clock::now(); auto runs_t2 = std::chrono::steady_clock::now(); if (CMemVar::Const()->is_print_eqpalg_mon_threading_info) { cost_time = (run_t2 - run_t1).count() / 1000000; int64_t costs_time = std::chrono::duration_cast( runs_t2 - runs_t1) .count(); if (cost_time > 50) { this->logger_->Info() << "instanceCount:" << instanceCount << ",algID:" << this->alg_id_ << ",thread_name_:" << thread_name_ << ",this->rule_pointers_ size:" << this->rule_pointers_.size() << ",run cost time(system_clock):" << cost_time << " ms" << ",run cost time(steady_clock):" << costs_time << " ms" << std::endl; } } } } if (glob_process_type == ProcessType::kMon) { // std::this_thread::sleep_for(1ms);// // 极短的sleep,兼顾响应性和CPU占用 if (std::chrono::milliseconds(cost_time) > MON_CALL_MAX_COST) { next_wake_time = std::chrono::steady_clock::now(); } else { next_wake_time += MON_CALL_MAX_COST; std::this_thread::sleep_until(next_wake_time); } } if (glob_process_type == ProcessType::kCron) { std::this_thread::sleep_for(1s); } if (glob_process_type == ProcessType::kTask) { std::this_thread::sleep_for(1s); } } catch (const std::exception &e) { this->logger_->Error() << mix_cc::get_nested_exception(e) << std::endl; } catch (...) { this->logger_->Error() << "未知异常" << std::endl; } } this->rule_pointers_.clear(); // is_running_ = false 跳出循环,准备析构 }); return r_thread_->get_id(); } // bool HandlerExec::look_is_cron_run() { return this->is_cron_run_; } // void HandlerExec::set_is_cron_run(bool is_cron_run) { // this->is_cron_run_ = is_cron_run; // } // bool HandlerExec::look_is_cron_run() { // bool all_is_run = true; // if (!this->rule_pointers_.empty()) { // for (auto& x : this->rule_pointers_) { // all_is_run = all_is_run && (x.second->get_is_cron_run()); // } // return all_is_run; // } else { // return true; // } // } // bool HandlerExec::look_is_pause() { return this->is_line_stop_; } // int HandlerExec::pause() { // // if (std::find(do_not_stop_alg.cbegin(), do_not_stop_alg.cend(), // // this->alg_id_) == do_not_stop_alg.cend()) { // // this->is_line_stop_ = true; // // } // this->is_line_stop_ = true; // return 0; // } // int HandlerExec::unpause() { // // 重置上次报警时间 屏蔽开机前几分钟的报警 // std::lock_guard guard(mutex_); // if (!this->rule_pointers_.empty() && this->is_line_stop_) { // for (auto& x : this->rule_pointers_) { // x.second->set_last_alarm_time(system_clock::now()); // } // } // this->is_line_stop_ = false; // return 0; // } std::vector HandlerExec::get_rule_ids() { std::vector ret; for (auto &x : rule_pointers_) { ret.push_back(x.first); } return ret; } int HandlerExec::load(std::unique_ptr &&pointer) { // 由于实例载入时,执行线程暂未运行,所以无需加锁 rule_pointers_.emplace( std::make_pair(pointer->get_rule_id(), std::move(pointer))); return 0; } int HandlerExec::attach(std::unique_ptr &&alg_pointer) { std::lock_guard guard(mutex_); this->attach_queue_.emplace(std::move(alg_pointer)); return 0; } int HandlerExec::reset(string ruleId) { std::lock_guard guard(mutex_); this->reset_queue_.emplace(ruleId); return 0; } int HandlerExec::detach(string ruleId) { std::lock_guard guard(mutex_); this->detach_queue_.push(ruleId); return 0; } int HandlerExec::set_usable(string ruleId, bool usable) { std::lock_guard guard(mutex_); this->usable_queue_.push(std::make_tuple(ruleId, usable)); return 0; } int HandlerExec::submit(std::unique_ptr &&instance_ptr, TimePoint begin_time, TimePoint end_time) { std::lock_guard guard(mutex_); this->once_exec_queue_.emplace(std::make_tuple( std::move(instance_ptr), mix_cc::time_range_t(begin_time, end_time))); return 0; } int HandlerExec::event_handler() { std::lock_guard guard(mutex_); while (!reset_queue_.empty()) { auto reset_rule_id = reset_queue_.front(); logger_->Info() << "alg_id:" << this->alg_id_ << ",重置算法:" << reset_rule_id << std::endl; reset_queue_.pop(); if (ProcessType::kMon == glob_process_type) { if (6 == this->alg_id_ || 7 == this->alg_id_) { rule_pointers_[reset_rule_id]->reset_dev_data(); this->logger_->Debug() << "alg_id:" << this->alg_id_ << ",清除历史数据" << std::endl; } else if (2 == this->alg_id_ || 4 == this->alg_id_ || 5 == this->alg_id_) { rule_pointers_[reset_rule_id]->reset_dev_data(); this->logger_->Debug() << "alg_id:" << this->alg_id_ << ",清除统计数据" << std::endl; } } } while (!detach_queue_.empty()) { auto detach_rule_id = detach_queue_.front(); logger_->Info() << "删除算法:" << detach_rule_id << std::endl; detach_queue_.pop(); rule_pointers_.erase(detach_rule_id); // 删除统计的样本rs(数字特征,包括均值方差标准差等)--已弃用 // try { // string ss0 = "rm /users/dsc/stat_data/" + detach_rule_id + "_0.dat"; // system(ss0.c_str()); // string ss1 = "rm /users/dsc/stat_data/" + detach_rule_id + "_1.dat"; // system(ss1.c_str()); // string ss2 = "rm /users/dsc/stat_data/" + detach_rule_id + "_2.dat"; // system(ss2.c_str()); // } catch (...) { // this->logger_->Debug() << "system(ss.c_str()) 失败!" << std::endl; // } logger_->Info() << "删除算法完成" << detach_rule_id << std::endl; } while (!attach_queue_.empty()) { // if (attach_queue_.front()->get_usable()) { // attach_queue_.front()->init(); // } auto pointer = std::move(attach_queue_.front()); attach_queue_.pop(); auto rule_id = pointer->get_rule_id(); if (rule_pointers_.find(rule_id) == rule_pointers_.end()) { logger_->Info() << "添加算法:" << pointer->get_rule_name() << std::endl; rule_pointers_.emplace(std::make_pair(rule_id, std::move(pointer))); logger_->Info() << "添加算法完成" << std::endl; } else { // rule_pointers_.erase(rule_id); logger_->Info() << "更新算法:" << pointer->get_rule_name() << std::endl; rule_pointers_[rule_id] = std::move(pointer); } } while (!usable_queue_.empty()) { auto usable_q = usable_queue_.front(); usable_queue_.pop(); auto rule_id = std::get<0>(usable_q); if (std::get<1>(usable_q)) { rule_pointers_[std::get<0>(usable_q)]->init(); logger_->Info() << "启用算法:" << rule_id << std::endl; } else { logger_->Info() << "停用算法:" << rule_id << std::endl; } // logger_->Info() << "启用算法:" << rule_id << std::endl; rule_pointers_[std::get<0>(usable_q)]->set_usable(std::get<1>(usable_q)); logger_->Info() << "algbase的is_usable_:" << rule_pointers_[std::get<0>(usable_q)]->get_usable() << std::endl; } if (!once_exec_queue_.empty()) { auto[pointer, time_range] = std::move(once_exec_queue_.front()); logger_->Info() << "单次执行算法:" << pointer->get_rule_name() << std::endl; once_exec_queue_.pop(); auto rule_id = (pointer->get_rule_id()); if (rule_pointers_.find(rule_id) != rule_pointers_.end()) { logger_->Info() << "task更新算法" << std::endl; rule_pointers_.erase(rule_id); } else { logger_->Info() << "task新增算法" << std::endl; } rule_pointers_.emplace(std::make_pair(rule_id, std::move(pointer))); logger_->Info() << "update_rule_for_task 完成" << std::endl; time_ranges_[rule_id] = time_range; } return 0; } void HandlerExec::update_limit_alarm(std::string ruleid, double lb, double ub, double va, int64_t stime, int64_t etime) { auto iter = this->rule_pointers_.find(ruleid); if (iter != this->rule_pointers_.end()) { iter->second->update_limit_alarm(lb, ub, va, stime, etime); } else { logger_->Debug() << "|HandlerExec::update_limit_alarm|" << ruleid << " not find" << std::endl; } } int HandlerExec::extractCTaskNumberRegexGeneric(std::string input) { // 匹配任意位置的数字部分,但通过贪婪匹配获取最后一个 std::regex pattern(R"((\d+))"); std::sregex_iterator begin(input.begin(), input.end(), pattern); std::sregex_iterator end; std::string lastMatch; for (auto it = begin; it != end; ++it) { lastMatch = (*it)[1].str(); } if (!lastMatch.empty()) { try { return std::stoi(lastMatch); } catch (...) { return 0; } } return 0; } } // namespace threads