eis/eqpalg/.do_not_use/data_handler/approximate_data.cc

606 lines
21 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <eqpalg/data_handler/approximate_data.h>
// Minimum number of accumulated samples required before the first sampling batch computes step sizes (bin widths)
#define CAL_STEP_MIN_SIZE 100
namespace data_handler {
namespace policy {
/**
 * @brief Creates an approximate-data handler for one rule.
 * @param ruleId rule identifier; forwarded to Base and used to create this
 *               rule's GbLogger
 * @param dims   number of sample dimensions; rs_ and c_r_ get one slot each
 */
ApproximateData::ApproximateData(const std::string& ruleId, size_t dims)
    : Base(ruleId, dims),
      gb_logger_(std::make_unique<GbLogger>(ruleId)) {
  // One running statistic and one cell range (bin width) per dimension.
  rs_.resize(dims);
  c_r_.resize(dims);
}
/**
* @brief 向T_RULE_SAMPLE_1D_INFO插入步长信息
* 检查数据量:小样本的不计算步长
* 规定样本数量大于100计算样本
* 如果info插入则保存本次样本first_runing_info至db2
* @param first_runing_infoMy Param doc
* @param tp My Param doc
* @return int
*/
int ApproximateData::first_sampling_batch(const InData& first_runing_info,
TimePoint tp, Rs running_state) {
if (first_runing_info.empty() ||
running_state[0].current_n() <= CAL_STEP_MIN_SIZE) {
if (running_state[0].current_n() <= CAL_STEP_MIN_SIZE) {
this->gb_logger_->log_error(this->rule_id_ + ":已累积的样本数量:" +
std::to_string(running_state[0].current_n()) +
"<" + std::to_string(CAL_STEP_MIN_SIZE) +
",不更新T_RULE_SAMPLE_1D_INFO");
}
return -1;
}
this->rs_ = running_state;
// 组距 组数的确定
for (size_t i = 0; i < rs_.size(); i++) {
auto cell_range = (rs_[i].max() - rs_[i].min()) / 50;
// if (cell_range > 100) {
// cell_range = 100;
// } else if (cell_range > 10) {
// cell_range = 10;
// } else if (cell_range > 1) {
// cell_range = 1;
// } else if (cell_range > 0.1) {
// cell_range = 0.1;
// } else {
// cell_range = 0.01;
// }
if (cell_range > 0.001) {
c_r_[i] = cell_range;
}
else {
this->gb_logger_->log_error(this->rule_id_ +
"当前rs=" + std::to_string(cell_range) +
" 过小info将被置为0.001");
c_r_[i] = 0.001;
}
}
// 插入
fplus::maybe<std::size_t> approx_info_ret =
mix_cc::fp::nothing<std::size_t>();
//更新
fplus::maybe<std::size_t> update_info_ret =
mix_cc::fp::nothing<std::size_t>();
if (dims_ == 1) {
// update info
T_RULE_SAMPLE_1D_INFO tci;
// 查当前ruleid 是否已经存在
auto info_maybe = exec<db2_t, T_RULE_SAMPLE_1D_INFO>(
select(tci.Range1())
.from(tci)
.where(tci.RuleId() == this->rule_id_));
if (info_maybe.is_just()) {
auto& info = info_maybe.unsafe_get_just();
if (!info.empty()) {
// cellrange变化了就更新
if (c_r_[0] != info[0].Range1)
// update 数据库的数据
{
update_info_ret =
exec<db2_t, size_t>(update(tci)
.set(tci.Range1() = c_r_[0])
.where(tci.RuleId() == this->rule_id_));
if (update_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据更新异常");
return -1;
}
}
}
else {
// 如果没有当前 ruleid的range就插入
approx_info_ret = exec<db2_t, size_t>(insert_into(tci).set(
tci.RuleId() = rule_id_, tci.Range1() = c_r_[0], tci.Spare1() = 1));
if (approx_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据插入异常");
return -1;
}
if (approx_info_ret.unsafe_get_just() == 0) {
DEBUG_PRINT_MIX_CC("插入数据量异常");
return -2;
}
}
}
else {
return -1;
}
// } else {
// }
}
else if (dims_ == 2) {
T_RULE_SAMPLE_2D_INFO tci;
// 查询当前ruleid 是否已经存在
auto info_maybe = exec<db2_t, T_RULE_SAMPLE_2D_INFO>(
select(tci.Range1(), tci.Range2())
.from(tci)
.where(tci.RuleId() == this->rule_id_));
if (info_maybe.is_just()) {
auto& info = info_maybe.unsafe_get_just();
if (!info.empty()) {
// cellrange变化了就更新
if (c_r_[0] != info[0].Range1 || c_r_[1] != info[0].Range2)
// update 数据库的数据
{
update_info_ret = exec<db2_t, size_t>(
update(tci)
.set(tci.Range1() = c_r_[0], tci.Range2() = c_r_[1])
.where(tci.RuleId() == this->rule_id_));
if (update_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据更新异常");
return -1;
}
}
}
else {
// 如果没有当前 ruleid的range就插入
approx_info_ret = exec<db2_t, size_t>(insert_into(tci).set(
tci.RuleId() = rule_id_, tci.Range1() = c_r_[0],
tci.Range2() = c_r_[1], tci.Spare1() = 1,
tci.Spare2() = 1));
if (approx_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据插入异常");
return -1;
}
if (approx_info_ret.unsafe_get_just() == 0) {
DEBUG_PRINT_MIX_CC("插入数据量异常");
return -2;
}
}
}
else {
return -1;
}
}
else if (dims_ == 3) {
T_RULE_SAMPLE_3D_INFO tci;
// 查询当前ruleid 是否已经存在
auto info_maybe = exec<db2_t, T_RULE_SAMPLE_3D_INFO>(
select(tci.Range1(), tci.Range2(), tci.Range3())
.from(tci)
.where(tci.RuleId() == this->rule_id_));
if (info_maybe.is_just()) {
auto& info = info_maybe.unsafe_get_just();
if (!info.empty()) {
// cellrange变化了就更新
if (c_r_[0] != info[0].Range1 || c_r_[1] != info[0].Range2 ||
c_r_[2] != info[0].Range3)
// update 数据库的数据
{
update_info_ret = exec<db2_t, size_t>(
update(tci)
.set(tci.Range1() = c_r_[0], tci.Range2() = c_r_[1],
tci.Range3() = c_r_[2])
.where(tci.RuleId() == this->rule_id_));
if (update_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据更新异常");
return -1;
}
}
}
else {
// 如果没有当前 ruleid的range就插入
approx_info_ret = exec<db2_t, size_t>(insert_into(tci).set(
tci.RuleId() = rule_id_, tci.Range1() = c_r_[0],
tci.Range2() = c_r_[1], tci.Range3() = c_r_[2], tci.Spare1() = 1,
tci.Spare2() = 1, tci.Spare3() = 1));
if (approx_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据插入异常");
return -1;
}
if (approx_info_ret.unsafe_get_just() == 0) {
DEBUG_PRINT_MIX_CC("插入数据量异常");
return -2;
}
}
}
else {
return -1;
}
}
// 保证cron第一次能store但mon第一次不能store
this->is_first_sampling_ = false;
for (const auto& x : first_runing_info) {
this->store(x);
}
// commit失败将返回int 2
// 只有 commit 成功才将this->is_first_sampling_置为false
// 否则置为true
auto commit_result = this->commit();
if (commit_result == 2) {
this->is_first_sampling_ = true;
}
return 0;
}
/**
 * @brief Loads this rule's bin widths and stored histogram from db2.
 *
 * Reads c_r_ from T_RULE_SAMPLE_{1,2,3}D_INFO. Reads every (bin, Count) row
 * of Dim{1,2,3}Table into data_ and sums the counts into dump_size_. When
 * any counts were loaded, scale_ is set to k_dest_dump_size / dump_size_ and
 * is_first_sampling_ is cleared.
 *
 * @return 0 on success. This includes dims_ not being 1, 2 or 3, in which
 *         case nothing is loaded.
 *         -1 on a db2 query failure.
 *         -3 when the info table has no row for this rule.
 */
int ApproximateData::load() {
  this->dump_size_ = 0;  ///< number of samples after decompression
  this->scale_ = 0;      ///< scaling ratio applied in extract()
  this->data_.clear();   ///< drop previously loaded bins before reloading
  // Loads (1) the step sizes c_r_, (2) the histogram data_,
  // (3) the decompressed sample count dump_size_.
  if (dims_ == 1) {
    T_RULE_SAMPLE_1D_INFO ti;  ///< step-size info
    auto info_maybe = exec<db2_t, T_RULE_SAMPLE_1D_INFO>(
        select(ti.Range1(), ti.Spare1())
            .from(ti)
            .where(ti.RuleId() == this->rule_id_));
    if (info_maybe.is_just()) {
      auto& info = info_maybe.unsafe_get_just();
      if (!info.empty()) {
        this->c_r_[0] = info[0].Range1;
      } else {
        this->gb_logger_->log_error(
            this->rule_id_ +
            ":load时T_RULE_SAMPLE_1D_INFO没有记录");
        return -3;
      }
    } else {
      this->gb_logger_->log_error(this->rule_id_ + ":load时db2查询错误");
      return -1;
    }
    Dim1Table tc;
    auto query_list_maybe =
        exec<db2_t, Dim1Table>(select(tc.Flag(), tc.Count(), tc.X1())
                                   .from(tc)
                                   .where(tc.RuleId() == this->rule_id_));
    if (query_list_maybe.is_just()) {
      auto& query_list = query_list_maybe.unsafe_get_just();
      for (const auto& x : query_list) {
        this->data_.insert(
            std::make_pair(SamplePointWR({ x.X1 }, c_r_), x.Count));
        this->dump_size_ += x.Count;
      }
    } else {
      return -1;
    }
  } else if (dims_ == 2) {
    T_RULE_SAMPLE_2D_INFO di;
    auto info_maybe = exec<db2_t, T_RULE_SAMPLE_2D_INFO>(
        select(di.Range1(), di.Range2(), di.Spare1(),
               di.Spare2())
            .from(di)
            .where(di.RuleId() == this->rule_id_));
    if (info_maybe.is_just()) {
      auto& info = info_maybe.unsafe_get_just();
      if (!info.empty()) {
        this->c_r_[0] = info[0].Range1;
        this->c_r_[1] = info[0].Range2;
      } else {
        this->gb_logger_->log_error(
            this->rule_id_ +
            ":load时T_RULE_SAMPLE_2D_INFO没有记录");
        return -3;
      }
    } else {
      this->gb_logger_->log_error(this->rule_id_ + ":load时db2查询错误");
      return -1;
    }
    Dim2Table dc;
    auto query_list_maybe =
        exec<db2_t, Dim2Table>(select(dc.Flag(), dc.Count(), dc.X1(), dc.X2())
                                   .from(dc)
                                   .where(dc.RuleId() == this->rule_id_));
    if (query_list_maybe.is_just()) {
      auto& query_list = query_list_maybe.unsafe_get_just();
      for (const auto& x : query_list) {
        this->data_.insert(
            std::make_pair(SamplePointWR({ x.X1, x.X2 }, c_r_), x.Count));
        this->dump_size_ += x.Count;
      }
    } else {
      return -1;
    }
  } else if (dims_ == 3) {
    T_RULE_SAMPLE_3D_INFO ti;
    auto info_maybe = exec<db2_t, T_RULE_SAMPLE_3D_INFO>(
        select(ti.Range1(), ti.Range2(), ti.Range3(), ti.Spare1(), ti.Spare2(), ti.Spare3())
            .from(ti)
            .where(ti.RuleId() == this->rule_id_));
    if (info_maybe.is_just()) {
      auto& info = info_maybe.unsafe_get_just();
      if (!info.empty()) {
        c_r_[0] = info[0].Range1;
        c_r_[1] = info[0].Range2;
        c_r_[2] = info[0].Range3;
      } else {
        this->gb_logger_->log_error(
            this->rule_id_ +
            ":load时T_RULE_SAMPLE_3D_INFO没有记录");
        return -3;
      }
    } else {
      this->gb_logger_->log_error(this->rule_id_ + ":load时db2查询错误");
      return -1;
    }
    Dim3Table tc;
    auto query_list_maybe = exec<db2_t, Dim3Table>(
        select(tc.Flag(), tc.Count(), tc.X1(), tc.X2(), tc.X3())
            .from(tc)
            .where(tc.RuleId() == this->rule_id_));
    if (query_list_maybe.is_just()) {
      auto& query_list = query_list_maybe.unsafe_get_just();
      for (const auto& x : query_list) {
        this->data_.insert(
            std::make_pair(SamplePointWR({ x.X1, x.X2, x.X3 }, c_r_), x.Count));
        this->dump_size_ += x.Count;
      }
    } else {
      // A failed data query is an error, as in the 1-D and 2-D branches.
      // Previously this fell through and reported success with no data.
      return -1;
    }
  }
  // Derive the scaling ratio and leave first-sampling mode once data exists.
  if (dump_size_ != 0) {
    this->scale_ = static_cast<double>(k_dest_dump_size) /
                   static_cast<double>(dump_size_);
    this->is_first_sampling_ = false;
  }
  if (data_.size() > 100000) {
    gb_logger_->log_error("data size is " + std::to_string(data_.size()));
  }
  if (dump_size_ * scale_ > 100000) {
    gb_logger_->log_error("dump size is " +
                          std::to_string(dump_size_ * scale_));
  }
  return 0;
}
/**
 * @brief Persists pending bins to this rule's Dim{1,2,3}Table.
 *
 * New bins (insert_list_) are inserted with Flag = 1. Bins whose count
 * changed (update_list_) have their Count updated, matched by rule id and
 * bin center(s). Inserts run before updates, so a bin that store() created
 * and then incremented before this commit ends up with its latest count.
 *
 * @return 2 when nothing is pending (both lists empty); 0 otherwise.
 *         Individual statement failures are logged but do not change the
 *         return value, and both pending lists are cleared.
 */
int ApproximateData::commit() {
  // Skip only when there is nothing at all to write. The previous `||` check
  // also skipped cycles that only updated existing bins (or only added new
  // ones), so those changes were never persisted.
  if (this->insert_list_.empty() && this->update_list_.empty()) {
    return 2;
  }
  size_t failed = 0;  ///< number of statements whose exec returned nothing
  const auto track = [&failed](const auto& result) {
    if (result.is_nothing()) {
      ++failed;
    }
  };
  if (dims_ == 1) {
    Dim1Table tc;
    for (const auto& x : this->insert_list_) {
      track(exec<db2_t, size_t>(insert_into(tc).set(
          tc.RuleId() = this->rule_id_, tc.X1() = x.first.value[0].get_center(),
          tc.Count() = x.second, tc.Flag() = 1)));
    }
    for (const auto& x : this->update_list_) {
      track(exec<db2_t, size_t>(update(tc)
                                    .set(tc.Count() = x.second)
                                    .where(tc.RuleId() == this->rule_id_,
                                           tc.X1() == x.first.value[0].get_center())));
    }
  } else if (dims_ == 2) {
    Dim2Table tc;
    for (const auto& x : this->insert_list_) {
      track(exec<db2_t, size_t>(insert_into(tc).set(
          tc.RuleId() = this->rule_id_, tc.X1() = x.first.value[0].get_center(),
          tc.X2() = x.first.value[1].get_center(), tc.Count() = x.second,
          tc.Flag() = 1)));
    }
    for (const auto& x : this->update_list_) {
      track(exec<db2_t, size_t>(update(tc)
                                    .set(tc.Count() = x.second)
                                    .where(tc.RuleId() == this->rule_id_,
                                           tc.X1() == x.first.value[0].get_center(),
                                           tc.X2() == x.first.value[1].get_center())));
    }
  } else if (dims_ == 3) {
    Dim3Table tc;
    for (const auto& x : this->insert_list_) {
      track(exec<db2_t, size_t>(insert_into(tc).set(
          tc.RuleId() = this->rule_id_, tc.X1() = x.first.value[0].get_center(),
          tc.X2() = x.first.value[1].get_center(),
          tc.X3() = x.first.value[2].get_center(), tc.Count() = x.second,
          tc.Flag() = 1)));
    }
    for (const auto& x : this->update_list_) {
      track(exec<db2_t, size_t>(update(tc)
                                    .set(tc.Count() = x.second)
                                    .where(tc.RuleId() == this->rule_id_,
                                           tc.X1() == x.first.value[0].get_center(),
                                           tc.X2() == x.first.value[1].get_center(),
                                           tc.X3() == x.first.value[2].get_center())));
    }
  }
  if (failed != 0) {
    gb_logger_->log_error(this->rule_id_ + ":commit: " +
                          std::to_string(failed) + " statement(s) failed");
  }
  this->update_list_.clear();
  this->insert_list_.clear();
  gb_logger_->log_info("样本保存完成");
  return 0;
}
/**
* @brief 存储指定数据(通过数据点和步长信息,生成的压缩数据)
* @param value My Param doc
* @return int
*/
int ApproximateData::store(const SamplePoint& i_value) {
// 对cron的第一次和非第一次开放mon的非第一次开放
// if (this->is_first_sampling()) {
// return 2;
// }
SamplePointWR value{ i_value, c_r_ }; ///< 数据点->[left,right]
auto iter_data = this->data_.find(value);
// 如果数据找得到
if (iter_data != this->data_.end()) {
iter_data->second++;
// 如果在update找得到更新update
auto iter_update = this->update_list_.find(value);
if (iter_update != this->update_list_.end()) {
iter_update->second = iter_data->second;
}
else {
// 否则插入update
update_list_[value] = iter_data->second;
}
}
else {
this->data_[value] = 1;
this->insert_list_.insert(std::make_pair(value, 1));
}
return 0;
}
/**
 * @brief Decompresses the stored histogram into synthetic sample points.
 *
 * For every bin in data_, ceil(scale_ * count) points are generated. A bin
 * whose scaled count is 0 or less still yields exactly one point. Each
 * coordinate is drawn from a normal distribution with mean equal to the bin
 * interval's get_center(). Its standard deviation is get_distance()
 * (presumably the interval width b - a; confirm). The returned points are
 * therefore random draws, not the original samples.
 * @warning Prefer this method when reading the data.
 * @return the generated points
 * @throws mix_cc::Exception (-1, "decompress data error") with the original
 *         std::exception nested, if generation throws.
 */
ApproximateData::OutData ApproximateData::extract() {
  try {
    // Seed a Mersenne Twister from several random_device draws.
    std::random_device r;
    std::seed_seq seed2{ r(), r(), r(), r(), r(), r(), r() };
    std::mt19937 e2(seed2);
    OutData ret_data;  ///< decompressed sample points (normal random draws)
    // Reused across bins; refilled with this bin's per-dimension parameters.
    std::vector<std::normal_distribution<>> normal_dist;
    normal_dist.reserve(dims_);
    for (const auto& x : this->data_) {
      // Number of points to emit for this bin, at least one.
      const int scaled = static_cast<int>(std::ceil(scale_ * x.second));
      const int count = scaled > 0 ? scaled : 1;
      normal_dist.clear();
      for (size_t i = 0; i < dims_; i++) {
        normal_dist.emplace_back(x.first.value[i].get_center(),
                                 x.first.value[i].get_distance());
      }
      for (int j = 0; j < count; j++) {
        SamplePoint tmp{};
        for (size_t i = 0; i < dims_; i++) {
          tmp.push_back(normal_dist[i](e2));
        }
        ret_data.emplace_back(std::move(tmp));
      }
    }
    return ret_data;
  } catch (const std::exception& e) {
    std::throw_with_nested(
        mix_cc::Exception(-1, "decompress data error", BOOST_CURRENT_LOCATION));
  }
  return {};
}
/**
 * @brief Feeds every sample row into the per-dimension running statistics
 *        rs_; these are later used to derive the bin widths.
 * @param input_data rows of samples; row[i] is added to rs_[i]
 * @return 1 on success.
 *         -1 when a row has more values than rs_ has entries (rows before it
 *         have already been added), or when rs_[i].add throws. The error is
 *         logged in both cases.
 */
int ApproximateData::put_data_to_rs(const SampleWindow& input_data) {
  try {
    for (const auto& x : input_data) {
      // rs_[i] is not bounds-checked, and catch(...) below cannot intercept an
      // out-of-range write, so reject over-long rows explicitly.
      if (x.size() > rs_.size()) {
        gb_logger_->log_error(this->rule_id_ +
                              ":put_data_to_rs: sample row has more "
                              "dimensions than rs_");
        return -1;
      }
      for (size_t i = 0; i < x.size(); i++) {
        rs_[i].add(x[i]);
      }
    }
    return 1;
  } catch (...) {
    gb_logger_->log_error(this->rule_id_ +
                          "ApproximateData::put_data_to_rs运行失败");
    return -1;
  }
}
namespace {
/// Collects one statistic per entry of `rs` via `getter`.
/// The result always has at least 3 elements, zero-filled past rs.size(),
/// which matches the historical fixed-size-3 output. It grows to rs.size()
/// when rs holds more entries, instead of writing past the end.
template <typename RsContainer, typename Getter>
vector<double> collect_rs_stat(RsContainer& rs, Getter getter) {
  const size_t n = rs.size() < 3 ? 3 : rs.size();
  vector<double> out(n, 0.0);
  for (size_t i = 0; i < rs.size(); ++i) {
    out[i] = getter(rs[i]);
  }
  return out;
}
}  // namespace

/// @return mean of each running statistic in rs_ (at least 3 entries, 0-padded)
vector<double> ApproximateData::get_rs_means() {
  return collect_rs_stat(this->rs_, [](auto& s) { return s.mean(); });
}
/// @return variance of each running statistic in rs_ (at least 3 entries, 0-padded)
vector<double> ApproximateData::get_rs_variances() {
  return collect_rs_stat(this->rs_, [](auto& s) { return s.variance(); });
}
/// @return standard deviation of each running statistic in rs_ (at least 3 entries, 0-padded)
vector<double> ApproximateData::get_rs_stddev() {
  return collect_rs_stat(this->rs_, [](auto& s) { return s.stddev(); });
}
/// @return ex_kurtosis() of each running statistic in rs_ (at least 3 entries, 0-padded)
vector<double> ApproximateData::get_rs_kurtosis() {
  return collect_rs_stat(this->rs_, [](auto& s) { return s.ex_kurtosis(); });
}
/// @return skewness of each running statistic in rs_ (at least 3 entries, 0-padded)
vector<double> ApproximateData::get_rs_skewness() {
  return collect_rs_stat(this->rs_, [](auto& s) { return s.skewness(); });
}
/// @return max of each running statistic in rs_ (at least 3 entries, 0-padded)
vector<double> ApproximateData::get_rs_max() {
  return collect_rs_stat(this->rs_, [](auto& s) { return s.max(); });
}
/// @return min of each running statistic in rs_ (at least 3 entries, 0-padded)
vector<double> ApproximateData::get_rs_min() {
  return collect_rs_stat(this->rs_, [](auto& s) { return s.min(); });
}
} // namespace policy
} // namespace data_handler