将 exp_bound、exp_times、exp_sample2D 中对已删除成员的引用替换为 expr_engine_->evaluate()/evaluateBool() 调用。 exp_sample2D 中原来绑定 exp_feedback_ 和 exp_result_ 的 sample_X/sample_Y 表达式现在通过 expr_engine_->registerExpression() 注册。
362 lines
14 KiB
C++
362 lines
14 KiB
C++
#include <eqpalg/algs/exp_times.h>
|
||
#include <eqpalg/table_struct/t_rule_sample_1d.h>
|
||
#include <eqpalg/utility/async_db_worker.h>
|
||
#include <eqpalg/utility/build_alarm_info.h>
|
||
#include <eqpalg/utility/eqp_stat.h>
|
||
#include <mix_cc/sql.h>
|
||
#include <mix_cc/sql/database/db2_t.h>
|
||
extern ProcessType glob_process_type;
|
||
|
||
namespace {
|
||
|
||
/// 持久化累积次数/时间到 DB2——可在任意线程调用(不依赖 ExpTimes 实例)
|
||
int persist_exp_times(const std::string &rule_id, int exp_type,
|
||
int64_t now_times, double now_used_time) {
|
||
try {
|
||
T_RULE_SAMPLE_1D trs1a;
|
||
auto query_maybe = exec<db2_t, T_RULE_SAMPLE_1D>(
|
||
select(trs1a.Flag(), trs1a.Count(), trs1a.X1())
|
||
.from(trs1a)
|
||
.where(trs1a.RuleId() == rule_id));
|
||
if (query_maybe.is_nothing()) {
|
||
return -1;
|
||
}
|
||
auto &query_list = query_maybe.unsafe_get_just();
|
||
if (query_list.empty()) {
|
||
if (exp_type == ExpType::OccTimesAcc) {
|
||
exec<db2_t, size_t>(insert_into(trs1a).set(
|
||
trs1a.RuleId() = rule_id, trs1a.X1() = 1,
|
||
trs1a.Count() = now_times, trs1a.Flag() = exp_type));
|
||
} else {
|
||
exec<db2_t, size_t>(insert_into(trs1a).set(
|
||
trs1a.RuleId() = rule_id, trs1a.X1() = now_used_time,
|
||
trs1a.Count() = 1, trs1a.Flag() = exp_type));
|
||
}
|
||
} else {
|
||
if (exp_type == ExpType::OccTimesAcc) {
|
||
exec<db2_t, size_t>(update(trs1a)
|
||
.set(trs1a.Count() = now_times, trs1a.X1() = 2,
|
||
trs1a.Flag() = exp_type)
|
||
.where(trs1a.RuleId() == rule_id));
|
||
} else {
|
||
exec<db2_t, size_t>(update(trs1a)
|
||
.set(trs1a.X1() = now_used_time,
|
||
trs1a.Count() = 2,
|
||
trs1a.Flag() = exp_type)
|
||
.where(trs1a.RuleId() == rule_id));
|
||
}
|
||
}
|
||
return 0;
|
||
} catch (const std::exception &e) {
|
||
return -1;
|
||
}
|
||
}
|
||
|
||
} // namespace
|
||
ExpTimes::ExpTimes(const string &name, const mix_cc::json &rule_json,
|
||
const string &ruleId, size_t exp_type)
|
||
: ExpBase(name, rule_json, ruleId, exp_type) {
|
||
this->last_load_time_ = std::chrono::system_clock::now();
|
||
logger_.reset(
|
||
new LOG("ExpTimes-" + std::to_string(exp_type) + ":" + rule_name_,
|
||
AUTO_CATCH_PID));
|
||
}
|
||
|
||
ExpTimes::~ExpTimes() {
|
||
this->wait_flag_ = 0;
|
||
}
|
||
|
||
int ExpTimes::init() {
|
||
int ret = 0;
|
||
try {
|
||
ret += AlgBase::init(); /*1.tag点;2执行时间间隔和上次执行时间点*/
|
||
ret += reload_config_data_source(); /*载入数据源:0——ihd;1——内存*/
|
||
if (glob_process_type == ProcessType::kMon) {
|
||
ret += this->first_fill_mm_vars(); /*4.数据项*/
|
||
}
|
||
// 必须在刷新变量后,才可以初始化表达式
|
||
ret += this->reload_config_exp_act(); /*表达式*/
|
||
if (glob_process_type == ProcessType::kMon &&
|
||
this->get_history_times() == -1) {
|
||
ret += -1;
|
||
} /*查db2 ---延迟查询*/
|
||
ret += this->reload_params(); /*1.上限;2.初始值*/
|
||
logger_->Debug() << "ExpTimes::init(),"
|
||
<< "shear_times:" << rule_stat_.shear_times
|
||
<< ",running_time:" << rule_stat_.running_time << endl;
|
||
} catch (const std::exception &e) {
|
||
logger_->Error() << ":ExpTimes::init()异常!" << e.what()
|
||
<< ",location:" << BOOST_CURRENT_LOCATION << endl;
|
||
ret += -1;
|
||
}
|
||
if (ret == 0) {
|
||
this->exp_is_wrong_ = false;
|
||
}
|
||
if (exp_type_ == ExpType::OccTimesAcc) {
|
||
rule_stat_.unit = "次";
|
||
}
|
||
if (exp_type_ == ExpType::HoldTimeAcc) {
|
||
rule_stat_.unit = "h";
|
||
}
|
||
return ret;
|
||
}
|
||
|
||
int ExpTimes::reload_params() {
|
||
int res = 0;
|
||
this->rule_stat_.limit_down = -32768;
|
||
try {
|
||
if (rule_json_.at("function")
|
||
.at("result")
|
||
.at("param")
|
||
.contains("limit_times")) {
|
||
max_times_ = std::stoll(rule_json_.at("function")
|
||
.at("result")
|
||
.at("param")
|
||
.at("limit_times")
|
||
.at("value")
|
||
.get<std::string>());
|
||
this->rule_stat_.limit_up = max_times_;
|
||
this->rule_stat_.current_value = this->rule_stat_.shear_times;
|
||
} else {
|
||
max_time_ = std::stod(rule_json_.at("function")
|
||
.at("result")
|
||
.at("param")
|
||
.at("limit_time")
|
||
.at("value")
|
||
.get<std::string>());
|
||
this->rule_stat_.limit_up = max_time_;
|
||
this->rule_stat_.current_value = this->rule_stat_.running_time;
|
||
}
|
||
logger_->Debug() << "阈值:" << this->rule_stat_.limit_up
|
||
<< ",当前值:" << rule_stat_.current_value << endl;
|
||
} catch (const std::exception &e) {
|
||
logger_->Error() << ":ExpTimes::reload_params()异常!" << e.what()
|
||
<< ",location:" << BOOST_CURRENT_LOCATION << endl;
|
||
return -1;
|
||
}
|
||
return res;
|
||
}
|
||
|
||
AlarmInfo ExpTimes::mon_proc() {
|
||
AlarmInfo out_alarm{};
|
||
/**
|
||
* 1.检查存写周期,满足则存db2
|
||
* 2.运行累计
|
||
* 3.检查报警,返回
|
||
*/
|
||
if (this->now_time_ - last_load_time_ > minutes(rw_time_)) {
|
||
/* 存数据*/
|
||
logger_->Debug() << "now_time_:"
|
||
<< mix_cc::mix_time_t(this->now_time_).to_formatted_time()
|
||
<< ",last_load_time_:"
|
||
<< mix_cc::mix_time_t(last_load_time_).to_formatted_time()
|
||
<< ",rw_time:" << rw_time_ << "min" << endl;
|
||
if (update_history_times() == 0) {
|
||
logger_->Debug() << "exp_type:" << exp_type_
|
||
<< ",update_history_times(),shear_times:"
|
||
<< rule_stat_.shear_times
|
||
<< ",running_time:" << rule_stat_.running_time
|
||
<< ",rw_time:" << rw_time_ << "min" << endl;
|
||
last_load_time_ = this->now_time_;
|
||
}
|
||
}
|
||
if (update_times() == 0) {
|
||
if (check_alarm()) {
|
||
this->rule_stat_.alarm_value = this->rule_stat_.current_value;
|
||
auto msg =
|
||
rule_name_ + this->error_str_ +
|
||
",阈值:" + DAA::double2str(rule_stat_.limit_up) +
|
||
(exp_type_ == ExpType::OccTimesAcc ? ",当前累计次数:"
|
||
: ",当前运行时间(小时):") +
|
||
DAA::double2str(rule_stat_.current_value);
|
||
|
||
logger_->Debug() << msg << endl;
|
||
this->query_time_range_.set_left(query_time_range_.get_right() -
|
||
delay_time_);
|
||
return utility::build_alarm_info(
|
||
utility::get_msg_level(0, this->rule_stat_.limit_up,
|
||
this->rule_stat_.current_value),
|
||
rule_id_, rule_name_, "ExpTimes", msg, query_time_range_);
|
||
|
||
}
|
||
}
|
||
|
||
return out_alarm;
|
||
}
|
||
|
||
int ExpTimes::update_times() {
|
||
if (((this->exp_type_ == ExpType::OccTimesAcc) &&
|
||
this->rule_stat_.shear_times >=
|
||
std::numeric_limits<unsigned long>::max() - 1) ||
|
||
(exp_type_ == ExpType::HoldTimeAcc &&
|
||
this->rule_stat_.running_time >=
|
||
std::numeric_limits<double>::max() - 1)) {
|
||
this->logger_->Debug() << "数据已超限! max(long):"
|
||
<< std::numeric_limits<unsigned long>::max()
|
||
<< " max(double):"
|
||
<< std::numeric_limits<double>::max() << std::endl;
|
||
return -1;
|
||
}
|
||
|
||
this->act_triggered_ = expr_engine_->evaluateBool("act");
|
||
/*出现次数累计*/
|
||
if (this->exp_type_ == ExpType::OccTimesAcc) {
|
||
if (this->act_triggered_) {
|
||
this->rule_stat_.shear_times++;
|
||
this->rule_stat_.current_value = this->rule_stat_.shear_times;
|
||
this->logger_->Debug()
|
||
<< "出现次数:" << this->rule_stat_.shear_times << std::endl;
|
||
}
|
||
}
|
||
/*保持时间累计*/
|
||
else if (exp_type_ == ExpType::HoldTimeAcc) {
|
||
this->rule_stat_.current_value = this->rule_stat_.running_time;
|
||
if (!this->act_started_) {
|
||
if (this->act_triggered_) {
|
||
this->act_started_ = true;
|
||
mm_vars["stime"] =
|
||
duration_cast<milliseconds>(now_time_.time_since_epoch()).count();
|
||
this->logger_->Debug()
|
||
<< "开始计时,stime:" << mm_vars["stime"] << ","
|
||
<< mix_cc::mix_time_t(now_time_).to_formatted_time() << std::endl;
|
||
}
|
||
} else {
|
||
mm_vars["time"] = mm_vars["now"] - mm_vars["stime"];
|
||
if (!this->act_triggered_) {
|
||
this->act_started_ = false;
|
||
this->rule_stat_.running_time +=
|
||
mm_vars["time"] / (60.0 * 60000); this->logger_->Debug()
|
||
<< "计时结束:" << mix_cc::mix_time_t(now_time_).to_formatted_time()
|
||
<< " 本次统计的使用时间:" << mm_vars["time"] / 60000.0
|
||
<< " minites" << std::endl;
|
||
} else {
|
||
if ((mm_vars["time"] / 60000.0) > rw_time_) {
|
||
mm_vars["stime"] =
|
||
duration_cast<milliseconds>(now_time_.time_since_epoch()).count();
|
||
this->rule_stat_.running_time +=
|
||
mm_vars["time"] / (60.0 * 60000); this->logger_->Debug()
|
||
<< "计时结束:"
|
||
<< mix_cc::mix_time_t(now_time_).to_formatted_time()
|
||
<< " 本次统计的使用时间:" << mm_vars["time"] / 60000.0
|
||
<< "minites" << std::endl;
|
||
}
|
||
}
|
||
}
|
||
} else {
|
||
this->logger_->Debug() << "exp_type_:" << exp_type_ << ",未定义类型!"
|
||
<< std::endl;
|
||
}
|
||
|
||
return 0;
|
||
}
|
||
|
||
bool ExpTimes::check_alarm() {
|
||
if (this->act_triggered_ && exp_type_ == ExpType::HoldTimeAcc) {
|
||
return this->rule_stat_.running_time > this->max_time_ ? true : false;
|
||
} else if (this->act_triggered_ && exp_type_ == ExpType::OccTimesAcc) {
|
||
return this->rule_stat_.shear_times > this->max_times_ ? true : false;
|
||
}
|
||
return false;
|
||
}
|
||
|
||
int ExpTimes::get_history_times() {
|
||
T_RULE_SAMPLE_1D trs1a;
|
||
auto query_list_maybe = exec<db2_t, T_RULE_SAMPLE_1D>(
|
||
select(trs1a.Flag(), trs1a.Count(), trs1a.X1())
|
||
.from(trs1a)
|
||
.where(trs1a.RuleId() == this->rule_id_));
|
||
if (query_list_maybe.is_just()) {
|
||
auto &query_list = query_list_maybe.unsafe_get_just();
|
||
|
||
if (query_list.empty()) {
|
||
this->logger_->Error() << "db2 查询为空!" << std::endl;
|
||
this->rule_stat_.shear_times = 0;
|
||
this->rule_stat_.running_time = 0;
|
||
return -2;
|
||
} else {
|
||
if (exp_type_ == ExpType::HoldTimeAcc) {
|
||
this->rule_stat_.running_time = query_list[0].X1;
|
||
this->logger_->Info() << "db2,this->rule_stat_.running_time:"
|
||
<< this->rule_stat_.running_time << std::endl;
|
||
} else if (exp_type_ == ExpType::OccTimesAcc) {
|
||
this->rule_stat_.shear_times = query_list[0].Count;
|
||
this->logger_->Info()
|
||
<< "db2,this->rule_stat_.shear_times:"
|
||
<< this->rule_stat_.shear_times
|
||
<< ",query_list[0].Count:" << query_list[0].Count << std::endl;
|
||
}
|
||
}
|
||
|
||
} else {
|
||
this->logger_->Error() << "db2 查询失败!" << std::endl;
|
||
this->rule_stat_.shear_times = 0;
|
||
this->rule_stat_.running_time = 0;
|
||
return -1;
|
||
}
|
||
return 0;
|
||
}
|
||
|
||
int ExpTimes::update_history_times() {
|
||
auto now_times = this->rule_stat_.shear_times;
|
||
auto now_used_time = this->rule_stat_.running_time;
|
||
this->logger_->Debug() << "-----submit db2------"
|
||
<< " act_started_:"
|
||
<< std::to_string(this->act_started_)
|
||
<< ",now_times:" << now_times
|
||
<< ",now_used_time:" << now_used_time << std::endl;
|
||
|
||
// 投递到后台线程——mon 20ms 周期不阻塞
|
||
std::string rule_id = this->rule_id_;
|
||
int exp_type = static_cast<int>(this->exp_type_);
|
||
std::string rule_name = this->rule_name_;
|
||
|
||
AsyncDbWorker::instance().submit(rule_id, [=]() {
|
||
LOG logger("AsyncDb-" + rule_name, AUTO_CATCH_PID);
|
||
logger.Debug() << "persist start, now_times:" << now_times
|
||
<< ", now_used_time:" << now_used_time << std::endl;
|
||
if (persist_exp_times(rule_id, exp_type, now_times, now_used_time) == 0) {
|
||
SingletonTemp<EqpStat>::GetInstance().update_static(rule_id, now_times,
|
||
now_used_time);
|
||
logger.Debug() << "persist done" << std::endl;
|
||
} else {
|
||
logger.Error() << "persist failed" << std::endl;
|
||
}
|
||
});
|
||
|
||
return 0;
|
||
}
|
||
|
||
int ExpTimes::insert_history_times(int64_t now_times, double now_used_time) {
|
||
T_RULE_SAMPLE_1D trs1a;
|
||
this->logger_->Debug() << "-----insert db2------" << std::endl;
|
||
if (exp_type_ == ExpType::OccTimesAcc) {
|
||
exec<db2_t, size_t>(insert_into(trs1a).set(
|
||
trs1a.RuleId() = this->rule_id_, trs1a.X1() = 1,
|
||
trs1a.Count() = now_times, trs1a.Flag() = this->exp_type_));
|
||
this->logger_->Debug() << "当前次数:" << now_times << std::endl;
|
||
} else if (exp_type_ == ExpType::HoldTimeAcc) {
|
||
exec<db2_t, size_t>(insert_into(trs1a).set(
|
||
trs1a.RuleId() = this->rule_id_, trs1a.X1() = now_used_time,
|
||
trs1a.Count() = 1, trs1a.Flag() = this->exp_type_));
|
||
this->logger_->Debug() << "当前使用时间:" << now_used_time << std::endl;
|
||
}
|
||
return 0;
|
||
}
|
||
|
||
void ExpTimes::reset_dev_data() {
|
||
last_load_time_ = now_time_;
|
||
this->rule_stat_.shear_times = 0;
|
||
this->rule_stat_.running_time = 0;
|
||
// SHM 立即更新(不依赖 DB 读回,值已知为 0)
|
||
SingletonTemp<EqpStat>::GetInstance().update_static(this->rule_id_, 0, 0.0);
|
||
// DB 持久化异步投递
|
||
std::string rule_id = this->rule_id_;
|
||
int exp_type = static_cast<int>(this->exp_type_);
|
||
AsyncDbWorker::instance().submit(rule_id, [=]() {
|
||
persist_exp_times(rule_id, exp_type, 0, 0.0);
|
||
});
|
||
logger_->Debug() << rule_name_ << ":ExpTimes::reset_dev_data()" << endl;
|
||
this->wait_flag_ = 0;
|
||
this->act_started_ = false;
|
||
this->act_triggered_ = false;
|
||
} |