// eis/eqpalg/.do_not_use/no_need/loss_compress_single.cc
/**
 * @file loss_compress_single.cc
 * @brief Lossy compression of a single variable's value distribution:
 *        maintains a histogram of [value, precision] -> sample-count
 *        buckets with adaptive merging/subdivision.
 * @author Cat (null.null.null@qq.com)
 * @version 0.1
 * @date 2021-08-18
 *
 * Copyright: Baosight Co. Ltd.
 * DO NOT COPY/USE WITHOUT PERMISSION
 *
 */
#include <eqpalg/distribution/loss_compress_single.h>

#include <algorithm>
#include <cmath>
#include <limits>
#include <random>
#include <utility>
#include <vector>

#include "mix_cc/exception.h"
namespace distribution {
/**
* @brief
*
* @return int
*/
int LossCompressSingle::reforge_precision() {
try {
if (data_.size() > dest_data_size_max) {
// 合并过量冗余数据
this->data_merge();
} else if (data_.size() < dest_data_size_min) {
// 细分数据区间
this->data_sub_div();
}
} catch (const std::exception& e) {
std::throw_with_nested(
mix_cc::Exception(-1, "reforge data error", BOOST_CURRENT_LOCATION));
}
return 0;
}
/**
* @brief :
*
*
* 10000
* >= 10000 > 10
* 0.1,10
* @warning
* @return int
*/
int LossCompressSingle::data_sub_div() {
try {
exchange_data_t internal_data;
// 从map中提取有序数据
for (auto x : data_) {
internal_data.push_back(x);
}
// 按照数据量进行排序
std::sort(internal_data.begin(), internal_data.end(),
[](const v_pair_t& left, const v_pair_t& right) {
return left.second > right.second;
});
// 如果细分的数据中数据量最大的数据依旧小于100个则直接认为数据不用细分
// 只是单纯数据分布较为密集
if (internal_data[0].second < 100) {
return 0;
}
// 把数据量最大的数据进行细分
{
size_t dest_size = 0;
double value, scale_prec = 0;
for (size_t i = 0; i < internal_data.size() / 10; i++) {
// 找到数据区间可以被细分的10%数据,
if (internal_data[i].first.precision > 0.3 &&
internal_data[i].second > 100) {
scale_prec = internal_data[i].first.precision / 2;
value = internal_data[i].first.value;
dest_size = internal_data[i].second / 2;
// 这里进行了插入操作,会导致迭代器失效,所以放在循环外执行
data_.erase(internal_data[i].first);
this->delete_list_.push_back(internal_data[i]);
value_t v1{value + scale_prec, scale_prec};
value_t v2{value - scale_prec, scale_prec};
auto tmp_v1 = std::make_pair(v1, dest_size);
auto tmp_v2 = std::make_pair(v2, dest_size);
data_.insert(tmp_v1);
this->insert_list_.push_back(tmp_v1);
data_.insert(tmp_v2);
this->insert_list_.push_back(tmp_v2);
}
if (i > 100) {
// 如果细分的数据量大于100个跳出循环
}
}
}
} catch (const std::exception& e) {
std::throw_with_nested(
mix_cc::Exception(-1, "sub divide data error", BOOST_CURRENT_LOCATION));
}
return 0;
}
/**
* @brief
*
* 1200010
* 1010
* ********************************************************
* @warning
* 20000
* *******************************************************
* @return int
*/
int LossCompressSingle::data_merge() {
try {
exchange_data_t internal_data;
for (auto x : data_) {
internal_data.push_back(x);
}
// 把2*[位置-数据量]信息存储在临时数组内,位置为其后要合并得到两个数组位置
std::vector<std::tuple<int, size_t>> counts;
for (size_t i = 0; i < internal_data.size() - 1; i++) {
counts.push_back(std::make_tuple(
i, internal_data[i].second + internal_data[i + 1].second));
}
// 对3*数据量信息排序,获得获得可以进行合并的区间值
std::sort(counts.begin(), counts.end(),
[](const std::tuple<int, size_t>& left,
const std::tuple<int, size_t>& right) {
return std::get<1>(left) < std::get<1>(right);
});
// 一次只合并10%最多300个数据如果这些数据都不符合合并规则即合并后的精度>10)则不进行合并
for (size_t i = 0; i < counts.size(); i++) {
if (i > counts.size() / 10 || i > 300) {
break;
}
auto dest_index = std::get<0>(counts[i]);
auto tmp1 = internal_data[dest_index];
auto tmp2 = internal_data[dest_index + 1];
double total_prec = tmp1.first.precision + tmp1.first.precision;
if (total_prec < 10) {
// 如果某三个区域发生了合并则i向下之间走过三个合并的区域防止再次操作合并的区域
// 如果
if (data_.find(tmp1.first) != data_.end() &&
data_.find(tmp2.first) != data_.end()) {
this->data_.erase(tmp1.first);
this->delete_list_.push_back(tmp1);
this->data_.erase(tmp1.first);
this->delete_list_.push_back(tmp1);
value_t tmp_v = {std::floor((tmp1.first.value + tmp2.first.value) /
(2 * total_prec)) *
total_prec,
total_prec};
auto tmp_pair = std::make_pair(tmp_v, tmp1.second + tmp2.second);
this->data_.insert(tmp_pair);
this->insert_list_.push_back(tmp_pair);
}
}
}
} catch (const std::exception& e) {
std::throw_with_nested(
mix_cc::Exception(-1, "merge data error", BOOST_CURRENT_LOCATION));
}
return 0;
}
/**
* @brief
* <1
*
* 使floor函数
* @return std::vector<double>
*/
std::vector<double> LossCompressSingle::decompress_data() {
try {
std::random_device r;
std::seed_seq seed2{r(), r(), r(), r(), r(), r(), r()};
std::mt19937 e2(seed2);
std::vector<double> ret_data;
for (auto x : data_) {
std::normal_distribution<> normal_dist(x.first.value, x.first.precision);
for (size_t i = 0; i < std::floor(scale_ * x.second); i++) {
ret_data.emplace_back(std::round(normal_dist(e2)));
}
}
return ret_data;
} catch (const std::exception& e) {
std::throw_with_nested(
mix_cc::Exception(-1, "decompress data error", BOOST_CURRENT_LOCATION));
}
return {};
}
/**
 * @brief Full (unscaled) decompression — currently disabled; the
 *        commented-out implementation below is kept for reference only.
 * @return std::vector<double>
 */
// std::vector<double> LossCompressSingle::decompress_data_full() {
// std::random_device r;
// std::seed_seq seed2{r(), r(), r(), r(), r(), r(), r()};
// std::mt19937 e2(seed2);
// std::vector<double> ret_data;
// ret_data.reserve(total_size_);
// for (auto x : data_) {
// std::normal_distribution<> normal_dist(x.first.value, x.first.precision);
// for (size_t i = 0; i < x.second; i++) {
// ret_data.emplace_back(std::round(normal_dist(e2)));
// }
// }
// return ret_data;
// }
/**
* @brief
* @param value
* @return int
*/
int LossCompressSingle::Store(double value) {
this->data_to_commit_.push_back(value);
return 0;
}
/**
* @brief ,
* @param data
* @return int
*/
int LossCompressSingle::set_data(const exchange_data_t& data) {
try {
size_t min_size = std::numeric_limits<size_t>::max();
size_t max_size = 0;
for (auto x : data) {
size_t count = std ::get<1>(x);
data_[std::get<0>(x)] = count;
total_size_ += count;
if (min_size > count) {
min_size = count;
}
if (max_size < count) {
max_size = count;
}
}
double scale = dest_decompress_size / total_size_;
// 如果最小占比的在计算完成后数量小于0.01,则把压缩后数据*10
if (scale * min_size < 0.01) {
scale = scale * 10;
}
this->scale_ = scale;
} catch (const std::exception& e) {
std::throw_with_nested(
mix_cc::Exception(-1, "set data error", BOOST_CURRENT_LOCATION));
}
return 0;
}
int LossCompressSingle::commit() {
try {
// 如果数据量大于20000则停止插入操作
if (this->data_.size() > 20000) {
return -1;
}
if (is_first_commit_) {
if (first_commit_counts_ != 9) {
first_commit_counts_++;
return 0;
}
// reduce data size to < 5000
// get ordered list
std::sort(data_to_commit_.begin(), data_to_commit_.end());
// example: [1.1,1.3,1.8,3.6,4.0] [0 0.1000x 0.20000x 0.3 .. 1000]
// get delta
std::vector<double> delta;
for (size_t i = 0; i < data_to_commit_.size() - 1; i++) {
delta.push_back(data_to_commit_[i + 1] - data_to_commit_[i]);
}
// example: [0.2,0.5,1.8,0.4] [0.10000001, ... 0.1]
// remove sides values at the same time
for (size_t i = 0; i < delta.size() - 1; i++) {
auto prec = (delta[i] + delta[i + 1]) / 2;
if (prec < 0.1) {
prec = 0.1;
} else if (prec > 10) {
prec = 10;
}
value_t pr_dv{std::floor(data_to_commit_[i + 1] / prec) * prec, prec};
auto iter = data_.find(pr_dv);
if (iter != data_.end()) {
iter->second++;
this->update_list_.insert(*iter);
} else {
// insert pr_dv to dest map
auto pair = std::make_pair(pr_dv, 1);
data_.emplace(pair);
this->insert_list_.push_back(pair);
}
}
is_first_commit_ = false;
} else {
if (data_.empty()) {
is_first_commit_ = true;
first_commit_counts_ = 0;
return 0;
}
double prec;
auto diff = abs(data_.rbegin()->first.get_right() -
data_.begin()->first.get_left());
if (diff > 10000) {
prec = 10;
} else if (diff < 100) {
prec = 0.1;
} else {
prec = diff / 1000;
}
for (const auto& x : data_to_commit_) {
value_t tmp{std::floor(x / prec) * prec, prec};
auto iter = data_.find(tmp);
if (iter == data_.end()) {
auto pair = std::make_pair(tmp, 1);
data_.insert(pair);
this->insert_list_.push_back(pair);
} else {
iter->second++;
this->update_list_.insert(*iter);
}
}
}
data_to_commit_.clear();
} catch (const std::exception& e) {
std::throw_with_nested(
mix_cc::Exception(-1, "commit data error", BOOST_CURRENT_LOCATION));
}
return 0;
}
LossCompressSingle::exchange_data_t LossCompressSingle::receive_update_list() {
exchange_data_t ret;
for (auto x : update_list_) {
ret.push_back(x);
}
update_list_.clear();
return ret;
}
LossCompressSingle::exchange_data_t LossCompressSingle::receive_insert_list() {
exchange_data_t ret = insert_list_;
insert_list_.clear();
return ret;
}
LossCompressSingle::exchange_data_t LossCompressSingle::receive_delete_list() {
exchange_data_t ret = delete_list_;
delete_list_.clear();
return ret;
}
LossCompressSingle::exchange_data_t LossCompressSingle::get_ordered_compress_pack() {
exchange_data_t ret;
for (auto x : data_) {
ret.push_back(x);
}
return ret;
}
size_t LossCompressSingle::get_total_size() { return this->total_size_; }
size_t LossCompressSingle::get_scaled_size() { return scale_ * total_size_; }
LossCompressSingle::LossCompressSingle(/* args */) : total_size_(0), scale_(0) {}
LossCompressSingle::~LossCompressSingle() {}
} // namespace distribution