eis/eqpalg/algorithm_manager.cpp
Huamonarch 224c2c45c4 Remove irrelevant comments from eqpalg source files
Cleaned 66 files across all eqpalg subdirectories:
- Removed commented-out dead code
- Removed redundant Chinese inline comments that restate variable/function names
- Removed trailing ///< annotations on self-explanatory fields
- Removed namespace closing comments
- Preserved all file headers, Doxygen documentation, and logic explanations
- No code changes — only comment removal
2026-05-09 13:30:09 +08:00

355 lines
13 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <eqpalg/algorithm_manager.h>
#include <eqpalg/table_struct/t_rule_cfg.h>
#include <eqpalg/table_struct/t_rule_record_time.h>
#include <mix_cc/sql.h>
#include <mix_cc/sql/database/db2_t.h>
#include <unordered_map>
#include <utility>
#include <vector>
// Process mode flag defined in another translation unit. This file compares it
// against ProcessType::kMon to decide whether the rule-time bookkeeping
// (T_RULE_RECORD_TIME reads/writes) should run.
extern ProcessType glob_process_type;
// Brings the mix_cc::sql names into scope; the query-building calls below
// (select / update / insert_into / delete_from / exec) are written unqualified.
using namespace mix_cc::sql;
/**
 * @brief Construct a new Algorithm Manager:: Algorithm Manager object
 *
 * Entry point of the eqpalg program:
 *  1. Reads the algorithm configuration table t_rule_cfg from DB2 and hands
 *     every row to the thread manager (stored_cfg_data_).
 *  2. Calls start() on the thread manager.
 *  3. stored_cfg_data_ is fetched from DB2 only once; later model changes have
 *     to be maintained by this process itself.
 *
 * A rule whose parameters cannot be parsed, or whose bookkeeping fails, is
 * logged and skipped so that the remaining rules are still loaded and the
 * thread manager is still started. Any other exception is logged and
 * swallowed; the constructor itself does not throw for std::exception-derived
 * errors.
 */
AlgorithmManager::AlgorithmManager() {
  logger_ = std::make_unique<LOG>("AlgorithmManager");
  try {
    // refresh data first in order to prevent value error
    if (glob_process_type == ProcessType::kMon) {
      SingletonTemplate<GlobaltemSharedMemory>::GetInstance().cache_data();
      logger_->Debug()
          << "GlobaltemSharedMemory::instanceCount:"
          << SingletonTemplate<GlobaltemSharedMemory>::GetInstance()
                 .get_instanceCount()
          << ", SingletonTemplate<Item2Chines>::GetInstance()[test]:"
          << SingletonTemplate<Item2Chines>::GetInstance()("test") << std::endl;
    }
    // Load the T_RULE_CFG table and read the algorithms from it.
    T_RULE_CFG t_rule_cfg;
    auto sql_statement = select(t_rule_cfg.algId(), t_rule_cfg.ruleId(),
                                t_rule_cfg.ruleName(), t_rule_cfg.flag(),
                                t_rule_cfg.ruleParam(), t_rule_cfg.paddingUp(),
                                t_rule_cfg.paddingDown(), t_rule_cfg.TaskSeq())
                             .from(t_rule_cfg);
    auto rule_list_maybe = mix_cc::sql::exec<db2_t, T_RULE_CFG>(sql_statement);
    if (rule_list_maybe.is_just()) {
      // Bind by reference: the result set is only read here.
      const auto &rule_list = rule_list_maybe.unsafe_get_just();
      logger_->Info() << "rules in DB:" << rule_list.size() << endl;
      for (const auto &x : rule_list) {
        logger_->Debug() << "ID:" << x.ruleId << " AlgID:" << x.algId
                         << " Name:" << x.ruleName << " Flag:" << x.flag
                         << endl;
        // Isolate failures per rule: one bad row must not prevent the other
        // rules from being loaded or the thread manager from being started.
        try {
          auto param_info = mix_cc::json::parse(x.ruleParam);
          // Hand the algorithm parameters over to the thread manager's
          // stored configuration.
          thread_manager_.storage(x.ruleId, x.algId, x.ruleName, param_info,
                                  x.flag, x.paddingDown, x.paddingUp,
                                  x.TaskSeq);
          if (glob_process_type == ProcessType::kMon) {
            this->update_rule_start_time(x.ruleId);
            logger_->Info() << x.ruleName << "update rule start time done"
                            << endl;
          }
        } catch (const std::exception &e) {
          logger_->Error() << "ID:" << x.ruleId << " load failed: "
                           << mix_cc::get_nested_exception(e) << std::endl;
        }
      }
      logger_->Debug() << "Done" << std::endl;
    } else {
      logger_->Error() << "无有效算法载入,请检查数据库连接" << std::endl;
    }
    thread_manager_.start();
  } catch (const std::exception &e) {
    logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
  }
}
/// Destructor: no explicit teardown; members clean up through their own
/// destructors.
AlgorithmManager::~AlgorithmManager() = default;
/**
 * @brief Handle one control event delivered as a JSON-encoded byte sequence.
 *
 * The bytes are interpreted as a JSON document. The event type is taken from
 * its "eventNo" field and dispatched to the thread manager (create / delete /
 * reset / enable / update / exec). An "eventNo" of -1 terminates the process
 * via exit(0) after a one second pause.
 *
 * Fields read from the payload:
 *  - "eventNo", "ruleId": always (except that "ruleId" is not read for -1)
 *  - "startTime", "endTime" (milliseconds): for kExec
 *  - "algId", "taskSeq": when present
 *  - "ruleName", "ruleParam": for kCreate / kUpdate
 *  - "usable": for every event except kReset / kDelete / kExec
 *
 * @param event_no Not used. The dispatch is driven by the payload's "eventNo"
 *                 field; previously a local of the same name shadowed this
 *                 parameter, which is why it never had any effect.
 * @param seq      Raw bytes of the JSON message.
 *
 * std::exception-derived errors (malformed JSON, missing fields, ...) are
 * logged and swallowed.
 */
