eis/eqpalg/alg_base.cpp
Huamonarch 1ca922a4ef Fix task thread lifecycle: self-destruct when idle, synchronized cleanup
HandlerExec in task mode now sets is_running_=false when rule_pointers_
and once_exec_queue_ are both empty. Manager cleanup uses two-phase
lock (shared_lock scan + unique_lock destroy/erase) synchronized with
exec_task via handles_mutex. exec_task checks is_running_ before submit
and destroys dead handlers to prevent task loss. Also fix logReset
self-assignment no-op.
2026-05-12 17:11:07 +08:00

525 lines
18 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <base/TCMTime.h>
#include <eqpalg/alg_base.h>
#include <eqpalg/exp_macro/get_macro_replaced_exp.h>
#include <eqpalg/gb_item_memory.h>
#include <eqpalg/table_struct/t_rule_cfg.h>
#include <glob/SingletonTemplate.h>
#include <memory>
#include <mix_cc/exception.h>
#include <mix_cc/ihyper_db.h>
#include <mix_cc/sql.h>
#include <mix_cc/sql/database/db2_t.h>
#include <string>
#include <sys/types.h>
#include <zlib/MemVar.h>
extern ProcessType glob_process_type;
AlgBase::AlgBase(const string &name, const mix_cc::json &rule_json,
const string &ruleId)
: rule_id_(ruleId), rule_name_(name), rule_json_(rule_json),
gb_logger_(std::make_unique<GbLogger>("Logger:" + name)),
logger_(std::make_unique<LOG>("AlgBase", AUTO_CATCH_PID)),
last_alarm_time_(
system_clock::now() -
(minutes(std::max(5, CMemVar::Const()->AlarmIntervalHours * 60) - 1) +
50s)),
con_monitor_(1), delay_time_(50ms) {}
AlgBase::~AlgBase() {
}
void AlgBase::logReset(int task_seq) {
this->task_seq = task_seq;
logger_.reset(new LOG("task:" + rule_name_, AUTO_CATCH_PID));
}
int AlgBase::init() {
this->refresh_now_time();
last_save_start_time_ = this->now_time_;
last_rule_state_updae_start_time_ = this->now_time_;
last_heart_beat_start_time_ = this->now_time_;
// 使用修正后的XorShift128Plus
XorShift128Plus xsrng(rule_id_);
// 生成随机偏移
int r1 = xsrng.get_int_fast(0, 101);
int r2 = xsrng.get_int_fast(0, 1001);
int r3 = xsrng.get_int_fast(0, 11);
// 计算总偏移
auto offset_ms = std::chrono::milliseconds(r1 * 10 + r2 * 5);
auto offset_ms2 = std::chrono::milliseconds(r1 * 2 + r3 * 10);
// 更新保存间隔
save_interval_ms_ += offset_ms;
rule_state_update_interval_ms_ += offset_ms2;
logger_->Debug() << "ruleid:" << rule_id_
<< "save_interval_ms_:" << save_interval_ms_.count()
<< ";rule_state_update_interval_ms_:"
<< rule_state_update_interval_ms_.count() << std::endl;
/*初始化保存正常数据的数据--使规则开始运行,不保存----start*/
data_info_.update(mix_cc::mix_time_t(now_time_).to_milliseconds(),
mix_cc::mix_time_t(now_time_).to_milliseconds());
SingletonTemplate<DsmHandle>::GetInstance().insert(rule_id_, &data_info_);
/*初始化保存正常数据的数据--使规则开始运行,不保存----end*/
int res = 0;
is_usable_ = false;
if (rule_json_.contains("before_exec")) {
try {
this->prr_ = std::stoi(this->rule_json_.at("before_exec").get<string>());
} catch (const std::exception &e) {
logger_->Error() << e.what() << "location" << BOOST_CURRENT_LOCATION
<< endl;
this->prr_ = this->rule_json_.at("before_exec").get<int>();
}
logger_->Debug() << rule_name_ << "的执行前置条件[0-无1-表达式]:" << prr_
<< endl;
}
try {
res += this->reload_config_tag();
data_info_.init(m_tags);
try {
int ms =
std::stoi(rule_json_.at("trigger").at("value").get<std::string>());
if (ms < 20) {
ms = 20;
}
this->delay_time_ = milliseconds(ms);
exp_mpdule_ptr_ =
std::make_unique<ExpModule>(mm_vars, m_tags, is_exp_alg_);
if (this->prr_ == 1) {
string exp_str = "";
if (rule_json_.at("function").contains("pre_result")) {
auto tmp_exp = rule_json_.at("function")
.at("pre_result")
.at("value")
.get<std::string>();
logger_->Info() << "tmp_exp:" << tmp_exp << std::endl;
exp_str = get_macro_replaced_exp(tmp_exp);
logger_->Info() << "exp_str:" << exp_str << std::endl;
exp_mpdule_ptr_->add_exp("pre_result", exp_str);
logger_->Info() << "pre_result:"
<< exp_mpdule_ptr_->get_exp_str("pre_result")
<< std::endl;
}
logger_->Debug() << "ruleid:" << this->rule_id_
<< ",rulename:" << this->rule_name_
<< ",执行前提表达式:" << exp_str << std::endl;
}
} catch (const std::exception &e) {
this->error_code_list_.push_back(
{ErrorType::Empty, ErrorLocation::ExeCyc});
logger_->Error() << e.what() << "location" << BOOST_CURRENT_LOCATION
<< endl;
res += -1;
}
last_run_start_time_ = system_clock::now() - this->delay_time_;
} catch (const std::exception &e) {
logger_->Error() << e.what() << "location" << BOOST_CURRENT_LOCATION
<< endl;
res += -1;
}
logger_->Debug() << "|" << rule_id_ << "|alg init 完成 90% "
<< ",last_alarm_time_"
<< mix_cc::mix_time_t(last_alarm_time_).to_formatted_time()
<< std::endl;
this->rule_stat_.limit_down = 0;
this->rule_stat_.limit_up = 0;
this->rule_stat_.current_value = 0;
if (glob_process_type == ProcessType::kMon) {
this->update_map_rule();
}
logger_->Debug() << "|" << rule_id_ << "|alg init 完成! " << std::endl;
return res;
}
/**
* @brief 保证间隔delay_time_以上执行一次
* @return true
* @return false
*/
bool AlgBase::get_cycled() {
auto now = system_clock::now();
if ((now - last_run_start_time_) > this->delay_time_) {
last_run_start_time_ = now;
} else {
return false;
}
return true;
}
// 载入tag点配置信息
int AlgBase::reload_config_tag() {
const auto tag_dict = rule_json_["tags"];
m_tags.resize(tag_dict.size());
this->rule_stat_.items.resize(tag_dict.size());
for (auto x : tag_dict.items()) {
int item_index = stoi(x.key().substr(3));
logger_->Debug() << "item_index:" << item_index << ",x.key():" << x.key()
<< std::endl;
if (item_index > tag_dict.size()) {
logger_->Error() << "|tag序号异常|"
<< ",ruleid:" << rule_id_ << ",rule_name:" << rule_name_
<< ",tag_dict.size():" << tag_dict.size()
<< ",item_index:" << item_index << std::endl;
return -1;
}
auto itemi = x.value().at("value").get<std::string>();
logger_->Debug() << "tag" << item_index << ":" << itemi << std::endl;
m_tags[item_index - 1] = string(CMemVar::Const()->UnitNo) + "_" + itemi;
this->rule_stat_.items[item_index - 1] = itemi;
}
var_cache_.init(m_tags.size(), pv_num_);
if (m_tags.empty()) {
error_code_list_.push_back({ErrorType::Empty, ErrorLocation::Tags});
logger_->Debug() << "m_tags为空" << endl;
return -1;
}
return 0;
}
// 获取数据源
int AlgBase::get_data_source() { return this->data_source_; }
// 设置上次报警时间,这个时间是由执行进行类决定的,
// 因为最终报警是否被发送也是由该类决定的
void AlgBase::set_last_alarm_time(TimePoint time_point) {
this->last_alarm_time_ = time_point;
}
void AlgBase::exec_mon_call() {
if (this->get_cycled()) {
if (this->get_usable() && get_prr()) {
auto alarm = this->exec_mon();
this->con_monitor_.update(alarm.alarmed);
if (alarm.alarmed) {
logger_->Debug() << alarm.content << endl;
alarm_poster_.alarm(alarm, &last_alarm_time_);
exp_mpdule_ptr_->fun_reset();
} else {
if (get_save_data_cycled()) {
this->save_rule_norm_data();
}
}
} else {
this->con_monitor_.update(true);
}
if (get_update_rule_stat_cycled()) {
this->update_map_rule();
}
}
if (get_heart_beat_log_cycled()) {
logger_->Debug() << "heart_beat_log----active!" << std::endl;
}
}
void AlgBase::save_rule_norm_data() {
data_info_.update(mix_cc::mix_time_t(now_time_).to_milliseconds() - 1000 * 5,
mix_cc::mix_time_t(now_time_).to_milliseconds());
SingletonTemplate<DsmHandle>::GetInstance().insert(rule_id_, &data_info_);
}
void AlgBase::exec_task_call(const mix_cc::time_range_t &time_range) {
gb_logger_->log_info("task-开始执行:" + rule_name_);
this->task_time_range_ = time_range;
auto alarms = this->exec_task(time_range);
for (auto alarm : alarms) {
if (alarm.alarmed) {
alarm_poster_.alarm(alarm, "task");
}
}
gb_logger_->log_info("task-执行完成:" + rule_name_);
}
/**
* @brief 区别需要统计样本的模型
*/
void AlgBase::exec_normal_task_call(const mix_cc::time_range_t &time_range) {
gb_logger_->log_info("task-开始执行:" + rule_name_);
this->task_time_range_ = time_range;
auto alarm = this->exec_normal_task(time_range);
if (alarm.alarmed) {
this->set_last_alarm_time(time_range.get_left());
alarm_poster_.alarm(alarm, &last_alarm_time_);
}
gb_logger_->log_info("task-执行完成:" + rule_name_);
}
void AlgBase::exec_cron_call() { this->exec_cron(); }
void AlgBase::set_usable(bool usable) {
is_usable_ = usable;
logger_->Debug() << rule_name_ << " usable:" << usable << endl;
}
mix_cc::json AlgBase::exec_cron() { return mix_cc::json{}; }
// 刷新ihyperDB缓存把ihyperDB中的数据转移到当前程序缓存中
// 注意该函数是先更改query_time_range_再查询ihd的数据
int AlgBase::refresh_ihd_cache() {
try {
// 如果数据时间包含空时间
if (query_time_range_.get_left() == TimePoint() ||
query_time_range_.get_right() == TimePoint()) {
// 则设置时间为当前时间前5秒
query_time_range_.set_left(now_time_ - 5s);
query_time_range_.set_right(now_time_);
} else {
// 否则把开始时间设置为上次查询的结束时间
query_time_range_.set_left(query_time_range_.get_right());
// 结束时间设置为当前时间
query_time_range_.set_right(now_time_);
}
// 进行查询
auto queried_batch_maybe = mix_cc::ihd::make_query_batch_maybe(
m_tags, query_time_range_, ihd_min_time_particles_);
if (queried_batch_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的信息");
return -1;
}
auto queried_bath = queried_batch_maybe.unsafe_get_just();
auto result_maybe = mix_cc::ihd::read_data_with_time_maybe(&queried_bath);
if (result_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
std::tie(queried_time_, queried_data_) = result_maybe.unsafe_get_just();
} catch (const std::exception &e) {
logger_->Debug() << e.what() << ",locations:" << BOOST_CURRENT_LOCATION
<< endl;
gb_logger_->log_exception(e);
}
return 0;
}
// query_time_range_ 会被刷新 l0 = r1 ,r0 = now
int AlgBase::refresh_ihd_cache(TimeDur delay_time) {
try {
// 如果数据时间包含空时间
if (query_time_range_.get_left() == TimePoint() ||
query_time_range_.get_right() == TimePoint()) {
query_time_range_.set_left(now_time_ - 10 * delay_time);
query_time_range_.set_right(now_time_);
} else {
// 否则把开始时间设置为上次查询的结束时间
query_time_range_.set_left(query_time_range_.get_right());
// 结束时间设置为当前时间
query_time_range_.set_right(now_time_);
}
{
gb_logger_->log_info(
"ihd函数内查询开始时间:" +
mix_cc::mix_time_t(query_time_range_.get_left()).to_formatted_time());
gb_logger_->log_info("ihd函数内查询结束时间:" +
mix_cc::mix_time_t(query_time_range_.get_right())
.to_formatted_time());
}
// 进行查询
auto queried_batch_maybe = mix_cc::ihd::make_query_batch_maybe(
m_tags, query_time_range_, delay_time);
if (queried_batch_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的信息");
return -1;
}
auto queried_bath = queried_batch_maybe.unsafe_get_just();
auto result_maybe = mix_cc::ihd::read_data_with_time_maybe(&queried_bath);
if (result_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
std::tie(queried_time_, queried_data_) = result_maybe.unsafe_get_just();
} catch (const std::exception &e) {
gb_logger_->log_exception(e);
}
return 0;
}
int AlgBase::refresh_now_time() {
if (data_source_ == DataSource::IHDB) {
// 因为ihdb存储的耗时比较长直接查询当前的时间数据
// 可能不存在对应的存储的数据
// 所以需要把数据向前找5s
now_time_ = system_clock::now() - 5s;
} else {
now_time_ = system_clock::now();
}
return 0;
}
bool AlgBase::update_map_rule() {
try {
std::lock_guard<std::mutex> guard(lm);
SingletonTemp<EqpStat>::GetInstance().update_display(
this->rule_id_, this->rule_stat_);
SingletonTemp<EqpStat>::GetInstance().update_cold(
this->rule_id_, this->rule_stat_);
return true;
} catch (...) {
gb_logger_->log_error(this->rule_name_ + "update_map_rule()");
return false;
}
}
bool AlgBase::get_prr() {
if (this->prr_ == 1) {
exp_mpdule_ptr_->update();
bool prr_result = (bool)exp_mpdule_ptr_->get_value("pre_result");
this->now_prr_ = prr_result;
return prr_result;
}
now_prr_ = true;
return true;
}
int AlgBase::ihd_get_by_tag(string tag, mix_cc::time_range_t time_range) {
std::vector<std::string> tags;
tags.push_back(tag);
try {
auto queried_batch_maybe = mix_cc::ihd::make_query_batch_maybe(
tags, time_range, ihd_min_time_particles_);
if (queried_batch_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的信息");
return -1;
}
auto queried_bath = queried_batch_maybe.unsafe_get_just();
auto result_maybe = mix_cc::ihd::read_data_with_time_maybe(&queried_bath);
if (result_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
std::tie(queried_time_, queried_data_) = result_maybe.unsafe_get_just();
if (queried_data_.rows() == 0) {
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
}
catch (const std::exception &e) {
gb_logger_->log_exception(e);
return -1;
}
return 0;
}
int AlgBase::refresh_ihd_cache(mix_cc::time_range_t time_range) {
try {
auto queried_batch_maybe = mix_cc::ihd::make_query_batch_maybe(
m_tags, time_range, ihd_min_time_particles_);
if (queried_batch_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的信息");
return -1;
}
auto queried_bath = queried_batch_maybe.unsafe_get_just();
auto result_maybe = mix_cc::ihd::read_data_with_time_maybe(&queried_bath);
if (result_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
std::tie(queried_time_, queried_data_) = result_maybe.unsafe_get_just();
if (queried_data_.rows() == 0) {
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
}
catch (const std::exception &e) {
gb_logger_->log_exception(e);
return -1;
}
return 0;
}
void AlgBase::update_limit_alarm(double lb, double ub, double va, int64_t stime,
int64_t etime) {
try {
std::lock_guard<std::mutex> guard(lm);
rule_stat_.limit_down = lb;
rule_stat_.limit_up = ub;
rule_stat_.current_value = va;
if (rule_stat_.current_value == 0 || va <= ub) {
this->query_time_range_.set_range(
std::chrono::time_point<std::chrono::system_clock,
std::chrono::milliseconds>(
std::chrono::milliseconds(stime)),
std::chrono::time_point<std::chrono::system_clock,
std::chrono::milliseconds>(
std::chrono::milliseconds(etime)));
}
} catch (const std::exception &e) {
logger_->Error() << e.what() << std::endl;
}
}
mix_cc::time_range_t AlgBase::get_alarm_time() {
try {
if ((this->query_time_range_.get_right() -
this->query_time_range_.get_left()) < seconds(5)) {
return mix_cc::time_range_t(this->query_time_range_.get_right() -
seconds(5),
this->query_time_range_.get_right());
} else if (std::chrono::duration_cast<std::chrono::seconds>(
this->query_time_range_.get_left().time_since_epoch())
.count() == 0) {
return mix_cc::time_range_t(this->query_time_range_.get_right() -
seconds(5),
this->query_time_range_.get_right());
} else if (std::chrono::duration_cast<std::chrono::seconds>(
this->query_time_range_.get_left().time_since_epoch())
.count() > 300) {
return mix_cc::time_range_t(this->query_time_range_.get_right() -
seconds(300),
this->query_time_range_.get_right());
}
return this->query_time_range_;
} catch (const std::exception &e) {
logger_->Error() << e.what() << std::endl;
return mix_cc::time_range_t(this->now_time_ - seconds(5), this->now_time_);
}
}
bool AlgBase::get_save_data_cycled() {
auto now = system_clock::now();
if ((now - last_save_start_time_) > save_interval_ms_) {
last_save_start_time_ = now;
} else {
return false;
}
return true;
}
bool AlgBase::get_update_rule_stat_cycled() {
auto now = system_clock::now();
if ((now - last_rule_state_updae_start_time_) >
rule_state_update_interval_ms_) {
last_rule_state_updae_start_time_ = now;
} else {
return false;
}
return true;
}
bool AlgBase::get_heart_beat_log_cycled() {
auto now = system_clock::now();
if ((now - last_heart_beat_start_time_) > std::chrono::minutes(5)) {
last_heart_beat_start_time_ = now;
} else {
return false;
}
return true;
}