eis/eqpalg/algs/roller.cpp

225 lines
7.8 KiB
C++
Raw Permalink Normal View History

#include <base/BitTool.h>
#include <eqpalg/algs/roller.h>
#include <eqpalg/feature_extraction/daa.h>
#include <eqpalg/utility/build_alarm_info.h>
#include <algorithm>
#include <limits>
#include <string>
#include <utility>
#include <vector>
#include "mix_cc/ihyper_db.h"
int Roller::init() {
int res = 0;
try {
res += AlgBase::init();
this->n_tags_ = m_tags.size();
if (this->delay_time_ > 3min) {
this->interval_time_ = this->delay_time_;
} else {
this->interval_time_ = 3min;
}
this->data_source_ = DataSource::IHDB;
this->error_name_ =
rule_json_.at("output").at("reason").at("value").get<std::string>();
this->error_diff_ = std::stod(rule_json_.at("function")
.at("result")
.at("param")
.at("limit_over")
.at("value")
.get<std::string>());
this->error_content_ =
rule_json_.at("output").at("error").at("value").get<std::string>();
print_load_content();
} catch (const std::exception& e) {
std::throw_with_nested(
mix_cc::Exception(-1, "load error", BOOST_CURRENT_LOCATION));
}
return res;
}
Roller::Roller(const string name, const mix_cc::json& rule_json,
const string ruleId)
: AlgBase(name, rule_json, ruleId) {
logger_.reset(new LOG("Roller:" + rule_name_, AUTO_CATCH_PID));
}
Roller::~Roller() {
}
AlarmInfo Roller::exec_mon() {
AlarmInfo now_alarm{};
this->refresh_now_time();
this->query_time_range_.set_left(this->now_time_ - this->interval_time_);
this->query_time_range_.set_right(this->now_time_);
auto ihd_result = this->refresh_mmean_tags_ihd();
if (ihd_result == -1) {
return now_alarm;
}
while (1) {
auto result_base = exec_mon_base();
if (result_base == 0 || result_base == -1) {
break;
}
}
if (this->tags_errors_.size() != 0) {
std::string msg = {};
for (auto iter = this->tags_errors_.begin();
iter != this->tags_errors_.end(); iter++) {
msg = msg + " tag" + std::to_string(iter->first + 1) + "" +
DAA::double2str(this->mmean_tags_[iter->first]) + "," +
BIG_SMALL[iter->second];
}
std::string error_info = this->error_name_ + this->error_content_ + msg;
this->logger_->Debug() << "报警!" << this->rule_name_ << error_info
<< std::endl;
now_alarm =
utility::build_alarm_info(MsgLevel::ERROR, rule_id_, rule_name_,
"Roller", error_info, query_time_range_);
} else {
this->rule_stat_.limit_down = 0;
this->rule_stat_.limit_up = 0;
this->rule_stat_.current_value = 0;
}
this->logger_->Debug()
<< this->rule_name_ << "执行完成 时间段"
<< mix_cc::mix_time_t(this->query_time_range_.get_left())
.to_formatted_time()
<< "~"
<< mix_cc::mix_time_t(this->query_time_range_.get_right())
.to_formatted_time()
<< std::endl;
logger_->Debug() << "区间:[" << this->rule_stat_.limit_down << ","
<< this->rule_stat_.limit_up
<< "],当前值:" << this->rule_stat_.current_value << endl;
return now_alarm;
}
int Roller::exec_mon_base() {
int flag = 0;
int n_tags = this->comparison_tags_.size();
//计算均值
int result = minmaxsum_get(this->comparison_tags_);
double sum_mmean = this->min_max_sum_[2];
double m_ave = sum_mmean / n_tags;
if (n_tags > 2) {
m_ave = (sum_mmean - this->min_max_sum_[0] - this->min_max_sum_[1]) /
(n_tags - 2);
}
//寻找异常
this->logger_->Debug() << "result=" << result << ",n_tags=" << n_tags
<< std::endl;
if (m_ave < std::numeric_limits<double>::epsilon() || result == -1) {
return -1;
} else {
double diff_max = std::max(fabs(this->min_max_sum_[1] - m_ave),
fabs(this->min_max_sum_[0] - m_ave));
for (auto iter = this->comparison_tags_.begin();
iter != this->comparison_tags_.end();) {
auto value_avg_diff = fabs(this->mmean_tags_[*iter] - m_ave);
if (fabs(diff_max - value_avg_diff) <
std::numeric_limits<double>::epsilon() &&
flag == 0 && fabs(100 * value_avg_diff / m_ave) > this->error_diff_) {
flag++;
this->rule_stat_.limit_down =
fabs(m_ave) * (1 - this->error_diff_ / 100.0);
this->rule_stat_.limit_up =
fabs(m_ave) * (1 + this->error_diff_ / 100.0);
this->rule_stat_.current_value = this->mmean_tags_[*iter];
this->rule_stat_.alarm_value = this->rule_stat_.current_value;
logger_->Debug() << rule_name_ << ",diff_max:" << diff_max
<< ",value_avg_diff:" << value_avg_diff
<< ",error_diff_:" << error_diff_ << ",m_ave:" << m_ave
<< ",min:" << this->min_max_sum_[0]
<< ",max:" << this->min_max_sum_[1]
<< ",sum:" << this->min_max_sum_[2]
<< ",n_tag2:" << n_tags << std::endl;
if (this->mmean_tags_[*iter] > m_ave) {
tags_errors_[*iter] = 0;
} else {
tags_errors_[*iter] = 1;
}
iter = this->comparison_tags_.erase(iter);
this->logger_->Debug() << "flag=" << flag << std::endl;
} else {
iter++;
}
}
return flag;
}
}
/**
* @brief tag值的 min max sum
* @param comparison_tags My Param doc
* @return int
*/
int Roller::minmaxsum_get(std::vector<int> comparison_tags) {
if (comparison_tags.size() < 3) {
return -1;
}
this->min_max_sum_ = {this->mmean_tags_[comparison_tags[0]],
this->mmean_tags_[comparison_tags[0]], 0};
for (int i = 0; i < comparison_tags.size(); i++) {
this->min_max_sum_[0] =
std::min(this->min_max_sum_[0], this->mmean_tags_[comparison_tags[i]]);
this->min_max_sum_[1] =
std::max(this->min_max_sum_[1], this->mmean_tags_[comparison_tags[i]]);
this->min_max_sum_[2] += this->mmean_tags_[comparison_tags[i]];
}
return 0;
}
std::vector<AlarmInfo> Roller::exec_task(mix_cc::time_range_t time_range) {
std::vector<AlarmInfo> result;
for (auto now_time = time_range.get_left();
now_time <= time_range.get_right(); now_time += delay_time_) {
this->now_time_ = now_time;
auto rr1 = this->exec_mon();
result.push_back(rr1);
}
return result;
}
/**
* @brief
* map和vector1.mmean_tags_
* 2.comparison_tags_ tagmap的key
* 3. tags_errors_ = {}; tag及对应的异常结果map的key0-1
* @return int
*/
int Roller::refresh_mmean_tags_ihd() {
this->comparison_tags_ = {};
this->tags_errors_ = {};
for (unsigned int i = 0; i < this->n_tags_; i++) {
auto ave_maybe = mix_cc::ihd::read_data_stats_maybe(
m_tags[i], this->query_time_range_, HD3_STATS_TYPE_ARITH_MEAN);
if (ave_maybe.is_nothing()) {
this->logger_->Debug() << this->rule_name_ << ":ihd查询失败" << std::endl;
return -1;
}
this->mmean_tags_[i] = ave_maybe.unsafe_get_just();
this->comparison_tags_.push_back(i);
}
return 0;
}
void Roller::print_load_content() {
this->logger_->Debug()
<< "执行周期:"
<< std::to_string(
std::chrono::duration_cast<std::chrono::seconds>(this->delay_time_)
.count())
<< "s,均值查询时间段:"
<< std::to_string(std::chrono::duration_cast<std::chrono::seconds>(
this->interval_time_)
.count())
<< "s,报警名称:" << this->error_name_
<< ",阈值:" << std::to_string(this->error_diff_)
<< ",报警内容:" << this->error_content_ << std::endl;
}