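// eis/eqpalg/algs/exp_sample2D.cc
//
// Implementation of ExpSample2D: a two-variable (sample_X / sample_Y) rule
// algorithm that checks live data either against a fitted polynomial band
// (ExpType::PolyFit) or against a trained Pearson correlation coefficient
// (ExpType::PEAR), and that can build those fits offline via exec_task().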

#include <eqpalg/algs/exp_sample2D.h>

#include <cmath>

#include <eqpalg/define/stat.h>
#include <eqpalg/exp_macro/get_macro_replaced_exp.h>
#include <eqpalg/table_struct/t_sample_fit.h>
#include <eqpalg/table_struct/t_sample_mag.h>
#include <eqpalg/table_struct/t_sample_stat.h>
#include <eqpalg/utility/build_alarm_info.h>
#include <mix_cc/sql.h>
#include <mix_cc/sql/database/db2_t.h>
using namespace mix_cc::sql;
extern ProcessType glob_process_type;
/**
 * @brief Builds the rule object; the base class ExpBase receives all arguments.
 *
 * The rule starts stopped (is_running_ = false) until init() loads its fit
 * parameters. A dedicated logger tagged "ExpSample2D-<exp_type>:<rule name>"
 * is attached.
 */
ExpSample2D::ExpSample2D(const string &name, const mix_cc::json &rule_json,
                         const string &ruleId, size_t exp_type)
    : ExpBase(name, rule_json, ruleId, exp_type) {
  is_running_ = false;
  const std::string log_tag =
      "ExpSample2D-" + std::to_string(exp_type) + ":" + rule_name_;
  logger_.reset(new LOG(log_tag, AUTO_CATCH_PID));
}
// Nothing to release by hand; members clean themselves up.
ExpSample2D::~ExpSample2D() = default;
/**
 * @brief Intentionally inert: this class overrides mon_proc() and never uses
 *        the doMonProc() path.
 * @return an empty (default-constructed) AlarmInfo.
 */
AlarmInfo ExpSample2D::doMonProc() {
  AlarmInfo no_alarm{};
  return no_alarm;
}
/**
 * @brief Loads configuration, expressions, sample bindings and fit parameters.
 *
 * Each step runs unconditionally in the order below. Their status codes are
 * summed, so any failing step yields a non-zero result. The rule is started
 * (is_running_ = true) whenever reload_param() succeeds, independently of the
 * other steps.
 *
 * @return 0 when every summed step returned 0, otherwise non-zero.
 */
int ExpSample2D::init() {
  int status = AlgBase::init();
  // Reload the data-source configuration.
  status += this->reload_config_data_source();
  // After the data source is loaded and before expressions are loaded, the
  // variables must be refreshed so their info is initialized into mm_vars.
  status += this->first_fill_mm_vars();
  // Expressions may only be initialized once the variables are refreshed.
  status += this->reload_config_exp_act();
  status += this->reload_samples();
  sample_type_ = SampleType::T_SAMPLE_FIT;

  const bool params_loaded = (this->reload_param() == 0);
  if (params_loaded) {
    is_running_ = true;
  }
  if (status == 0) {
    this->exp_is_wrong_ = false;
  }
  return status;
}
/**
 * @brief Per-cycle monitoring entry point (used instead of doMonProc()).
 *
 * Does nothing until init() has loaded the fit parameters (is_running_).
 * Otherwise it dispatches on exp_type_:
 *  - ExpType::PolyFit: if the "act" expression is true and check_polyFit()
 *    reports the current Y outside the fitted band, raise an alarm. Its time
 *    range ends at query_time_range_'s right edge and starts delay_time_
 *    earlier.
 *  - ExpType::PEAR: if check_pear() reports the windowed Pearson coefficient
 *    outside its band, raise an alarm. Its range starts min_len_ * 50 ms
 *    before the right edge (presumably one sample per 50 ms — confirm).
 *
 * @return the built alarm, or an empty AlarmInfo when nothing fired.
 */
AlarmInfo ExpSample2D::mon_proc() {
  if (!is_running_) {
    return AlarmInfo{};
  }
  switch (exp_type_) {
    case ExpType::PolyFit:
      if (expr_engine_->evaluateBool("act") && check_polyFit()) {
        this->rule_stat_.alarm_value = this->rule_stat_.current_value;
        auto msg = rule_name_ + this->error_str_ + " Y表达式当前值" +
                   DAA::double2str(this->rule_stat_.current_value) +
                   ",拟合的合理区间:[" + DAA::double2strLimit(limit_down_) +
                   "," + DAA::double2strLimit(limit_up_) + "]";
        logger_->Debug() << msg << endl;
        // Alarm window: the delay_time_ span ending at the current query end.
        this->query_time_range_.set_left(query_time_range_.get_right() -
                                         delay_time_);
        return utility::build_alarm_info(
            utility::get_msg_level(limit_down_, limit_up_,
                                   this->rule_stat_.current_value),
            rule_id_, rule_name_, "ExpSample2D", msg, query_time_range_);
      }
      break;
    case ExpType::PEAR:
      if (check_pear()) {
        this->rule_stat_.alarm_value = this->rule_stat_.current_value;
        auto msg = rule_name_ + this->error_str_ + " 当前线性相关性系数:" +
                   DAA::double2str(this->rule_stat_.current_value) +
                   ",合理区间:[" + DAA::double2strLimit(limit_down_) + "," +
                   DAA::double2strLimit(limit_up_) + "]";
        logger_->Debug() << msg << endl;
        // Alarm window covers the sample window that was just evaluated.
        this->query_time_range_.set_left(query_time_range_.get_right() -
                                         milliseconds(min_len_ * 50));
        logger_->Debug() << rule_name_ << ",stime:"
                         << mix_cc::mix_time_t(query_time_range_.get_left())
                                .to_formatted_time()
                         << ",etime:"
                         << mix_cc::mix_time_t(query_time_range_.get_right())
                                .to_formatted_time()
                         << std::endl;
        return utility::build_alarm_info(
            utility::get_msg_level(limit_down_, limit_up_,
                                   this->rule_stat_.current_value),
            rule_id_, rule_name_, "ExpSample2D", msg, query_time_range_);
      }
      break;
    default:
      break;
  }
  return AlarmInfo{};
}
int ExpSample2D::reload_samples() {
/*
sample_X绑定 expr_engine_->evaluate("sample_X")
sample_Y绑定 expr_engine_->evaluate("sample_Y")
*/
auto tmp_exp =
rule_json_.at("function").at("sample_X").at("value").get<std::string>();
exp_str_ = get_macro_replaced_exp(tmp_exp);
if (exp_str_ != "") {
int ret = expr_engine_->registerExpression("sample_X", exp_str_);
if (ret != 0) return -1;
logger_->Debug() << "sample_X:" << exp_str_ << "="
<< expr_engine_->evaluate("sample_X") << endl;
}
tmp_exp =
rule_json_.at("function").at("sample_Y").at("value").get<std::string>();
exp_str_ = get_macro_replaced_exp(tmp_exp);
if (exp_str_ != "") {
int ret = expr_engine_->registerExpression("sample_Y", exp_str_);
if (ret != 0) return -1;
logger_->Debug() << "sample_Y:" << exp_str_ << "="
<< expr_engine_->evaluate("sample_Y") << endl;
}
tmp_exp = rule_json_.at("function")
.at("sample_Y")
.at("param")
.at("scale")
.at("value")
.get<std::string>();
try {
scale_ = std::stod(tmp_exp);
logger_->Debug() << "scale_:" << scale_ << std::endl;
} catch (const std::exception &e) {
logger_->Error() << "scale_:" << tmp_exp << "stod出错" << e.what()
<< ",location:" << BOOST_CURRENT_LOCATION << endl;
scale_ = 0.3;
return -1;
}
if (exp_type_ == ExpType::PEAR) {
tmp_exp = rule_json_.at("function")
.at("sample_Y")
.at("param")
.at("min_len")
.at("value")
.get<std::string>();
try {
min_len_ = std::stod(tmp_exp);
if (min_len_ < 1000) {
min_len_ = 1000;
}
if (min_len_ > 200000) {
min_len_ = 200000;
}
logger_->Debug() << "min_len_:" << min_len_ << std::endl;
} catch (const std::exception &e) {
logger_->Error() << "min_len_:" << tmp_exp << "stod出错" << e.what()
<< endl;
min_len_ = 1000;
return -1;
}
}
return 0;
}
bool ExpSample2D::check_polyFit() {
double X = expr_engine_->evaluate("sample_X"); /* SampleX*/
double Y = expr_engine_->evaluate("sample_Y"); /* SampleY*/
double Y_Fit = PolyFitValue(X, this->fit_coefs_, this->orders_);
limit_down_ = Y_Fit - abs(Y_Fit) * scale_;
limit_up_ = Y_Fit + abs(Y_Fit) * scale_;
this->rule_stat_.limit_down = limit_down_;
this->rule_stat_.limit_up = limit_up_;
this->rule_stat_.current_value = Y;
logger_->Debug() << "X:" << X << ",Y:" << Y << ",Y_Fit:" << Y_Fit
<< "fit_coefs:" << fit_coefs_[0] << "," << fit_coefs_[1]
<< ",orders:" << orders_ << endl;
return (abs(Y_Fit - Y) > abs(Y_Fit) * scale_) ? true : false;
}
/**
 * @brief Evaluates the polynomial sum_{i=0..orders} fit_coefs[i] * x^(orders-i).
 *
 * fit_coefs holds the coefficients from the highest power down to the
 * constant term and is expected to have exactly orders + 1 entries. A
 * mismatch is logged. Extra trailing coefficients are ignored. Too few
 * coefficients make .at() throw, and that is reported through the error
 * path below instead of reading past the end of the vector.
 *
 * @param x         point at which to evaluate.
 * @param fit_coefs coefficients, highest power first.
 * @param orders    polynomial degree.
 * @return the polynomial value, or -1 on error. Note that -1 is also a legal
 *         polynomial value, so callers cannot tell the two apart.
 */
double ExpSample2D::PolyFitValue(double x, std::vector<double> &fit_coefs,
                                 size_t orders) {
  if (fit_coefs.size() != orders + 1) {
    logger_->Debug() << "拟合次数不匹配!" << endl;
  }
  try {
    double res = 0;
    for (size_t i = 0; i <= orders; ++i) {
      // .at() (not operator[]) so a short coefficient vector throws into the
      // catch below instead of being undefined behaviour.
      res += std::pow(x, static_cast<double>(orders - i)) * fit_coefs.at(i);
    }
    return res;
  } catch (const std::exception &e) {
    logger_->Error() << "PolyFitValue() 出错:" << e.what()
                     << ",location:" << BOOST_CURRENT_LOCATION << endl;
    return -1;
  }
}
/**
 * @brief Windowed Pearson-correlation check.
 *
 * While fewer than min_len_ samples are buffered, it appends the current
 * (sample_X, sample_Y) pair whenever "act" is true. A false "act" discards the
 * partial window, so only contiguous windows are evaluated. Once the window
 * is full, it computes the window's coefficient with PearValue(), clears the
 * buffer, and compares the result with a band around the trained coefficient
 * pear_coefs_.
 *
 * @return true when the windowed coefficient lies on the weak side of the
 *         band: below limit_down_ for pear_coefs_ > 0, otherwise above
 *         limit_up_. Returns false while filling or when the coefficient is
 *         not computable.
 */
bool ExpSample2D::check_pear() {
  if (this->data_len_ < min_len_) {
    if (expr_engine_->evaluateBool("act")) {
      SampleX_.push_back(expr_engine_->evaluate("sample_X"));
      SampleY_.push_back(expr_engine_->evaluate("sample_Y"));
      data_len_ = SampleX_.size();
    } else {
      reset_SampleXY();
    }
    return false;
  }

  // Band around the trained coefficient. The side pointing away from zero is
  // opened to the theoretical bound (+1 / -1), so only a weakening of the
  // correlation can trigger an alarm.
  const double tolerance = std::abs(pear_coefs_) * scale_;
  limit_down_ = pear_coefs_ - tolerance;
  limit_up_ = pear_coefs_ + tolerance;
  if (pear_coefs_ > 0) {
    limit_up_ = 1;
  } else {
    limit_down_ = -1;
  }
  this->rule_stat_.limit_down = limit_down_;
  this->rule_stat_.limit_up = limit_up_;

  const double pear_now = PearValue(SampleX_, SampleY_);
  reset_SampleXY();  // the next window always starts from scratch
  // PearValue() signals "not computable" with 2. A NaN (e.g. from NaN
  // samples) is treated the same way so it is never published.
  if (std::isnan(pear_now) || pear_now > 1) {
    return false;
  }
  this->rule_stat_.current_value = pear_now;
  return (pear_coefs_ > 0) ? (pear_now < this->limit_down_)
                           : (pear_now > this->limit_up_);
}
double ExpSample2D::PearValue(std::vector<double> &X, std::vector<double> &Y) {
if (X.size() != Y.size() || X.size() < 2) {
logger_->Debug() << "样本异常!"
<< "X size:" << X.size() << ",Y size" << Y.size() << endl;
return 2;
}
Eigen::Map<Eigen::VectorXd> sampleX(X.data(), X.size());
Eigen::Map<Eigen::VectorXd> sampleY(Y.data(), Y.size());
double m1 = sampleX.mean();
double m2 = sampleY.mean();
int num_data = sampleX.size();
double sigma1 = 0;
double sigma2 = 0;
double sigma12 = 0;
for (int i = 0; i < num_data; i++) {
sigma12 += (sampleX(num_data - 1 - i) - m1) *
(sampleY(num_data - 1 - i) - m2) / num_data;
sigma1 += (sampleX(num_data - 1 - i) - m1) *
(sampleX(num_data - 1 - i) - m1) / num_data;
sigma2 += (sampleY(num_data - 1 - i) - m2) *
(sampleY(num_data - 1 - i) - m2) / num_data;
}
if (sigma1 * sigma2 != 0) {
double dis1 = sigma1 / m1;
double dis2 = sigma2 / m2;
if (dis1 > 1 || dis2 > 1) {
return sigma12 / sqrt(sigma1 * sigma2);
} else {
logger_->Debug() << "存在离散度小于1" << endl;
return 2;
}
} else {
logger_->Debug() << "存在方差为0" << endl;
return 2;
}
}
/**
 * @brief Loads the trained fit parameters for this rule from T_SAMPLE_MAG.
 *
 * Selects the `result` column of the rows with this rule id and usable == 1,
 * then parses the first row as JSON. That JSON must contain:
 *  - "orders": the polynomial degree;
 *  - "fit_coefs": one coefficient list per degree, where row orders - 1 is used;
 *  - "pear_coefs": the trained Pearson coefficient.
 *
 * @return 0 on success; -1 when the query fails, no usable row exists, or the
 *         JSON is malformed (missing key, wrong type, orders out of range).
 */
int ExpSample2D::reload_param() {
  T_SAMPLE_MAG tsm;
  auto sql_statement =
      select(tsm.result())
          .from(tsm)
          .where(tsm.ruleId() == this->rule_id_, tsm.usable() == 1);
  auto sample_list_maybe = exec<db2_t, T_SAMPLE_MAG>(sql_statement);
  if (!sample_list_maybe.is_just()) {
    logger_->Error() << "无样本,请检查数据库连接" << std::endl;
    return -1;
  }
  auto sample_list = sample_list_maybe.unsafe_get_just();
  if (sample_list.empty()) {
    logger_->Info() << "无有效样本" << std::endl;
    return -1;
  }
  try {
    sample_param_ = mix_cc::json::parse(sample_list[0].result);
    // .at() instead of operator[]: a missing key throws (handled below)
    // instead of silently inserting null into sample_param_.
    this->orders_ = sample_param_.at("orders");
    const auto all_fit_coefs =
        sample_param_.at("fit_coefs").get<std::vector<std::vector<double>>>();
    // Row index is degree - 1. .at() turns orders_ == 0 (index wraps) or a
    // degree beyond the stored rows into std::out_of_range instead of UB.
    this->fit_coefs_ = all_fit_coefs.at(orders_ - 1);
    this->pear_coefs_ = sample_param_.at("pear_coefs");
    logger_->Debug() << rule_id_ << ",orders_:" << orders_
                     << ",pear_coefs_:" << pear_coefs_ << std::endl;
    return 0;
  } catch (const std::exception &e) {
    logger_->Error() << rule_id_ << "," << e.what()
                     << ",location:" << BOOST_CURRENT_LOCATION << endl;
    return -1;
  }
}
/**
 * @brief Inserts one row into T_SAMPLE_MAG for this rule.
 *
 * The row holds the serialized sample_param_, SampleType_, sample_id,
 * usable = 0, and query_time_range_ as the start and end times.
 *
 * @param sample_id identifier of the sample set.
 * @return 0 on success, -1 when the insert fails (the failure is logged).
 */
int ExpSample2D::insert_mag(std::string sample_id) {
  T_SAMPLE_MAG tsm;
  auto insert_ret = exec<db2_t, size_t>(insert_into(tsm).set(
      tsm.ruleId() = this->rule_id_, tsm.result() = this->sample_param_.dump(),
      tsm.type() = SampleType_, tsm.sampleid() = sample_id, tsm.usable() = 0,
      tsm.starttime() = mix_cc::mix_time_t(this->query_time_range_.get_left()),
      tsm.endtime() = mix_cc::mix_time_t(query_time_range_.get_right())));
  if (insert_ret.is_nothing()) {
    // Each field is comma-separated; the type and sampleid fields used to run
    // together ("...type:<T>smpleid size:...").
    std::string error_msg = "ruleid size:" + std::to_string(rule_id_.size()) +
                            ",result:" + this->sample_param_.dump() +
                            ",type:" + SampleType_ +
                            ",sampleid size:" + std::to_string(sample_id.size());
    gb_logger_->log_info("T_SAMPLE_MAG 表插入失败:" + rule_name_ + "," +
                         error_msg);
    return -1;
  }
  return 0;
}
/**
 * @brief Offline sampling task: collects (X, Y) samples over time_range, fits
 *        them, stores the samples and publishes the result.
 *
 * Steps:
 *  1. Clear SampleX_/SampleY_ and replay historical data over time_range
 *     through task_mon_pro(). The replay runs in chunks of
 *     query_interval_time_ when the range is at least that long. task_mon_pro()
 *     appends sample_X/sample_Y whenever "act" is true.
 *  2. If more than 100 samples were collected, fit them with DAA::LSM
 *     (polyfit coefficients, get_r2a scores, cor correlation,
 *     get_order_best order) and serialize the result into sample_param_.
 *  3. Insert every sample into the db2 table T_SAMPLE_FIT.
 *  4. Record the outcome with update_t_sample_mag() (presumably the db2
 *     T_SAMPLE_MAG row — confirm), then send sample_result_ via
 *     alarm_poster_.zmqp_send(912, ...).
 *
 * @param time_range historical interval to sample.
 * @return always an empty vector; this task raises no alarms.
 */
std::vector<AlarmInfo> ExpSample2D::exec_task(mix_cc::time_range_t time_range) {
// cal_vlid: whether a valid fit was produced; passed to update_t_sample_mag().
bool cal_vlid = true;
logger_->Debug()
<< "stime:"
<< mix_cc::mix_time_t(time_range.get_left()).to_formatted_time()
<< ",endtime:"
<< mix_cc::mix_time_t(time_range.get_right()).to_formatted_time() << endl;
std::string sample_id = this->get_id(time_range);
this->sample_id_ = sample_id;
reset_SampleXY();
// Step 1: replay history, either in one shot or chunk by chunk.
if ((time_range.get_right() - time_range.get_left()) <
this->query_interval_time_) {
this->query_time_range_ = time_range;
task_mon_pro();
} else {
// NOTE(review): if query_interval_time_ is zero and the range is non-empty,
// t never advances and this loop does not terminate — confirm the interval is
// always positive.
for (auto t = time_range.get_left(); t < time_range.get_right();
t += query_interval_time_) {
this->query_time_range_.set_left(t);
if (time_range.get_right() - t > query_interval_time_) {
this->query_time_range_.set_right(t + query_interval_time_);
} else {
// Last, possibly shorter, chunk is clipped to the requested end.
this->query_time_range_.set_right(time_range.get_right());
}
task_mon_pro();
}
}
// Step 2: fit only when enough samples were collected.
if (data_len_ > 100) {
sample2D sample2d;
if (lsm_ptr_ == nullptr) {
// The raw samples are dumped to the debug log only when the LSM is first created.
for (int i = 0; i < data_len_; i++) {
logger_->Debug() << "[" << this->SampleX_[i] << "," << this->SampleY_[i]
<< "]" << endl;
}
lsm_ptr_ = std::make_unique<DAA::LSM>(this->SampleX_, this->SampleY_);
} else {
lsm_ptr_->reset(this->SampleX_, this->SampleY_);
}
try {
sample2d.fit_coefs = lsm_ptr_->polyfit();
} catch (const std::exception &e) {
this->sample_result_ = "polyfit ERROR!";
this->gb_logger_->log_error(rule_name_ + "exec_task:" + e.what());
cal_vlid = false;
// NOTE(review): this early return skips the zmqp_send(912, ...) below, so no
// result message is published for a failed fit — confirm that is intended.
this->update_t_sample_mag(cal_vlid);
return {};
}
sample2d.scores = lsm_ptr_->get_r2a();
sample2d.pear_coefs = lsm_ptr_->cor();
sample2d.orders = lsm_ptr_->get_order_best();
sample2d.method = std::to_string(exp_type_);
sample_param_ = sample2d.invert2json();
/* Step 3: insert every sample into T_SAMPLE_FIT.
   NOTE(review): insert_fit()'s return code is ignored, so failed rows are only
   visible in the log. */
for (int i = 0; i < data_len_; i++) {
insert_fit(sample_id, SampleX_[i], SampleY_[i], i);
}
/* Step 4: the T_SAMPLE_MAG record covers the whole requested range. */
this->query_time_range_ = time_range;
this->sample_result_ = this->sample_param_.dump();
} else {
// NOTE(review): sample_result_ is not updated on this path, so the
// zmqp_send below publishes whatever value it held before. In addition,
// query_time_range_ still holds the last replayed chunk rather than
// time_range — confirm both are intended.
this->gb_logger_->log_error(rule_name_ + "数据量不足,不计算!");
cal_vlid = false;
}
this->update_t_sample_mag(cal_vlid);
this->alarm_poster_.zmqp_send(912, this->sample_result_);
return {};
}
/**
 * @brief Discards every buffered sample and resets data_len_ to 0.
 *
 * Both buffers are cleared and then asked to release their capacity
 * (shrink_to_fit).
 */
void ExpSample2D::reset_SampleXY() {
  auto drop_all = [](std::vector<double> &samples) {
    samples.clear();
    samples.shrink_to_fit();
  };
  drop_all(SampleX_);
  drop_all(SampleY_);
  data_len_ = 0;
}
/**
 * @brief Replays one chunk of historical data and collects (X, Y) samples.
 *
 * Does nothing if data_len_ already exceeds MAX_STORAGE_SIZE. Otherwise it
 * queries the historical data for the current query_time_range_
 * (query_ihd_data()) and walks the rows. For each row it refreshes the
 * expression variables; if the buffer is not yet full and "act" is true, it
 * appends sample_X and sample_Y. It stops at the first row where the buffer
 * is already full.
 */
void ExpSample2D::task_mon_pro() {
  if (data_len_ > MAX_STORAGE_SIZE) {
    return;
  }
  query_ihd_data();
  logger_->Debug() << rule_name_ << ",data_len_:" << data_len_
                   << ",ihd size:" << queried_data_.size() << endl;
  for (int row = 0; row < queried_data_.rows(); ++row) {
    refresh_exp_vars_ihd(row);
    if (data_len_ >= MAX_STORAGE_SIZE) {
      break;  // buffer full: the remaining rows are ignored
    }
    if (!expr_engine_->evaluateBool("act")) {
      continue;
    }
    SampleX_.push_back(expr_engine_->evaluate("sample_X"));
    SampleY_.push_back(expr_engine_->evaluate("sample_Y"));
    data_len_ = SampleX_.size();
  }
}
int ExpSample2D::insert_fit(std::string sample_id, double X, double Y,
int seq) {
T_SAMPLE_FIT tsf;
auto inset_ret = exec<db2_t, size_t>(insert_into(tsf).set(
tsf.sampleid() = sample_id, tsf.X() = X, tsf.Y() = Y, tsf.Seq() = seq));
if (inset_ret.is_nothing()) {
gb_logger_->log_info("T_SAMPLE_FIT 表插入失败:" + rule_name_ +
",simpleid size:" + std::to_string(sample_id.size()));
return -1;
}
return 0;
}