void AlgorithmManager::dispose(int event_no, const ::Ice::ByteSeq &seq) {
  // Intentionally unused, see the function comment.
  static_cast<void>(event_no);
  try {
    // vector<u char> -> str
    string str(seq.begin(), seq.end());
    // str -> json
    mix_cc::json json_value = mix_cc::json::parse(str);
    logger_->Info() << json_value << endl;
    // Parse the JSON message.
    int algId = 0;
    int usable = 1;
    string ruleId, rule_name;
    mix_cc::json param_info;
    TimePoint time_start, time_end;
    // Named differently from the parameter so that nothing is shadowed.
    const int event_code =
        static_cast<int>(json_value.at("eventNo").get<int64_t>());
    if (event_code == -1) {
      logger_->Debug() << "eqpalg 重启" << endl;
      sleep(1);
      exit(0);
    }
    ruleId = json_value.at("ruleId").get<std::string>();
    int task_seq = 0;
    if (event_code == EventCase::kExec) {
      time_start =
          TimePoint(milliseconds(json_value.at("startTime").get<int64_t>()));
      time_end =
          TimePoint(milliseconds(json_value.at("endTime").get<int64_t>()));
      logger_->Info() << "exec option" << endl;
    }
    if (json_value.contains("algId")) {
      algId = json_value.at("algId").get<int64_t>();
    }
    if (json_value.contains("taskSeq")) {
      task_seq = json_value.at("taskSeq").get<int64_t>();
    }
    if (event_code == EventCase::kCreate || event_code == EventCase::kUpdate) {
      rule_name = json_value.at("ruleName").get<std::string>();
      param_info = json_value.at("ruleParam");
    }
    if (event_code != EventCase::kReset && event_code != EventCase::kDelete &&
        event_code != EventCase::kExec) {
      usable = json_value.at("usable").get<int64_t>();
    }
    // JSON parsing finished; dispatch to the matching task.
    logger_->Info() << "read done, enter sw" << endl;
    switch (event_code) {
      /*-------1---*/
      case EventCase::kCreate: {
        thread_manager_.storage(ruleId, algId, rule_name, param_info, usable, 0,
                                0, task_seq);
        thread_manager_.attach(ruleId);
        logger_->Info() << "create new rule instance done" << endl;
        if (ProcessType::kMon == glob_process_type) {
          if (usable) {
            this->update_rule_start_time(ruleId);
          } else {
            this->update_rule_modify_time(ruleId);
          }
          logger_->Info() << rule_name << "update rule modify time done"
                          << endl;
        }
        break;
      }
      /*-------0---*/
      case EventCase::kDelete: {
        // Remove the rule instance.
        thread_manager_.delete_instance(ruleId);
        if (ProcessType::kMon == glob_process_type) {
          // Remove the rule's record from the T_RULE_RECORD_TIME table.
          delete_trrt_record(ruleId);
        }
        break;
      }
      /*-------4---*/
      case EventCase::kReset: {
        // Reset the rule instance.
        if (ProcessType::kMon == glob_process_type) {
          thread_manager_.reset(ruleId);
        }
        break;
      }
      /*-------3---*/
      case EventCase::kEnable: {
        // Enable / disable the rule instance.
        if (ProcessType::kMon == glob_process_type) {
          thread_manager_.enable(ruleId, usable);
          if (usable) {
            this->update_rule_start_time(ruleId);
            // "ruleName" is not parsed for this event, so identify the rule
            // by its id instead of logging an empty name.
            logger_->Info() << ruleId << "update rule start time done" << endl;
          }
        }
        break;
      }
      /*-------2---*/
      case EventCase::kUpdate: {
        // Replace the rule instance: detach, store the new config, re-attach.
        thread_manager_.detach(ruleId);
        thread_manager_.storage(ruleId, algId, rule_name, param_info, usable, 0,
                                0, task_seq);
        thread_manager_.attach(ruleId);
        if (ProcessType::kMon == glob_process_type) {
          this->update_rule_modify_time(ruleId);
          logger_->Info() << rule_name << "update rule modify time done"
                          << endl;
          this->update_rule_start_time(ruleId);
          logger_->Info() << rule_name << "update rule start time done" << endl;
        }
        logger_->Info() << "update done" << endl;
        break;
      }
      /*-------10---*/
      case EventCase::kExec: {
        // Execute the rule once; only the one-shot execution process has this
        // operation. The stored configuration is refreshed from the DB first.
        T_RULE_CFG t_rule_cfg;
        auto sql_statement =
            select(t_rule_cfg.algId(), t_rule_cfg.ruleId(),
                   t_rule_cfg.ruleName(), t_rule_cfg.flag(),
                   t_rule_cfg.ruleParam(), t_rule_cfg.paddingUp(),
                   t_rule_cfg.paddingDown(), t_rule_cfg.TaskSeq())
                .from(t_rule_cfg)
                .where(t_rule_cfg.ruleId() == ruleId);
        auto rule_maybe = mix_cc::sql::exec<db2_t, T_RULE_CFG>(sql_statement);
        if (rule_maybe.is_just()) {
          // Bind by reference: the rows are only read here.
          const auto &rule_list = rule_maybe.unsafe_get_just();
          logger_->Info() << "rules in DB:" << rule_list.size() << endl;
          for (const auto &x : rule_list) {
            logger_->Debug() << "ID:" << x.ruleId << " AlgID:" << x.algId
                             << " Name:" << x.ruleName << " Flag:" << x.flag
                             << endl;
            auto row_param = mix_cc::json::parse(x.ruleParam);
            thread_manager_.storage(x.ruleId, x.algId, x.ruleName, row_param,
                                    x.flag, x.paddingDown, x.paddingUp,
                                    x.TaskSeq);
          }
          logger_->Debug() << "Done" << std::endl;
        } else {
          logger_->Error() << "无有效算法载入,请检查数据库连接" << std::endl;
        }
        thread_manager_.exec_task(ruleId, time_start, time_end);
      } break;
      default: {
        logger_->Info() << "event_no error" << endl;
        break;
      }
    }
  } catch (const std::exception &e) {
    logger_->Error() << mix_cc::get_nested_exception(e) << std::endl;
  }
}
/**
 * @brief Forward to GlobaltemSharedMemory::cache_data() on the singleton.
 * @return Whatever the singleton's cache_data() returns.
 */
