eis/eqpalg/eqpalg_icei.cpp

176 lines
6.1 KiB
C++
Raw Normal View History

#include <common/L2Event.h>
#include <ctime>
#include <dao/DbStandardDBAX.h>
#include <eqpalg/define/public.h>
#include <eqpalg/eqpalg_icei.h>
#include <eqpalg/utility/async_db_worker.h>
#include <eqpalg/utility/eqp_stat.h>
#include <log4cplus/LOG.h>
#include <shm/SingletonTemp.hpp>
#include <string>
#include <thread>
extern ProcessType glob_process_type;
#define CACHE_OUTTIME 19ms
// 自定义单调时钟,底层强制使用 CLOCK_MONOTONIC
struct MonotonicClock {
using duration = std::chrono::nanoseconds;
using rep = duration::rep;
using period = duration::period;
using time_point = std::chrono::time_point<MonotonicClock>;
// 告诉 std::chrono 这是一个单调时钟
static constexpr bool is_steady = true;
static time_point now() noexcept {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
auto d =
std::chrono::seconds(ts.tv_sec) + std::chrono::nanoseconds(ts.tv_nsec);
return time_point(std::chrono::duration_cast<duration>(d));
}
};
EqpAlgICEI::EqpAlgICEI() {
this->logger_ = std::make_unique<LOG>("eqpalg_icei");
if (glob_process_type == ProcessType::kMon) {
this->m_proxy2cron = ProxyMag::GetAppICEPrx("baosight/eqpalg-cron");
this->m_proxy2dsm = ProxyMag::GetAppICEPrx("baosight/dsm");
logger_->Debug() << "向其cron进程发送ice" << endl;
}
this->is_running_ = true;
alg_mgr_ = std::make_unique<AlgorithmManager>();
if (glob_process_type == ProcessType::kMon) {
up_date_data_ptr_ = std::make_unique<UpDateData>();
mem_cached_thread_ = std::make_unique<std::thread>([&]() {
// 使用单调时钟
auto next_wake_time = std::chrono::steady_clock::now();
while (is_running_) {
auto t1 = MonotonicClock::now();
const auto time_start = system_clock::now();
const auto time_starts = std::chrono::steady_clock::now();
auto res = this->alg_mgr_->cache_data();
if (res != 0) {
this->logger_->Debug() << "cache_data() return :" << res << endl;
}
auto t2 = MonotonicClock::now();
const auto time_end = system_clock::now();
const auto time_ends = std::chrono::steady_clock::now();
const auto time_cost = time_end - time_start;
const auto time_costs = time_ends - time_starts;
const auto elapsed_ms = t2 - t1;
if (time_costs >= CACHE_OUTTIME) {
this->logger_->Error()
<< "共享内存消耗时间超时(ms):"
<< duration_cast<milliseconds>(time_cost).count()
<< " ms,steady_clock:"
<< std::chrono::duration_cast<std::chrono::milliseconds>(
time_costs)
.count()
<< " ms"
<< ",elapsed_ms:"
<< std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count()
<< " ms" << std::endl;
// 立即开始下一次循环
next_wake_time = std::chrono::steady_clock::now();
} else {
// 计算下一次唤醒时间
next_wake_time += CACHE_OUTTIME;
std::this_thread::sleep_until(next_wake_time);
}
}
});
}
}
EqpAlgICEI::~EqpAlgICEI() {
this->is_running_ = false;
if (glob_process_type == ProcessType::kMon) {
if (mem_cached_thread_->joinable()) {
mem_cached_thread_->join();
}
}
// 先停掉所有算法线程,确保不再有新的 DB 任务投递
alg_mgr_.reset();
// 排空并停止后台 DB worker
AsyncDbWorker::instance().drain_and_stop();
if (glob_process_type == ProcessType::kTask) {
// TaskData 已改用动态 vector进程退出时自然析构无需 rm -rf
}
this->logger_->Info() << "EqpAlgICEI::~EqpAlgICEI() "
<< "Destruct" << endl;
}
void baosight::EqpAlgICEI::SendDataShort(::Ice::Int eventNo,
const ::Ice::ByteSeq &seq,
::Ice::Int length,
const Ice::Current &current) {
// 根据电文号调用,执行分派任务操作
switch (eventNo) {
case 99999: {
alg_mgr_->dispose(eventNo, seq);
if (glob_process_type == ProcessType::kMon) {
logger_->Debug() << "向 baosight/eqpalg-cron进程发送消息" << endl;
this->m_proxy2cron->SendDataShort(99999, seq, seq.size());
}
break;
}
/* eventNo=11111 报警信息 */
case 11111: {
string str(seq.begin(), seq.end());
alarm_poster_.alarm(str);
break;
}
/* eventNo=22222 规则状态更新 */
case 22222: {
string str(seq.begin(), seq.end());
alg_mgr_->rule_handelr(str);
break;
}
default:
logger_->Error() << "wrong event No.!" << endl;
break;
}
}
void baosight::EqpAlgICEI::SendDataLong(
::Ice::Int eventNo, const ::Ice::ByteSeq &seq, ::Ice::Int length,
const ::std::string &sender, const ::std::string &receiver,
const ::std::string &additional, const Ice::Current &current) {}
void baosight::EqpAlgICEI::TimeNotify(::Ice::Int eventNo,
const ::Ice::ByteSeq &seq,
const Ice::Current &current) {
if (eventNo == 1) {
up_date_data_ptr_->update_eqp_status();
} else if (eventNo == 2 && !is_rule_stat_data_updating_) {
is_rule_stat_data_updating_ = true;
up_date_data_ptr_->update_rule_stat_data();
is_rule_stat_data_updating_ = false;
}
if (eventNo == 5) {
try {
string Jvalue = SingletonTemp<EqpStat>::GetInstance().get_ruleid_json();
std::vector<unsigned char> seq((unsigned char *)Jvalue.c_str(),
(unsigned char *)Jvalue.c_str() +
Jvalue.length());
this->m_proxy2dsm->SendDataShort(99999, seq, Jvalue.length());
logger_->Info() << "data size:"
<< SingletonTemplate<GlobaltemSharedMemory>::GetInstance()
.get_data_size()
<< std::endl;
logger_->Info() << "rule threads size:" << alg_mgr_->get_thread_size()
<< std::endl;
} catch (const std::exception &e) {
logger_->Error() << e.what() << std::endl;
}
}
}