#include #include #include #include #include #include DistInterval::~DistInterval() {} int DistInterval::init() { int ret = 0; act_started_ = false; try { Exp::init(); this->reload_config_vars(); sample_stat_ = std::make_unique( rule_id_, rule_name_, dims_, test_mode_, std::chrono::system_clock::now() - 1h, std::chrono::system_clock::now(), 0, 0, 0); // // 目前暂时使用小时作为归档的单位间隔 // sample_stat_->set_archive_interval(hours(archive_interval_day_)); this->sample_stat_->set_prob(this->judge_diff_); // this->sample_stat_->load_data(); this->last_select_db2_time_ = std::chrono::system_clock::now(); } catch (const std::exception& e) { logger_->Info() << "数据载入错误 :" << rule_name_ << endl; // 测试 std::throw_with_nested( mix_cc::Exception(-1, "数据载入发生异常", BOOST_CURRENT_LOCATION)); ret = -1; } return ret; } int DistInterval::reload_config_vars() { try { if (rule_json_.at("action_condition").contains("interval_source")) { this->interval_source_ = rule_json_.at("action_condition") .at("interval_source") .at(1) .get(); gb_logger_->log_info(rule_name_ + "interval_source_:" + std::to_string(this->interval_source_)); } if (rule_json_.at("action_condition").contains("interval_input")) { this->dist_interval_input_ = rule_json_.at("action_condition") .at("interval_input") .at(1) .get(); gb_logger_->log_info(rule_name_ + "interval_input:" + this->dist_interval_input_); } if (rule_json_.at("alarm_option").contains("value")) { auto tmp_exp = rule_json_.at("alarm_option").at("value").at(1).get(); exp_str_ = get_macro_replaced_exp(tmp_exp); if (exp_result_ == nullptr) { exp_result_ = std::make_unique(exp_str_, &mm_vars); logger_->Debug() << exp_str_ << ":" << exp_result_->evaluate() << endl; } else { logger_->Info() << "指针已经初始化完成" << exp_str_ << ":" << exp_result_->evaluate() << endl; } } if (rule_json_.at("alarm_option").contains("sample_is_infinite_mode")) { this->is_no_down_limit_ = static_cast(rule_json_.at("alarm_option") .at("sample_is_infinite_mode") .at(1) .get()); } test_mode_ = static_cast( rule_json_.at("alarm_option").at("mode").at(1).get()); judge_diff_ = rule_json_.at("alarm_option").at("diff").at(1).get(); error_str_ = rule_json_.at("alarm_option").at("error").at(1).get(); logger_->Debug() << "modejudge(0: absolute difference, 1: error percentage (%), 2: " "normal value signal (%),3. dtw curve distance(%), 4.poly fit(%)" << static_cast(test_mode_) << " samplediff:" << judge_diff_ << endl; } catch (std::exception& e) { logger_->Error() << "表达式-样本取样异常,表达式为: " << exp_str_ << std::endl; throw_with_nested( mix_cc::Exception(-1, "表达式样本载入错误", BOOST_CURRENT_LOCATION)); } return 0; } AlarmInfo DistInterval::exec_mon() { AlarmInfo out_alarm; // 刷新当前时间 this->refresh_now_time(); if (!is_to_detect_ || (std::chrono::system_clock::now() - this->last_select_db2_time_) > 5min) { is_to_detect_ = get_is_to_detect(); update_show_interval(); this->last_select_db2_time_ = std::chrono::system_clock::now(); } if (this->is_to_detect_) { switch (data_source_) { case DataSource::IHDB: case DataSource::MEMORY: { // 刷新内存对应的变量数据 refresh_exp_vars_mem(); query_time_range_.set_left(query_time_range_.get_right() - delay_time_); // 2021-10-27 刷新报警开始时间 auto o_value = this->cron_proc_sample(); if (o_value.has_value()) { if (this->is_no_down_limit_) { if (o_value.value()[0] > this->dist_right_) { // 报警 auto alarm_time_range = query_time_range_; alarm_time_range.set_left(alarm_time_range.get_right() - this->delay_time_); auto msg = rule_name_ + " " + error_str_ + " 当前值为" + std::to_string(o_value.value()[0]) + " 3σ区间为[" + std::to_string(this->dist_left_) + "," + std::to_string(this->dist_right_) + "]"; return utility::build_alarm_info(MsgLevel::ERROR, rule_id_, rule_name_, "DistInterval", msg, alarm_time_range); } } else { if (o_value.value()[0] < this->dist_left_ || o_value.value()[0] > this->dist_right_) { // 报警 auto alarm_time_range = query_time_range_; alarm_time_range.set_left(alarm_time_range.get_right() - this->delay_time_); auto msg = rule_name_ + " " + error_str_ + " 当前值为" + std::to_string(o_value.value()[0]) + " 3σ区间为[" + std::to_string(this->dist_left_) + "," + std::to_string(this->dist_right_) + "]"; return utility::build_alarm_info(MsgLevel::ERROR, rule_id_, rule_name_, "DistInterval", msg, alarm_time_range); } } } // if (o_value.has_value()) break; } default: break; } // switch (data_source_) { } // if (this->get_is_to_detect()) { return {}; } bool DistInterval::get_is_to_detect() { if (this->interval_source_) { T_RULE_DIST_INTERVAL trdi; // 查询 ruleid 是否存在 auto info_maybe = exec( select(trdi.dist_left(), trdi.dist_right()) .from(trdi) .where(trdi.ruleId() == this->rule_id_)); // 如果存在 if (info_maybe.is_just()) { auto& info = info_maybe.unsafe_get_just(); if (!info.empty()) { this->dist_left_ = info[0].dist_left; this->dist_right_ = info[0].dist_right; return true; } else { return false; } } else { gb_logger_->log_info("db2查询失败" + rule_name_); return false; } } else { auto lr = this->intervalToDouble(this->dist_interval_input_); if (lr.empty()) { gb_logger_->log_info("区间读取或转换错误" + rule_name_); return false; } else { this->dist_left_ = lr[0]; this->dist_right_ = lr[1]; return true; } } } int DistInterval::update_show_interval() { if (this->is_to_detect_) { std::string msg = "[" + std::to_string(this->dist_left_) + "," + std::to_string(this->dist_right_) + "]"; T_RULE_CFG trc; auto update_rusult = exec(update(trc) .set(trc.remark() = msg) .where(trc.ruleId() == this->rule_id_)); if (update_rusult.is_nothing()) { gb_logger_->log_info("区间显示 更新失败:" + rule_name_); return -1; } return 1; } else { return -1; } } AlarmInfo DistInterval::exec_normal_task(mix_cc::time_range_t time_range) { gb_logger_->log_info("exec_normal_task:" + rule_name_); gb_logger_->log_info( "开始时间:" + mix_cc::mix_time_t(time_range.get_left()).to_formatted_time()); gb_logger_->log_info( "结束时间:" + mix_cc::mix_time_t(time_range.get_right()).to_formatted_time()); SampleWindow tmp_data; //刷一遍mmvar TimeDur f_delay_time = 5 * delay_time_; auto time_end = time_range.get_right(); auto time_start = time_range.get_left(); query_interval_time_ = std::chrono::duration_cast( time_end - time_start); this->query_time_range_.set_left(time_start - 2 * f_delay_time); this->query_time_range_.set_right(time_start - f_delay_time); // 目的在于刷pv变量 query_interval_time_ start end this->now_time_ = time_start; refresh_ihd_cache(this->delay_time_); //刷 queried_time_,query_time_range_ { gb_logger_->log_info( "ihd查询开始时间1:" + mix_cc::mix_time_t(query_time_range_.get_left()).to_formatted_time()); gb_logger_->log_info( "ihd查询结束时间1:" + mix_cc::mix_time_t(query_time_range_.get_right()).to_formatted_time()); gb_logger_->log_info("ihd查询的时间间隔:" + std::to_string(delay_time_.count())); gb_logger_->log_info("ihd查询的数据量:" + std::to_string(queried_data_.rows())); } for (auto i = 0; i < queried_data_.rows(); i++) { refresh_exp_vars_ihd(i); } // 刷 mmvar this->now_time_ = time_end; // 防止出问题,没有实际用处 refresh_ihd_cache(this->delay_time_); // { // 日志 gb_logger_->log_info( "ihd查询开始时间2:" + mix_cc::mix_time_t(query_time_range_.get_left()).to_formatted_time()); gb_logger_->log_info( "ihd查询结束时间2:" + mix_cc::mix_time_t(query_time_range_.get_right()).to_formatted_time()); gb_logger_->log_info("ihd查询的时间间隔:" + std::to_string(delay_time_.count())); gb_logger_->log_info("ihd查询的数据量:" + std::to_string(queried_data_.rows())); } try { for (auto i = 0; i < queried_data_.rows(); i++) { refresh_exp_vars_ihd(i); // gb_logger_->log_info("开始运行:this->cron_proc_sample()"); auto o_value = this->cron_proc_sample(); ///< 数据筛选 // gb_logger_->log_info("运行结束:this->cron_proc_sample()"); if (o_value.has_value()) { tmp_data.push_back(o_value.value()); ///< 筛选后的数据 } } } catch (...) { gb_logger_->log_info(this->rule_name_ + ":cron_proc_sample()运行失败"); } // gb_logger_->log_info("运行结束:this->cron_proc_sample()"); // gb_logger_->log_info("this->test_mode_:"+to_string(static_cast(this->test_mode_) // )); StatAlarm alarm_info; try { // gb_logger_->log_info(" try {"); alarm_info = this->sample_stat_->get_task_normal_info(tmp_data); //保存至本地 方便分析 { //写文件 ofstream outFile; string dir = "/users/dsc/download_csv/"; // TimePoint now_time_to_write = std::chrono::system_clock::now(); outFile.open( dir + rule_name_ + "data.csv", ios::out); outFile << "time" << endl; int i = 0; while (i < tmp_data.size()) { outFile << tmp_data[i++][0] << endl; } } } catch (...) { logger_->Error() << "get_task_normal_info执行错误: " << this->rule_name_ << std::endl; } if (alarm_info) { gb_logger_->log_info("exec_normal_task:" + rule_name_ + alarm_info.alarm_str); auto dist_param = this->sample_stat_->get_dist_param(); if (dist_param[2][0] != 0) { // 方差不为零,存数据库 if (this->test_mode_ == stat_tools::TestMode::normal_dist_diff) { this->dist_left_ = dist_param[0][0] - 3 * dist_param[2][0]; this->dist_right_ = dist_param[0][0] + 3 * dist_param[2][0]; } else if (this->test_mode_ == stat_tools::TestMode::abs_diff) { this->dist_left_ = dist_param[0][0] - this->judge_diff_; this->dist_right_ = dist_param[0][0] + this->judge_diff_; } else if (this->test_mode_ == stat_tools::TestMode::percent_diff) { auto abs_m = std::abs(dist_param[0][0]); this->judge_diff_ = std::abs(this->judge_diff_); while (this->judge_diff_ > 1) { this->judge_diff_ * 0.1; } this->dist_left_ = dist_param[0][0] - abs_m * this->judge_diff_; this->dist_right_ = dist_param[0][0] + abs_m * this->judge_diff_; } // dist_param = {samples_mean, samples_variance, samples_stddev, // samples_max, // samples_min, samples_skewness, samples_kurtosis} T_RULE_DIST_INTERVAL trdi; // 更新表 fplus::maybe update_info_ret = mix_cc::fp::nothing(); // 插入 fplus::maybe approx_info_ret = mix_cc::fp::nothing(); // 查询 ruleid 是否存在 auto info_maybe = exec( select(trdi.train_end_time()) .from(trdi) .where(trdi.ruleId() == this->rule_id_)); if (info_maybe.is_just()) { auto& info = info_maybe.unsafe_get_just(); if (!info.empty()) { update_info_ret = exec( update(trdi) .set(trdi.dist_mean() = dist_param[0][0], trdi.update_time() = mix_cc::mix_time_t(system_clock::now()), trdi.train_start_time() = mix_cc::mix_time_t(time_range.get_left()), trdi.train_end_time() = mix_cc::mix_time_t(time_range.get_right()), trdi.dist_variance() = dist_param[1][0], trdi.dist_stddev() = dist_param[2][0], trdi.dist_skewness() = dist_param[5][0], trdi.dist_kurtosis() = dist_param[6][0], trdi.dist_left() = this->dist_left_, trdi.dist_right() = this->dist_right_) .where(trdi.ruleId() == this->rule_id_)); if (update_info_ret.is_nothing()) { gb_logger_->log_info("trdi表更新失败:" + rule_name_); DEBUG_PRINT_MIX_CC("统计特征表信息数据更新异常"); return {}; } // 不存在 insert } else { // 如果没有当前 ruleid的range,就插入 approx_info_ret = exec(insert_into(trdi).set( trdi.ruleId() = this->rule_id_, trdi.update_time() = mix_cc::mix_time_t(system_clock::now()), trdi.train_start_time() = mix_cc::mix_time_t(time_range.get_left()), trdi.train_end_time() = mix_cc::mix_time_t(time_range.get_right()), trdi.dist_mean() = dist_param[0][0], trdi.dist_variance() = dist_param[1][0], trdi.dist_stddev() = dist_param[2][0], trdi.dist_skewness() = dist_param[5][0], trdi.dist_kurtosis() = dist_param[6][0], trdi.dist_left() = this->dist_left_, trdi.dist_right() = this->dist_right_)); if (approx_info_ret.is_nothing()) { gb_logger_->log_info("trdi表插入失败:" + rule_name_); DEBUG_PRINT_MIX_CC("INFO信息数据插入异常"); return {}; } } } else { gb_logger_->log_info("trdi表查询失败:" + rule_name_); return {}; } gb_logger_->log_info("task" + rule_name_ + "准备创建info信息报警"); query_time_range_.set_left(std::chrono::system_clock::now()); query_time_range_.set_right(query_time_range_.get_left() + this->delay_time_); return utility::build_alarm_info(MsgLevel::INFO, rule_id_, rule_name_, "EXPSMP", alarm_info.alarm_str, query_time_range_); } else { query_time_range_.set_left(std::chrono::system_clock::now()); query_time_range_.set_right(query_time_range_.get_left() + this->delay_time_); return utility::build_alarm_info(MsgLevel::ERROR, rule_id_, rule_name_, "EXPSMP", alarm_info.alarm_str, query_time_range_); } } // info else { gb_logger_->log_info(" alarm_info.alarm_str获取失败:" + rule_name_); return {}; } } /** * @brief interval To double */ vector DistInterval::intervalToDouble(string& str) { if (*str.begin() == '[') { str.erase(str.begin()); } if (*(str.end() - 1) == ']') { str.erase(str.end() - 1); } for (size_t i = 0; i < str.size(); i++) { if (str[i] == ' ') { str.erase(str.begin() + i); } } string::iterator itr; for (itr = str.begin(); *itr != ','; itr++) { } string str1(str.begin(), itr); string str2(itr + 1, str.end()); istringstream iss1(str1); double num1; iss1 >> num1; istringstream iss2(str2); double num2; iss2 >> num2; return {num1, num2}; }