int AlgorithmManager::cache_data() {
  auto &&shared_memory = SingletonTemplate<GlobaltemSharedMemory>::GetInstance();
  return shared_memory.cache_data();
}
int AlgorithmManager::update_rule_start_time(std::string ruleId) {
if (glob_process_type != ProcessType::kMon) {
return 0;
}
T_RULE_RECORD_TIME trrt;
// 1.检查T_RULE_RECORD_TIME
// 中是否有该ruleid的记录没有则插入并将rule_tom字段插入cfg的toc
auto info_maybe = exec<db2_t, T_RULE_RECORD_TIME>(
select(trrt.ruleId()).from(trrt).where(trrt.ruleId() == ruleId));
if (info_maybe.is_just()) {
auto &info = info_maybe.unsafe_get_just();
if (info.empty()) // trrt中没有该ruleid的记录
{
// 2.插入toc 插入tos为now time
auto time_insert_ret = exec<db2_t, size_t>(insert_into(trrt).set(
trrt.ruleId() = ruleId,
trrt.rule_tom() = mix_cc::mix_time_t(system_clock::now()),
trrt.tos() = mix_cc::mix_time_t(system_clock::now())));
if (time_insert_ret.is_nothing()) {
logger_->Error() << "模型修改时间插入异常:" << ruleId << std::endl;
return -1;
}
if (time_insert_ret.unsafe_get_just() == 0) {
logger_->Error() << "插入数据量异常:" << ruleId << std::endl;
return -2;
}
} else // trrt中已有该ruleid
{
// 2.更新tos字段
auto sql_tmp =
update(trrt)
.set(trrt.tos() = mix_cc::mix_time_t(system_clock::now()))
.where(trrt.ruleId() == ruleId);
auto results = exec<db2_t, size_t>(sql_tmp);
if (results.is_nothing()) {
return -1;
logger_->Error() << ruleId << ":update 规则启用时间失败" << endl;
}
}
}
return 1;
}
int AlgorithmManager::update_rule_modify_time(std::string ruleId) {
if (glob_process_type != ProcessType::kMon) {
return 0;
}
fplus::maybe<std::size_t> time_update_ret =
mix_cc::fp::nothing<std::size_t>();
fplus::maybe<std::size_t> time_insert_ret =
mix_cc::fp::nothing<std::size_t>();
T_RULE_RECORD_TIME trrt;
// 查当前ruleid 是否已经存在
auto info_maybe = exec<db2_t, T_RULE_RECORD_TIME>(
select(trrt.ruleId()).from(trrt).where(trrt.ruleId() == ruleId));
if (info_maybe.is_just()) {
auto &info = info_maybe.unsafe_get_just();
if (!info.empty()) {
// 更新
time_update_ret = exec<db2_t, size_t>(
update(trrt)
.set(trrt.rule_tom() = mix_cc::mix_time_t(system_clock::now()))
.where(trrt.ruleId() == ruleId));
if (time_update_ret.is_nothing()) {
logger_->Error() << ruleId << ",模型修改时间更新异常" << std::endl;
return -1;
}
} else {
// 如果没有当前 ruleid就插入
time_insert_ret = exec<db2_t, size_t>(insert_into(trrt).set(
trrt.ruleId() = ruleId,
trrt.rule_tom() = mix_cc::mix_time_t(system_clock::now())));
if (time_insert_ret.is_nothing()) {
logger_->Error() << ruleId << ",模型修改时间插入异常" << std::endl;
return -1;
}
if (time_insert_ret.unsafe_get_just() == 0) {
logger_->Error() << ruleId << ",插入数据量异常" << std::endl;
return -2;
}
}
} else {
return -1;
}
return 1;
}
/**
 * @brief Delete the T_RULE_RECORD_TIME row(s) belonging to a rule.
 *
 * @param ruleId Id of the rule whose record is removed.
 * @return  0  when the process is not the monitor process (nothing is done),
 *          1  when exactly one row was deleted,
 *         -1  when the delete statement fails,
 *         -2  when the number of deleted rows is anything other than one.
 */
