eis/eqpalg/alg_base.cpp

562 lines
20 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <base/TCMTime.h>
#include <eqpalg/alg_base.h>
#include <eqpalg/exp_macro/get_macro_replaced_exp.h>
#include <eqpalg/gb_item_memory.h>
#include <eqpalg/table_struct/t_rule_cfg.h>
#include <glob/SingletonTemplate.h>
#include <memory>
#include <mix_cc/exception.h>
#include <mix_cc/ihyper_db.h>
#include <mix_cc/sql.h>
#include <mix_cc/sql/database/db2_t.h>
#include <string>
#include <sys/types.h>
#include <zlib/MemVar.h>
extern ProcessType glob_process_type;
AlgBase::AlgBase(const string &name, const mix_cc::json &rule_json,
const string &ruleId)
: rule_id_(ruleId), rule_name_(name), rule_json_(rule_json),
gb_logger_(std::make_unique<GbLogger>("Logger:" + name)),
logger_(std::make_unique<LOG>("AlgBase", AUTO_CATCH_PID)),
// last_alarm_time_(system_clock::now()), ///< 屏蔽启动时的报警信息
last_alarm_time_(
system_clock::now() -
(minutes(std::max(5, CMemVar::Const()->AlarmIntervalHours * 60) - 1) +
50s)),
con_monitor_(1), delay_time_(50ms) {}
AlgBase::~AlgBase() {
// this->is_usable_ = false;
// SingletonTemp<EqpStat>::release();
}
void AlgBase::logReset(int task_seq) {
task_seq = task_seq;
logger_.reset(new LOG("task:" + rule_name_, AUTO_CATCH_PID));
}
int AlgBase::init() {
this->refresh_now_time();
last_save_start_time_ = this->now_time_;
last_rule_state_updae_start_time_ = this->now_time_;
last_heart_beat_start_time_ = this->now_time_;
// 使用修正后的XorShift128Plus
XorShift128Plus xsrng(rule_id_);
// 生成随机偏移
int r1 = xsrng.get_int_fast(0, 101);
int r2 = xsrng.get_int_fast(0, 1001);
int r3 = xsrng.get_int_fast(0, 11);
// 计算总偏移
auto offset_ms = std::chrono::milliseconds(r1 * 10 + r2 * 5);
auto offset_ms2 = std::chrono::milliseconds(r1 * 2 + r3 * 10);
// 更新保存间隔
save_interval_ms_ += offset_ms;
rule_state_update_interval_ms_ += offset_ms2;
logger_->Debug() << "ruleid:" << rule_id_
<< "save_interval_ms_:" << save_interval_ms_.count()
<< ";rule_state_update_interval_ms_:"
<< rule_state_update_interval_ms_.count() << std::endl;
/*初始化保存正常数据的数据--使规则开始运行,不保存----start*/
data_info_.update(mix_cc::mix_time_t(now_time_).to_milliseconds(),
mix_cc::mix_time_t(now_time_).to_milliseconds());
SingletonTemplate<DsmHandle>::GetInstance().insert(rule_id_, &data_info_);
/*初始化保存正常数据的数据--使规则开始运行,不保存----end*/
int res = 0;
is_usable_ = false;
if (rule_json_.contains("before_exec")) {
try {
this->prr_ = std::stoi(this->rule_json_.at("before_exec").get<string>());
} catch (const std::exception &e) {
logger_->Error() << e.what() << "location" << BOOST_CURRENT_LOCATION
<< endl;
this->prr_ = this->rule_json_.at("before_exec").get<int>();
}
logger_->Debug() << rule_name_ << "的执行前置条件[0-无1-表达式]:" << prr_
<< endl;
}
try {
res += this->reload_config_tag();
data_info_.init(m_tags);
// int ms = rule_json_.at("interval").at("time").at(1).get<int64_t>();
try {
int ms =
std::stoi(rule_json_.at("trigger").at("value").get<std::string>());
if (ms < 20) {
ms = 20;
}
this->delay_time_ = milliseconds(ms);
exp_mpdule_ptr_ =
std::make_unique<ExpModule>(mm_vars, m_tags, is_exp_alg_);
if (this->prr_ == 1) {
string exp_str = "";
if (rule_json_.at("function").contains("pre_result")) {
auto tmp_exp = rule_json_.at("function")
.at("pre_result")
.at("value")
.get<std::string>();
logger_->Info() << "tmp_exp:" << tmp_exp << std::endl;
exp_str = get_macro_replaced_exp(tmp_exp);
logger_->Info() << "exp_str:" << exp_str << std::endl;
exp_mpdule_ptr_->add_exp("pre_result", exp_str);
logger_->Info() << "pre_result:"
<< exp_mpdule_ptr_->get_exp_str("pre_result")
<< std::endl;
}
logger_->Debug() << "ruleid:" << this->rule_id_
<< ",rulename:" << this->rule_name_
<< ",执行前提表达式:" << exp_str << std::endl;
}
} catch (const std::exception &e) {
this->error_code_list_.push_back(
{ErrorType::Empty, ErrorLocation::ExeCyc});
logger_->Error() << e.what() << "location" << BOOST_CURRENT_LOCATION
<< endl;
res += -1;
}
last_run_start_time_ = system_clock::now() - this->delay_time_;
} catch (const std::exception &e) {
// std::throw_with_nested(
// mix_cc::Exception(-1, "init error", BOOST_CURRENT_LOCATION));
logger_->Error() << e.what() << "location" << BOOST_CURRENT_LOCATION
<< endl;
res += -1;
}
logger_->Debug() << "|" << rule_id_ << "|alg init 完成 90% "
<< ",last_alarm_time_"
<< mix_cc::mix_time_t(last_alarm_time_).to_formatted_time()
<< std::endl;
// is_usable_ = true;
this->rule_stat_.limit_down = 0;
this->rule_stat_.limit_up = 0;
this->rule_stat_.current_value = 0;
if (glob_process_type == ProcessType::kMon) {
this->update_map_rule(); ///<测试
}
logger_->Debug() << "|" << rule_id_ << "|alg init 完成! " << std::endl;
return res;
}
/**
* @brief 保证间隔delay_time_以上执行一次
* @return true
* @return false
*/
bool AlgBase::get_cycled() {
auto now = system_clock::now();
if ((now - last_run_start_time_) > this->delay_time_) {
last_run_start_time_ = now;
} else {
return false;
}
return true;
}
// 载入tag点配置信息
int AlgBase::reload_config_tag() {
const auto tag_dict = rule_json_["tags"];
m_tags.resize(tag_dict.size());
this->rule_stat_.items.resize(tag_dict.size());
for (auto x : tag_dict.items()) {
// debug
int item_index = stoi(x.key().substr(3));
logger_->Debug() << "item_index:" << item_index << ",x.key():" << x.key()
<< std::endl;
if (item_index > tag_dict.size()) {
logger_->Error() << "|tag序号异常|"
<< ",ruleid:" << rule_id_ << ",rule_name:" << rule_name_
<< ",tag_dict.size():" << tag_dict.size()
<< ",item_index:" << item_index << std::endl;
return -1;
}
auto itemi = x.value().at("value").get<std::string>();
logger_->Debug() << "tag" << item_index << ":" << itemi << std::endl;
/*有机组号前缀*/
m_tags[item_index - 1] = string(CMemVar::Const()->UnitNo) + "_" + itemi;
/*无机组号前缀*/
// m_tags[item_index - 1] = itemi;
this->rule_stat_.items[item_index - 1] = itemi; ///<提供给页面的
}
var_cache_.init(m_tags.size(), pv_num_);
if (m_tags.empty()) {
error_code_list_.push_back({ErrorType::Empty, ErrorLocation::Tags});
logger_->Debug() << "m_tags为空" << endl;
return -1;
}
return 0;
}
// 获取数据源
int AlgBase::get_data_source() { return this->data_source_; }
// 设置上次报警时间,这个时间是由执行进行类决定的,
// 因为最终报警是否被发送也是由该类决定的
void AlgBase::set_last_alarm_time(TimePoint time_point) {
this->last_alarm_time_ = time_point;
}
void AlgBase::exec_mon_call() {
if (this->get_cycled()) {
if (this->get_usable() && get_prr()) {
auto alarm = this->exec_mon();
this->con_monitor_.update(alarm.alarmed); ///<更新状态
if (alarm.alarmed) {
logger_->Debug() << alarm.content << endl;
alarm_poster_.alarm(alarm, &last_alarm_time_);
exp_mpdule_ptr_->fun_reset();
} else {
// auto runs_t1 = std::chrono::steady_clock::now();
if (get_save_data_cycled()) {
this->save_rule_norm_data();
}
// auto runs_t2 = std::chrono::steady_clock::now();
// int64_t costs_time =
// std::chrono::duration_cast<std::chrono::nanoseconds>(runs_t2 -
// runs_t1)
// .count();
// gb_logger_->log_info(rule_name_ + "|save_rule_norm_data,cost:" +
// std::to_string(costs_time));
}
} else {
this->con_monitor_.update(true); ///<更新状态,不满足前提条件,重置
}
if (get_update_rule_stat_cycled()) {
this->update_map_rule();
}
}
if (get_heart_beat_log_cycled()) {
logger_->Debug() << "heart_beat_log----active!" << std::endl;
}
}
void AlgBase::save_rule_norm_data() {
data_info_.update(mix_cc::mix_time_t(now_time_).to_milliseconds() - 1000 * 5,
mix_cc::mix_time_t(now_time_).to_milliseconds());
SingletonTemplate<DsmHandle>::GetInstance().insert(rule_id_, &data_info_);
}
void AlgBase::exec_task_call(const mix_cc::time_range_t &time_range) {
gb_logger_->log_info("task-开始执行:" + rule_name_);
this->task_time_range_ = time_range;
auto alarms = this->exec_task(time_range);
for (auto alarm : alarms) {
if (alarm.alarmed) {
alarm_poster_.alarm(alarm, "task");
}
}
gb_logger_->log_info("task-执行完成:" + rule_name_);
}
/**
* @brief 区别需要统计样本的模型
*/
void AlgBase::exec_normal_task_call(const mix_cc::time_range_t &time_range) {
gb_logger_->log_info("task-开始执行:" + rule_name_);
this->task_time_range_ = time_range;
auto alarm = this->exec_normal_task(time_range);
if (alarm.alarmed) {
this->set_last_alarm_time(time_range.get_left());
alarm_poster_.alarm(alarm, &last_alarm_time_);
}
gb_logger_->log_info("task-执行完成:" + rule_name_);
}
void AlgBase::exec_cron_call() { this->exec_cron(); }
// void AlgBase::exec_cron_call() {
// if (!this->is_cron_run) {
// this->exec_cron();
// this->is_cron_run = true;
// } else {
// this->logger_->Debug() << this->rule_name_ << ":" << this->rule_id_
// << "cron执行完成" << std::endl;
// }
// }
// 设置是否可用
void AlgBase::set_usable(bool usable) {
is_usable_ = usable;
logger_->Debug() << rule_name_ << " usable:" << usable << endl;
}
// 单次执行
mix_cc::json AlgBase::exec_cron() { return mix_cc::json{}; }
// 刷新ihyperDB缓存把ihyperDB中的数据转移到当前程序缓存中
// 注意该函数是先更改query_time_range_再查询ihd的数据
int AlgBase::refresh_ihd_cache() {
try {
// 如果数据时间包含空时间
if (query_time_range_.get_left() == TimePoint() ||
query_time_range_.get_right() == TimePoint()) {
// 则设置时间为当前时间前5秒
query_time_range_.set_left(now_time_ - 5s);
query_time_range_.set_right(now_time_);
} else {
// 否则把开始时间设置为上次查询的结束时间
query_time_range_.set_left(query_time_range_.get_right());
// 结束时间设置为当前时间
query_time_range_.set_right(now_time_);
}
// 进行查询
auto queried_batch_maybe = mix_cc::ihd::make_query_batch_maybe(
m_tags, query_time_range_, ihd_min_time_particles_);
if (queried_batch_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的信息");
return -1;
}
auto queried_bath = queried_batch_maybe.unsafe_get_just();
auto result_maybe = mix_cc::ihd::read_data_with_time_maybe(&queried_bath);
if (result_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
std::tie(queried_time_, queried_data_) = result_maybe.unsafe_get_just();
} catch (const std::exception &e) {
logger_->Debug() << e.what() << ",locations:" << BOOST_CURRENT_LOCATION
<< endl;
gb_logger_->log_exception(e);
}
return 0;
}
// query_time_range_ 会被刷新 l0 = r1 ,r0 = now
int AlgBase::refresh_ihd_cache(TimeDur delay_time) {
try {
// 如果数据时间包含空时间
if (query_time_range_.get_left() == TimePoint() ||
query_time_range_.get_right() == TimePoint()) {
query_time_range_.set_left(now_time_ - 10 * delay_time);
query_time_range_.set_right(now_time_);
} else {
// 否则把开始时间设置为上次查询的结束时间
query_time_range_.set_left(query_time_range_.get_right());
// 结束时间设置为当前时间
query_time_range_.set_right(now_time_);
}
{
gb_logger_->log_info(
"ihd函数内查询开始时间:" +
mix_cc::mix_time_t(query_time_range_.get_left()).to_formatted_time());
gb_logger_->log_info("ihd函数内查询结束时间:" +
mix_cc::mix_time_t(query_time_range_.get_right())
.to_formatted_time());
}
// 进行查询
auto queried_batch_maybe = mix_cc::ihd::make_query_batch_maybe(
m_tags, query_time_range_, delay_time);
if (queried_batch_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的信息");
return -1;
}
auto queried_bath = queried_batch_maybe.unsafe_get_just();
auto result_maybe = mix_cc::ihd::read_data_with_time_maybe(&queried_bath);
if (result_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
std::tie(queried_time_, queried_data_) = result_maybe.unsafe_get_just();
} catch (const std::exception &e) {
gb_logger_->log_exception(e);
}
return 0;
}
int AlgBase::refresh_now_time() {
if (data_source_ == DataSource::IHDB) {
// 因为ihdb存储的耗时比较长直接查询当前的时间数据
// 可能不存在对应的存储的数据
// 所以需要把数据向前找5s
now_time_ = system_clock::now() - 5s;
} else {
now_time_ = system_clock::now();
}
return 0;
}
bool AlgBase::update_map_rule() {
try {
std::lock_guard<std::mutex> guard(lm);
return SingletonTemp<EqpStat>::GetInstance().update_dynamic(
this->rule_id_, this->rule_stat_);
} catch (...) {
gb_logger_->log_error(this->rule_name_ + "update_map_rule()");
return false;
}
}
bool AlgBase::get_prr() {
if (this->prr_ == 1) {
exp_mpdule_ptr_->update();
bool prr_result = (bool)exp_mpdule_ptr_->get_value("pre_result");
this->now_prr_ = prr_result;
// logger_->Debug() << "ruleid:" << this->rule_id_
// << ",rulename:" << this->rule_name_
// << " get_prr():" << prr_result << std::endl;
return prr_result;
}
now_prr_ = true;
return true;
}
int AlgBase::ihd_get_by_tag(string tag, mix_cc::time_range_t time_range) {
// 进行查询
std::vector<std::string> tags;
tags.push_back(tag);
try {
auto queried_batch_maybe = mix_cc::ihd::make_query_batch_maybe(
tags, time_range, ihd_min_time_particles_);
if (queried_batch_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的信息");
return -1;
}
auto queried_bath = queried_batch_maybe.unsafe_get_just();
auto result_maybe = mix_cc::ihd::read_data_with_time_maybe(&queried_bath);
if (result_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
std::tie(queried_time_, queried_data_) = result_maybe.unsafe_get_just();
if (queried_data_.rows() == 0) {
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
}
catch (const std::exception &e) {
gb_logger_->log_exception(e);
return -1;
}
return 0;
}
int AlgBase::refresh_ihd_cache(mix_cc::time_range_t time_range) {
try {
auto queried_batch_maybe = mix_cc::ihd::make_query_batch_maybe(
m_tags, time_range, ihd_min_time_particles_);
if (queried_batch_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的信息");
return -1;
}
auto queried_bath = queried_batch_maybe.unsafe_get_just();
auto result_maybe = mix_cc::ihd::read_data_with_time_maybe(&queried_bath);
if (result_maybe.is_nothing()) {
queried_time_.clear();
queried_data_ = typename decltype(queried_batch_maybe)::type::Mat2d{};
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
std::tie(queried_time_, queried_data_) = result_maybe.unsafe_get_just();
if (queried_data_.rows() == 0) {
gb_logger_->log_error("Tag点没有查到相应的数据");
return -1;
}
}
catch (const std::exception &e) {
gb_logger_->log_exception(e);
return -1;
}
return 0;
}
void AlgBase::update_limit_alarm(double lb, double ub, double va, int64_t stime,
int64_t etime) {
try {
std::lock_guard<std::mutex> guard(lm);
rule_stat_.limit_down = lb;
rule_stat_.limit_up = ub;
rule_stat_.current_value = va;
if (rule_stat_.current_value == 0 || va <= ub) {
this->query_time_range_.set_range(
std::chrono::time_point<std::chrono::system_clock,
std::chrono::milliseconds>(
std::chrono::milliseconds(stime)),
std::chrono::time_point<std::chrono::system_clock,
std::chrono::milliseconds>(
std::chrono::milliseconds(etime)));
}
// logger_->Debug() << "update_limit_alarm|"
// << "lb:" << lb << ",ub:" << ub << ",va:" << va
// << ",stime:" << stime << ",etime:" << etime <<
// std::endl;
} catch (const std::exception &e) {
logger_->Error() << e.what() << std::endl;
}
}
mix_cc::time_range_t AlgBase::get_alarm_time() {
try {
if ((this->query_time_range_.get_right() -
this->query_time_range_.get_left()) < seconds(5)) {
return mix_cc::time_range_t(this->query_time_range_.get_right() -
seconds(5),
this->query_time_range_.get_right());
} else if (std::chrono::duration_cast<std::chrono::seconds>(
this->query_time_range_.get_left().time_since_epoch())
.count() == 0) {
return mix_cc::time_range_t(this->query_time_range_.get_right() -
seconds(5),
this->query_time_range_.get_right());
} else if (std::chrono::duration_cast<std::chrono::seconds>(
this->query_time_range_.get_left().time_since_epoch())
.count() > 300) {
return mix_cc::time_range_t(this->query_time_range_.get_right() -
seconds(300),
this->query_time_range_.get_right());
}
return this->query_time_range_;
} catch (const std::exception &e) {
logger_->Error() << e.what() << std::endl;
return mix_cc::time_range_t(this->now_time_ - seconds(5), this->now_time_);
}
}
bool AlgBase::get_save_data_cycled() {
auto now = system_clock::now();
if ((now - last_save_start_time_) > save_interval_ms_) {
last_save_start_time_ = now;
} else {
return false;
}
return true;
}
bool AlgBase::get_update_rule_stat_cycled() {
auto now = system_clock::now();
if ((now - last_rule_state_updae_start_time_) >
rule_state_update_interval_ms_) {
last_rule_state_updae_start_time_ = now;
} else {
return false;
}
return true;
}
bool AlgBase::get_heart_beat_log_cycled() {
auto now = system_clock::now();
if ((now - last_heart_beat_start_time_) > std::chrono::minutes(5)) {
last_heart_beat_start_time_ = now;
} else {
return false;
}
return true;
}