eis/eqpalg/eqpalg_icei.cpp
Huamonarch f80a917ab7 Async-ify ExpTimes DB persistence with global singleton worker thread
Add AsyncDbWorker: a persistent background thread with dedup queue that
executes DB2 writes asynchronously, keeping the mon 20ms cycle free of
blocking I/O.

Changes:
- async_db_worker.h/.cc: singleton worker, submit() with rule_id dedup,
  drain_and_stop() for clean shutdown
- eqp_stat.h/.cc: new update_static(ruleid, shear_times, running_time)
  overload that skips redundant DB reads for known values (reduces
  5 SELECTs to 3 per persist cycle)
- exp_times.cc: extract persist_exp_times() as a standalone function,
  update_history_times() snapshots values and submits to worker
  (returns immediately), reset_dev_data() uses direct SHM update
- eqpalg_icei.cpp: alg_mgr_.reset() → drain_and_stop() in destructor
  ensures all algorithm threads are stopped before draining the worker

Risk: re-run cmake .. to pick up the new async_db_worker.cc file.
2026-05-13 13:32:50 +08:00

176 lines
6.1 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <common/L2Event.h>
#include <ctime>
#include <dao/DbStandardDBAX.h>
#include <eqpalg/define/public.h>
#include <eqpalg/eqpalg_icei.h>
#include <eqpalg/utility/async_db_worker.h>
#include <eqpalg/utility/eqp_stat.h>
#include <log4cplus/LOG.h>
#include <shm/SingletonTemp.hpp>
#include <string>
#include <thread>
extern ProcessType glob_process_type;
#define CACHE_OUTTIME 19ms
// 自定义单调时钟,底层强制使用 CLOCK_MONOTONIC
struct MonotonicClock {
using duration = std::chrono::nanoseconds;
using rep = duration::rep;
using period = duration::period;
using time_point = std::chrono::time_point<MonotonicClock>;
// 告诉 std::chrono 这是一个单调时钟
static constexpr bool is_steady = true;
static time_point now() noexcept {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
auto d =
std::chrono::seconds(ts.tv_sec) + std::chrono::nanoseconds(ts.tv_nsec);
return time_point(std::chrono::duration_cast<duration>(d));
}
};
EqpAlgICEI::EqpAlgICEI() {
this->logger_ = std::make_unique<LOG>("eqpalg_icei");
if (glob_process_type == ProcessType::kMon) {
this->m_proxy2cron = ProxyMag::GetAppICEPrx("baosight/eqpalg-cron");
this->m_proxy2dsm = ProxyMag::GetAppICEPrx("baosight/dsm");
logger_->Debug() << "向其cron进程发送ice" << endl;
}
this->is_running_ = true;
alg_mgr_ = std::make_unique<AlgorithmManager>();
if (glob_process_type == ProcessType::kMon) {
up_date_data_ptr_ = std::make_unique<UpDateData>();
mem_cached_thread_ = std::make_unique<std::thread>([&]() {
// 使用单调时钟
auto next_wake_time = std::chrono::steady_clock::now();
while (is_running_) {
auto t1 = MonotonicClock::now();
const auto time_start = system_clock::now();
const auto time_starts = std::chrono::steady_clock::now();
auto res = this->alg_mgr_->cache_data();
if (res != 0) {
this->logger_->Debug() << "cache_data() return :" << res << endl;
}
auto t2 = MonotonicClock::now();
const auto time_end = system_clock::now();
const auto time_ends = std::chrono::steady_clock::now();
const auto time_cost = time_end - time_start;
const auto time_costs = time_ends - time_starts;
const auto elapsed_ms = t2 - t1;
if (time_costs >= CACHE_OUTTIME) {
this->logger_->Error()
<< "共享内存消耗时间超时(ms):"
<< duration_cast<milliseconds>(time_cost).count()
<< " ms,steady_clock:"
<< std::chrono::duration_cast<std::chrono::milliseconds>(
time_costs)
.count()
<< " ms"
<< ",elapsed_ms:"
<< std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count()
<< " ms" << std::endl;
// 立即开始下一次循环
next_wake_time = std::chrono::steady_clock::now();
} else {
// 计算下一次唤醒时间
next_wake_time += CACHE_OUTTIME;
std::this_thread::sleep_until(next_wake_time);
}
}
});
}
}
EqpAlgICEI::~EqpAlgICEI() {
this->is_running_ = false;
if (glob_process_type == ProcessType::kMon) {
if (mem_cached_thread_->joinable()) {
mem_cached_thread_->join();
}
}
// 先停掉所有算法线程,确保不再有新的 DB 任务投递
alg_mgr_.reset();
// 排空并停止后台 DB worker
AsyncDbWorker::instance().drain_and_stop();
if (glob_process_type == ProcessType::kTask) {
// TaskData 已改用动态 vector进程退出时自然析构无需 rm -rf
}
this->logger_->Info() << "EqpAlgICEI::~EqpAlgICEI() "
<< "Destruct" << endl;
}
void baosight::EqpAlgICEI::SendDataShort(::Ice::Int eventNo,
const ::Ice::ByteSeq &seq,
::Ice::Int length,
const Ice::Current &current) {
// 根据电文号调用,执行分派任务操作
switch (eventNo) {
case 99999: {
alg_mgr_->dispose(eventNo, seq);
if (glob_process_type == ProcessType::kMon) {
logger_->Debug() << "向 baosight/eqpalg-cron进程发送消息" << endl;
this->m_proxy2cron->SendDataShort(99999, seq, seq.size());
}
break;
}
/* eventNo=11111 报警信息 */
case 11111: {
string str(seq.begin(), seq.end());
alarm_poster_.alarm(str);
break;
}
/* eventNo=22222 规则状态更新 */
case 22222: {
string str(seq.begin(), seq.end());
alg_mgr_->rule_handelr(str);
break;
}
default:
logger_->Error() << "wrong event No.!" << endl;
break;
}
}
void baosight::EqpAlgICEI::SendDataLong(
::Ice::Int eventNo, const ::Ice::ByteSeq &seq, ::Ice::Int length,
const ::std::string &sender, const ::std::string &receiver,
const ::std::string &additional, const Ice::Current &current) {}
void baosight::EqpAlgICEI::TimeNotify(::Ice::Int eventNo,
const ::Ice::ByteSeq &seq,
const Ice::Current &current) {
if (eventNo == 1) {
up_date_data_ptr_->update_eqp_status();
} else if (eventNo == 2 && !is_rule_stat_data_updating_) {
is_rule_stat_data_updating_ = true;
up_date_data_ptr_->update_rule_stat_data();
is_rule_stat_data_updating_ = false;
}
if (eventNo == 5) {
try {
string Jvalue = SingletonTemp<EqpStat>::GetInstance().get_ruleid_json();
std::vector<unsigned char> seq((unsigned char *)Jvalue.c_str(),
(unsigned char *)Jvalue.c_str() +
Jvalue.length());
this->m_proxy2dsm->SendDataShort(99999, seq, Jvalue.length());
logger_->Info() << "data size:"
<< SingletonTemplate<GlobaltemSharedMemory>::GetInstance()
.get_data_size()
<< std::endl;
logger_->Info() << "rule threads size:" << alg_mgr_->get_thread_size()
<< std::endl;
} catch (const std::exception &e) {
logger_->Error() << e.what() << std::endl;
}
}
}