int AlgorithmManager::delete_trrt_record(std::string ruleId) {
  if (glob_process_type != ProcessType::kMon) {
    return 0;
  }

  T_RULE_RECORD_TIME trrt;
  auto removed =
      exec<db2_t, size_t>(delete_from(trrt).where(trrt.ruleId() == ruleId));

  int status = 1;
  if (removed.is_nothing()) {
    logger_->Debug() << "删除失败:" << ruleId << std::endl;
    status = -1;
  } else if (removed.unsafe_get_just() != 1) {
    logger_->Debug() << "删除数量不对:" << ruleId << std::endl;
    status = -2;
  }
  return status;
}
void AlgorithmManager::rule_handelr(std::string data_info) {
try {
mix_cc::json json_value = mix_cc::json::parse(data_info);
std::string ruleid = json_value["ruleid"].get<std::string>();
double lb = json_value["lb"].get<double>();
double ub = json_value["ub"].get<double>();
double va = json_value["va"].get<double>();
int64_t stime = json_value["stime"].get<double>();
int64_t etime = json_value["etime"].get<double>();
this->thread_manager_.update_limit_alarm(ruleid, lb, ub, va, stime, etime);
} catch (const std::exception &e) {
logger_->Error() << e.what() << std::endl;
}
}
int AlgorithmManager::get_thread_size() {
return this->thread_manager_.get_thread_size();
}