eis/eqpalg/.do_not_use/data_handler/approximate_data.cc

606 lines
21 KiB
C++
Raw Normal View History

#include <eqpalg/data_handler/approximate_data.h>
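// Minimum number of accumulated samples required before the first sampling
// batch is processed (the count must be strictly greater than this value).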
#define CAL_STEP_MIN_SIZE 100
namespace data_handler {
namespace policy {
/**
 * @brief Build an approximate-data handler for one rule.
 *
 * Forwards both arguments to the Base constructor, creates a logger keyed by
 * the rule id, and sizes the per-dimension cell-range (c_r_) and
 * running-statistics (rs_) containers to `dims` entries.
 *
 * @param ruleId Identifier of the rule this handler serves.
 * @param dims   Number of entries allocated in c_r_ and rs_ (one per dimension).
 */
ApproximateData::ApproximateData(const std::string& ruleId, size_t dims)
    : Base(ruleId, dims),
      gb_logger_(std::make_unique<GbLogger>(ruleId)) {
  this->rs_.resize(dims);
  this->c_r_.resize(dims);
}
/**
* @brief T_RULE_SAMPLE_1D_INFO插入步长信息
*
* 100
* info插入first_runing_info至db2
* @param first_runing_infoMy Param doc
* @param tp My Param doc
* @return int
*/
int ApproximateData::first_sampling_batch(const InData& first_runing_info,
TimePoint tp, Rs running_state) {
if (first_runing_info.empty() ||
running_state[0].current_n() <= CAL_STEP_MIN_SIZE) {
if (running_state[0].current_n() <= CAL_STEP_MIN_SIZE) {
this->gb_logger_->log_error(this->rule_id_ + ":已累积的样本数量:" +
std::to_string(running_state[0].current_n()) +
"<" + std::to_string(CAL_STEP_MIN_SIZE) +
",不更新T_RULE_SAMPLE_1D_INFO");
}
return -1;
}
this->rs_ = running_state;
// 组距 组数的确定
for (size_t i = 0; i < rs_.size(); i++) {
auto cell_range = (rs_[i].max() - rs_[i].min()) / 50;
// if (cell_range > 100) {
// cell_range = 100;
// } else if (cell_range > 10) {
// cell_range = 10;
// } else if (cell_range > 1) {
// cell_range = 1;
// } else if (cell_range > 0.1) {
// cell_range = 0.1;
// } else {
// cell_range = 0.01;
// }
if (cell_range > 0.001) {
c_r_[i] = cell_range;
}
else {
this->gb_logger_->log_error(this->rule_id_ +
"当前rs=" + std::to_string(cell_range) +
" 过小info将被置为0.001");
c_r_[i] = 0.001;
}
}
// 插入
fplus::maybe<std::size_t> approx_info_ret =
mix_cc::fp::nothing<std::size_t>();
//更新
fplus::maybe<std::size_t> update_info_ret =
mix_cc::fp::nothing<std::size_t>();
if (dims_ == 1) {
// update info
T_RULE_SAMPLE_1D_INFO tci;
// 查当前ruleid 是否已经存在
auto info_maybe = exec<db2_t, T_RULE_SAMPLE_1D_INFO>(
select(tci.Range1())
.from(tci)
.where(tci.RuleId() == this->rule_id_));
if (info_maybe.is_just()) {
auto& info = info_maybe.unsafe_get_just();
if (!info.empty()) {
// cellrange变化了就更新
if (c_r_[0] != info[0].Range1)
// update 数据库的数据
{
update_info_ret =
exec<db2_t, size_t>(update(tci)
.set(tci.Range1() = c_r_[0])
.where(tci.RuleId() == this->rule_id_));
if (update_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据更新异常");
return -1;
}
}
}
else {
// 如果没有当前 ruleid的range就插入
approx_info_ret = exec<db2_t, size_t>(insert_into(tci).set(
tci.RuleId() = rule_id_, tci.Range1() = c_r_[0], tci.Spare1() = 1));
if (approx_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据插入异常");
return -1;
}
if (approx_info_ret.unsafe_get_just() == 0) {
DEBUG_PRINT_MIX_CC("插入数据量异常");
return -2;
}
}
}
else {
return -1;
}
// } else {
// }
}
else if (dims_ == 2) {
T_RULE_SAMPLE_2D_INFO tci;
// 查询当前ruleid 是否已经存在
auto info_maybe = exec<db2_t, T_RULE_SAMPLE_2D_INFO>(
select(tci.Range1(), tci.Range2())
.from(tci)
.where(tci.RuleId() == this->rule_id_));
if (info_maybe.is_just()) {
auto& info = info_maybe.unsafe_get_just();
if (!info.empty()) {
// cellrange变化了就更新
if (c_r_[0] != info[0].Range1 || c_r_[1] != info[0].Range2)
// update 数据库的数据
{
update_info_ret = exec<db2_t, size_t>(
update(tci)
.set(tci.Range1() = c_r_[0], tci.Range2() = c_r_[1])
.where(tci.RuleId() == this->rule_id_));
if (update_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据更新异常");
return -1;
}
}
}
else {
// 如果没有当前 ruleid的range就插入
approx_info_ret = exec<db2_t, size_t>(insert_into(tci).set(
tci.RuleId() = rule_id_, tci.Range1() = c_r_[0],
tci.Range2() = c_r_[1], tci.Spare1() = 1,
tci.Spare2() = 1));
if (approx_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据插入异常");
return -1;
}
if (approx_info_ret.unsafe_get_just() == 0) {
DEBUG_PRINT_MIX_CC("插入数据量异常");
return -2;
}
}
}
else {
return -1;
}
}
else if (dims_ == 3) {
T_RULE_SAMPLE_3D_INFO tci;
// 查询当前ruleid 是否已经存在
auto info_maybe = exec<db2_t, T_RULE_SAMPLE_3D_INFO>(
select(tci.Range1(), tci.Range2(), tci.Range3())
.from(tci)
.where(tci.RuleId() == this->rule_id_));
if (info_maybe.is_just()) {
auto& info = info_maybe.unsafe_get_just();
if (!info.empty()) {
// cellrange变化了就更新
if (c_r_[0] != info[0].Range1 || c_r_[1] != info[0].Range2 ||
c_r_[2] != info[0].Range3)
// update 数据库的数据
{
update_info_ret = exec<db2_t, size_t>(
update(tci)
.set(tci.Range1() = c_r_[0], tci.Range2() = c_r_[1],
tci.Range3() = c_r_[2])
.where(tci.RuleId() == this->rule_id_));
if (update_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据更新异常");
return -1;
}
}
}
else {
// 如果没有当前 ruleid的range就插入
approx_info_ret = exec<db2_t, size_t>(insert_into(tci).set(
tci.RuleId() = rule_id_, tci.Range1() = c_r_[0],
tci.Range2() = c_r_[1], tci.Range3() = c_r_[2], tci.Spare1() = 1,
tci.Spare2() = 1, tci.Spare3() = 1));
if (approx_info_ret.is_nothing()) {
DEBUG_PRINT_MIX_CC("INFO信息数据插入异常");
return -1;
}
if (approx_info_ret.unsafe_get_just() == 0) {
DEBUG_PRINT_MIX_CC("插入数据量异常");
return -2;
}
}
}
else {
return -1;
}
}
// 保证cron第一次能store但mon第一次不能store
this->is_first_sampling_ = false;
for (const auto& x : first_runing_info) {
this->store(x);
}
// commit失败将返回int 2
// 只有 commit 成功才将this->is_first_sampling_置为false
// 否则置为true
auto commit_result = this->commit();
if (commit_result == 2) {
this->is_first_sampling_ = true;
}
return 0;
}
/**
 * @brief Reload the rule's cell ranges and cell counts from the database.
 *
 * Resets dump_size_, scale_ and data_, then for dims_ in {1, 2, 3}:
 *  - reads the cell ranges (c_r_) from T_RULE_SAMPLE_{1,2,3}D_INFO;
 *  - reads every stored cell from the matching Dim{1,2,3}Table into data_
 *    and accumulates the counts into dump_size_.
 * When at least one sample was loaded, scale_ is set to
 * k_dest_dump_size / dump_size_ and is_first_sampling_ is cleared.
 *
 * @return 0 on success (also for a dims_ value outside 1..3, in which case
 *         nothing is loaded); -1 when a database query failed; -3 when the
 *         INFO table has no row for this rule.
 */
int ApproximateData::load() {
  this->dump_size_ = 0;  ///< Number of samples represented by the loaded cells
  this->scale_ = 0;      ///< Scaling ratio applied when extracting
  this->data_.clear();   ///< Drop previously loaded cells before reloading

  if (dims_ == 1) {
    // ---- one-dimensional data ----
    T_RULE_SAMPLE_1D_INFO ti;
    auto info_maybe = exec<db2_t, T_RULE_SAMPLE_1D_INFO>(
        select(ti.Range1(), ti.Spare1())
            .from(ti)
            .where(ti.RuleId() == this->rule_id_));
    if (!info_maybe.is_just()) {
      this->gb_logger_->log_error(this->rule_id_ + ":load时db2查询错误");
      return -1;
    }
    auto& info = info_maybe.unsafe_get_just();
    if (info.empty()) {
      this->gb_logger_->log_error(
          this->rule_id_ + ":load时T_RULE_SAMPLE_1D_INFO没有记录");
      return -3;
    }
    this->c_r_[0] = info[0].Range1;

    Dim1Table tc;
    auto query_list_maybe =
        exec<db2_t, Dim1Table>(select(tc.Flag(), tc.Count(), tc.X1())
                                   .from(tc)
                                   .where(tc.RuleId() == this->rule_id_));
    if (!query_list_maybe.is_just()) {
      return -1;
    }
    for (const auto& row : query_list_maybe.unsafe_get_just()) {
      this->data_.insert(
          std::make_pair(SamplePointWR({ row.X1 }, c_r_), row.Count));
      this->dump_size_ += row.Count;
    }
  } else if (dims_ == 2) {
    // ---- two-dimensional data ----
    T_RULE_SAMPLE_2D_INFO di;
    auto info_maybe = exec<db2_t, T_RULE_SAMPLE_2D_INFO>(
        select(di.Range1(), di.Range2(), di.Spare1(), di.Spare2())
            .from(di)
            .where(di.RuleId() == this->rule_id_));
    if (!info_maybe.is_just()) {
      this->gb_logger_->log_error(this->rule_id_ + ":load时db2查询错误");
      return -1;
    }
    auto& info = info_maybe.unsafe_get_just();
    if (info.empty()) {
      this->gb_logger_->log_error(
          this->rule_id_ + ":load时T_RULE_SAMPLE_2D_INFO没有记录");
      return -3;
    }
    this->c_r_[0] = info[0].Range1;
    this->c_r_[1] = info[0].Range2;

    Dim2Table dc;
    auto query_list_maybe =
        exec<db2_t, Dim2Table>(select(dc.Flag(), dc.Count(), dc.X1(), dc.X2())
                                   .from(dc)
                                   .where(dc.RuleId() == this->rule_id_));
    if (!query_list_maybe.is_just()) {
      return -1;
    }
    for (const auto& row : query_list_maybe.unsafe_get_just()) {
      this->data_.insert(
          std::make_pair(SamplePointWR({ row.X1, row.X2 }, c_r_), row.Count));
      this->dump_size_ += row.Count;
    }
  } else if (dims_ == 3) {
    // ---- three-dimensional data ----
    T_RULE_SAMPLE_3D_INFO ti;
    auto info_maybe = exec<db2_t, T_RULE_SAMPLE_3D_INFO>(
        select(ti.Range1(), ti.Range2(), ti.Range3(), ti.Spare1(), ti.Spare2(),
               ti.Spare3())
            .from(ti)
            .where(ti.RuleId() == this->rule_id_));
    if (!info_maybe.is_just()) {
      this->gb_logger_->log_error(this->rule_id_ + ":load时db2查询错误");
      return -1;
    }
    auto& info = info_maybe.unsafe_get_just();
    if (info.empty()) {
      this->gb_logger_->log_error(
          this->rule_id_ + ":load时T_RULE_SAMPLE_3D_INFO没有记录");
      return -3;
    }
    c_r_[0] = info[0].Range1;
    c_r_[1] = info[0].Range2;
    c_r_[2] = info[0].Range3;

    Dim3Table tc;
    auto query_list_maybe = exec<db2_t, Dim3Table>(
        select(tc.Flag(), tc.Count(), tc.X1(), tc.X2(), tc.X3())
            .from(tc)
            .where(tc.RuleId() == this->rule_id_));
    // A failed data query is an error here too, exactly as in the 1D and 2D
    // branches; previously this case fell through and returned 0.
    if (!query_list_maybe.is_just()) {
      return -1;
    }
    for (const auto& row : query_list_maybe.unsafe_get_just()) {
      this->data_.insert(std::make_pair(
          SamplePointWR({ row.X1, row.X2, row.X3 }, c_r_), row.Count));
      this->dump_size_ += row.Count;
    }
  }

  // With data present, compute the scaling ratio and mark the first
  // sampling as done.
  if (dump_size_ != 0) {
    this->scale_ = static_cast<double>(k_dest_dump_size) /
                   static_cast<double>(dump_size_);
    this->is_first_sampling_ = false;
  }

  // Diagnostics for unexpectedly large data sets.
  if (data_.size() > 100000) {
    gb_logger_->log_error("data size is " + std::to_string(data_.size()));
  }
  if (dump_size_ * scale_ > 100000) {
    gb_logger_->log_error("dump size is " +
                          std::to_string(dump_size_ * scale_));
  }
  return 0;
}
/**
* @brief
* @return int
*/
int ApproximateData::commit() {
if (this->insert_list_.empty() || this->update_list_.empty()) {
return 2;
}
auto last_update_time = system_clock::now();
fplus::maybe<std::size_t> maybe_ret = mix_cc::fp::nothing<std::size_t>();
if (dims_ == 1) {
// 1维数据提交
Dim1Table tc;
for (auto x : this->insert_list_) {
exec<db2_t, size_t>(insert_into(tc).set(
tc.RuleId() = this->rule_id_, tc.X1() = x.first.value[0].get_center(),
tc.Count() = x.second, tc.Flag() = 1));
}
for (auto x : this->update_list_) {
exec<db2_t, size_t>(update(tc)
.set(tc.Count() = x.second)
.where(tc.RuleId() == this->rule_id_,
tc.X1() == x.first.value[0].get_center()));
}
}
else if (dims_ == 2) {
// 2维数据提交
Dim2Table tc;
for (auto x : this->insert_list_) {
exec<db2_t, size_t>(insert_into(tc).set(
tc.RuleId() = this->rule_id_, tc.X1() = x.first.value[0].get_center(),
tc.X2() = x.first.value[1].get_center(), tc.Count() = x.second,
tc.Flag() = 1));
}
for (auto x : this->update_list_) {
exec<db2_t, size_t>(update(tc)
.set(tc.Count() = x.second)
.where(tc.RuleId() == this->rule_id_,
tc.X1() == x.first.value[0].get_center(),
tc.X2() == x.first.value[1].get_center()));
}
}
else if (dims_ == 3) {
// 3维数据提交
Dim3Table tc;
for (auto x : this->insert_list_) {
exec<db2_t, size_t>(insert_into(tc).set(
tc.RuleId() = this->rule_id_, tc.X1() = x.first.value[0].get_center(),
tc.X2() = x.first.value[1].get_center(),
tc.X3() = x.first.value[2].get_center(), tc.Count() = x.second,
tc.Flag() = 1));
}
for (auto x : this->update_list_) {
exec<db2_t, size_t>(update(tc)
.set(tc.Count() = x.second)
.where(tc.RuleId() == this->rule_id_,
tc.X1() == x.first.value[0].get_center(),
tc.X2() == x.first.value[1].get_center(),
tc.X3() == x.first.value[2].get_center()));
}
}
this->update_list_.clear();
this->insert_list_.clear();
gb_logger_->log_info("样本保存完成");
return 0;
}
/**
* @brief ()
* @param value My Param doc
* @return int
*/
int ApproximateData::store(const SamplePoint& i_value) {
// 对cron的第一次和非第一次开放mon的非第一次开放
// if (this->is_first_sampling()) {
// return 2;
// }
SamplePointWR value{ i_value, c_r_ }; ///< 数据点->[left,right]
auto iter_data = this->data_.find(value);
// 如果数据找得到
if (iter_data != this->data_.end()) {
iter_data->second++;
// 如果在update找得到更新update
auto iter_update = this->update_list_.find(value);
if (iter_update != this->update_list_.end()) {
iter_update->second = iter_data->second;
}
else {
// 否则插入update
update_list_[value] = iter_data->second;
}
}
else {
this->data_[value] = 1;
this->insert_list_.insert(std::make_pair(value, 1));
}
return 0;
}
/**
 * @brief Expand the stored cell histogram (data_) into a list of sample
 *        points.
 *
 * For every cell, ceil(scale_ * count) points are generated, or a single
 * point when that value is not positive. Each coordinate is drawn from a
 * normal distribution whose mean is the cell's get_center() and whose
 * standard deviation is the cell's get_distance() for that dimension.
 *
 * @warning The generator is seeded from std::random_device, so the returned
 *          points are random values and differ between calls.
 * @return The generated sample points.
 * @throws mix_cc::Exception (with the original exception nested) when a
 *         std::exception is raised while generating.
 */
ApproximateData::OutData ApproximateData::extract() {
  try {
    // Seed the random engine.
    std::random_device entropy;
    std::seed_seq seed{ entropy(), entropy(), entropy(), entropy(),
                        entropy(), entropy(), entropy() };
    std::mt19937 engine(seed);

    OutData samples;  ///< Generated points (normally distributed random values)
    for (const auto& cell : this->data_) {
      // Number of points this cell contributes after scaling its count.
      const int scaled = std::ceil(scale_ * cell.second);

      // One distribution per dimension: mean = cell centre,
      // standard deviation = cell length.
      std::vector<std::normal_distribution<>> axes;
      axes.reserve(dims_);
      for (size_t d = 0; d < dims_; d++) {
        axes.emplace_back(std::normal_distribution<>{
            cell.first.value[d].get_center(),
            cell.first.value[d].get_distance()});
      }

      // A cell that scales down to zero still yields one point.
      const int draws = scaled > 0 ? scaled : 1;
      for (int n = 0; n < draws; n++) {
        SamplePoint point{};
        for (size_t d = 0; d < dims_; d++) {
          point.push_back(axes[d](engine));
        }
        samples.emplace_back(point);
      }
    }
    return samples;
  }
  catch (const std::exception&) {
    std::throw_with_nested(
        mix_cc::Exception(-1, "decompress data error", BOOST_CURRENT_LOCATION));
  }
  return {};
}
/**
 * @brief Feed a window of samples into the per-dimension running statistics.
 *
 * Component i of every sample is added to rs_[i]. The window is validated
 * first: if any sample has more components than rs_ has entries, nothing is
 * added and -1 is returned, because indexing rs_ past its end would be
 * undefined behaviour that the catch-all below cannot intercept.
 *
 * @param input_data Window of samples to accumulate.
 * @return 1 on success; -1 when a sample has too many components or an
 *         exception was thrown while accumulating.
 */
int ApproximateData::put_data_to_rs(const SampleWindow& input_data) {
  try {
    // Validate up front so a malformed sample cannot leave rs_ half-updated.
    for (const auto& sample : input_data) {
      if (sample.size() > rs_.size()) {
        gb_logger_->log_error(
            this->rule_id_ +
            "ApproximateData::put_data_to_rs: sample has " +
            std::to_string(sample.size()) + " components but rs_ has " +
            std::to_string(rs_.size()));
        return -1;
      }
    }

    for (const auto& sample : input_data) {
      for (size_t i = 0; i < sample.size(); ++i) {
        rs_[i].add(sample[i]);  // accumulate for the range statistics
      }
    }
    return 1;
  }
  catch (...) {
    gb_logger_->log_error(this->rule_id_ +
                          "ApproximateData::put_data_to_rs运行失败");
    return -1;
  }
}
/**
 * @brief Collect the mean of each dimension's running statistics.
 * @return Vector of exactly 3 values; slot i holds rs_[i].mean() for every
 *         i below both rs_.size() and 3, the remaining slots are 0.
 */
vector<double> ApproximateData::get_rs_means() {
  vector<double> rs_means(3, 0);
  // Clamp to the result length so an rs_ with more than 3 entries cannot
  // write out of bounds; size_t avoids the signed/unsigned comparison.
  const size_t limit =
      this->rs_.size() < rs_means.size() ? this->rs_.size() : rs_means.size();
  for (size_t i = 0; i < limit; ++i) {
    rs_means[i] = this->rs_[i].mean();
  }
  return rs_means;
}
/**
 * @brief Collect the variance of each dimension's running statistics.
 * @return Vector of exactly 3 values; slot i holds rs_[i].variance() for
 *         every i below both rs_.size() and 3, the remaining slots are 0.
 */
vector<double> ApproximateData::get_rs_variances() {
  vector<double> rs_variances(3, 0);
  // Clamp to the result length so an rs_ with more than 3 entries cannot
  // write out of bounds; size_t avoids the signed/unsigned comparison.
  const size_t limit = this->rs_.size() < rs_variances.size()
                           ? this->rs_.size()
                           : rs_variances.size();
  for (size_t i = 0; i < limit; ++i) {
    rs_variances[i] = this->rs_[i].variance();
  }
  return rs_variances;
}
/**
 * @brief Collect the standard deviation of each dimension's running
 *        statistics.
 * @return Vector of exactly 3 values; slot i holds rs_[i].stddev() for every
 *         i below both rs_.size() and 3, the remaining slots are 0.
 */
vector<double> ApproximateData::get_rs_stddev() {
  vector<double> rs_stddev(3, 0);
  // Clamp to the result length so an rs_ with more than 3 entries cannot
  // write out of bounds; size_t avoids the signed/unsigned comparison.
  const size_t limit = this->rs_.size() < rs_stddev.size()
                           ? this->rs_.size()
                           : rs_stddev.size();
  for (size_t i = 0; i < limit; ++i) {
    rs_stddev[i] = this->rs_[i].stddev();
  }
  return rs_stddev;
}
/**
 * @brief Collect the ex_kurtosis() value of each dimension's running
 *        statistics.
 * @return Vector of exactly 3 values; slot i holds rs_[i].ex_kurtosis() for
 *         every i below both rs_.size() and 3, the remaining slots are 0.
 */
vector<double> ApproximateData::get_rs_kurtosis() {
  vector<double> rs_kurtosis(3, 0);
  // Clamp to the result length so an rs_ with more than 3 entries cannot
  // write out of bounds; size_t avoids the signed/unsigned comparison.
  const size_t limit = this->rs_.size() < rs_kurtosis.size()
                           ? this->rs_.size()
                           : rs_kurtosis.size();
  for (size_t i = 0; i < limit; ++i) {
    rs_kurtosis[i] = this->rs_[i].ex_kurtosis();
  }
  return rs_kurtosis;
}
/**
 * @brief Collect the skewness of each dimension's running statistics.
 * @return Vector of exactly 3 values; slot i holds rs_[i].skewness() for
 *         every i below both rs_.size() and 3, the remaining slots are 0.
 */
vector<double> ApproximateData::get_rs_skewness() {
  vector<double> rs_skewness(3, 0);
  // Clamp to the result length so an rs_ with more than 3 entries cannot
  // write out of bounds; size_t avoids the signed/unsigned comparison.
  const size_t limit = this->rs_.size() < rs_skewness.size()
                           ? this->rs_.size()
                           : rs_skewness.size();
  for (size_t i = 0; i < limit; ++i) {
    rs_skewness[i] = this->rs_[i].skewness();
  }
  return rs_skewness;
}
/**
 * @brief Collect the maximum of each dimension's running statistics.
 * @return Vector of exactly 3 values; slot i holds rs_[i].max() for every
 *         i below both rs_.size() and 3, the remaining slots are 0.
 */
vector<double> ApproximateData::get_rs_max() {
  vector<double> rs_max(3, 0);
  // Clamp to the result length so an rs_ with more than 3 entries cannot
  // write out of bounds; size_t avoids the signed/unsigned comparison.
  const size_t limit =
      this->rs_.size() < rs_max.size() ? this->rs_.size() : rs_max.size();
  for (size_t i = 0; i < limit; ++i) {
    rs_max[i] = this->rs_[i].max();
  }
  return rs_max;
}
/**
 * @brief Collect the minimum of each dimension's running statistics.
 * @return Vector of exactly 3 values; slot i holds rs_[i].min() for every
 *         i below both rs_.size() and 3, the remaining slots are 0.
 */
vector<double> ApproximateData::get_rs_min() {
  vector<double> rs_min(3, 0);
  // Clamp to the result length so an rs_ with more than 3 entries cannot
  // write out of bounds; size_t avoids the signed/unsigned comparison.
  const size_t limit =
      this->rs_.size() < rs_min.size() ? this->rs_.size() : rs_min.size();
  for (size_t i = 0; i < limit; ++i) {
    rs_min[i] = this->rs_[i].min();
  }
  return rs_min;
}
} // namespace policy
} // namespace data_handler