eis/mix_cc/ihyper_db/query_batch.h

417 lines
13 KiB
C
Raw Normal View History

/**
* @file mix_cc/ihyper_db/query_batch.h
* @brief ihyperDB的查询集
* @author Cat (null.null.null@qq.com)
* @version 0.1
* @date 2021-08-18
*
* Copyright: Baosight Co. Ltd.
* DO NOT COPY/USE WITHOUT PERMISSION
*
*/
#pragma once
#include "mix_cc/debug.h"
#include "mix_cc/exception.h"
#include "mix_cc/ihyper_db/connection.h"
#include "mix_cc/ihyper_db/read_tag_info.h"
#include "mix_cc/ihyper_db/utility.h"
#include "mix_cc/type/range.h"
#include <boost/optional.hpp>
#include <eigen3/Eigen/Eigen>
#include <algorithm>
#include <chrono>
#include <cstring>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
namespace mix_cc {
namespace ihd {
using namespace std::chrono;
/**
 * @brief A set of ihyperDB tags queried over a common time range at a fixed
 *        sampling interval.
 * @tparam T value type of the query result (one result column per tag)
 */
template <class T>
struct QueryBatch {
  /**
   * @brief Per-tag query state.
   */
  struct QueryInfoUnit {
    int32_t tag_id;              ///< tag id passed to the HD3 query API
    std::string tag_name;        ///< tag name, only used in error messages
    uint64_t remain_batch_size;  ///< records still to fetch in the current query
    time_range_t time_range;     ///< remaining window; the left edge advances
                                 ///< as records are fetched
  };
  using Mat2d = Eigen::Matrix<T, Eigen::Dynamic, Eigen::Dynamic>;
  using MatCtime = std::tuple<std::vector<TimePoint>, Mat2d>;

  /**
   * @brief Create a batch with one query unit per tag.
   * @param tag_infos (tag id, tag name) pairs; the name is only used for error
   *        messages
   * @param time_range time window copied into every tag's query unit
   * @param time_interval sampling interval between consecutive records
   */
  QueryBatch(std::vector<std::tuple<int32_t, std::string>> tag_infos,
             time_range_t time_range, TimeDur time_interval)
      : interval_(time_interval), query_all_(true) {
    query_info_units_.reserve(tag_infos.size());
    for (auto& tag_info : tag_infos) {
      // tag_infos is owned by value here, so the name can be moved out.
      query_info_units_.emplace_back(QueryInfoUnit{
          std::get<0>(tag_info), std::move(std::get<1>(tag_info)), 0,
          time_range});
    }
  }
  // Copy and move are the implicit member-wise operations (Rule of Zero).
  // The former hand-written versions took a non-const lvalue (so a const batch
  // could not be copied) and the "move" constructor copied every member.

  std::vector<QueryInfoUnit> query_info_units_;
  TimeDur interval_;
  bool query_all_;
};
namespace {
/**
 * @brief Number of whole sampling intervals left in a tag's time range.
 *
 * Both the range span and the interval are truncated to milliseconds before
 * dividing.
 * @param ptr query batch
 * @param tag_index index into ptr->query_info_units_
 * @return uint64_t floor(span_ms / interval_ms); 0 for an empty or reversed
 *         range
 * @throws mix_cc::Exception when the interval is shorter than 1 ms (it would
 *         otherwise be an integer division by zero)
 */
template <typename T>
uint64_t get_count_leave(QueryBatch<T>* const ptr, int tag_index) {
  auto& range = ptr->query_info_units_[tag_index].time_range;
  const auto span_ms =
      duration_cast<milliseconds>(range.get_right() - range.get_left())
          .count();
  const auto step_ms = duration_cast<milliseconds>(ptr->interval_).count();
  if (step_ms <= 0) {
    throw mix_cc::Exception(-2, "query interval must be at least 1ms",
                            BOOST_CURRENT_LOCATION);
  }
  // A reversed range would otherwise produce a negative count that wraps to a
  // huge unsigned value.
  if (span_ms <= 0) {
    return 0;
  }
  return static_cast<uint64_t>(span_ms / step_ms);
}
/**
 * @brief Timestamps sampled by a whole-range query of one tag.
 *
 * Returns get_count_leave(ptr, tag_index) time points. They start at the tag's
 * current left edge and each is the previous one plus ptr->interval_. This is
 * the same sequence query_one_tag writes into its query records, so the list
 * lines up with the rows that query returns.
 * @param ptr query batch (not modified)
 * @param tag_index index into ptr->query_info_units_, default is 0
 * @return std::vector<TimePoint> sample times in ascending order
 * @throws mix_cc::Exception (nested over the original cause) on any failure
 */
template <typename T>
std::vector<TimePoint> get_query_time(QueryBatch<T>* const ptr,
                                      int tag_index = 0) {
  TimePoint begin;
  TimePoint end;
  size_t count = 0;
  try {
    auto& range = ptr->query_info_units_[tag_index].time_range;
    begin = range.get_left();
    end = range.get_right();
    count = get_count_leave(ptr, tag_index);
    std::vector<TimePoint> ret;
    ret.reserve(count);
    TimePoint current = begin;
    for (size_t i = 0; i < count; ++i) {
      ret.push_back(current);
      current = current + ptr->interval_;
    }
    return ret;
  } catch (...) {
    // Report the count computed above instead of recomputing it: recomputing
    // would rethrow if get_count_leave itself was the failure.
    std::throw_with_nested(mix_cc::Exception(
        -1,
        "query time calc error, from:" +
            std::to_string(mix_time_t(begin).to_milliseconds()) +
            " , to:" + std::to_string(mix_time_t(end).to_milliseconds()) +
            ", count is:" + std::to_string(count),
        BOOST_CURRENT_LOCATION));
  }
}
/**
 * @brief Append the numeric value of each queried HD3 record to a std::vector.
 * @param vec_data_ptr output buffer; one value per record is appended
 * @param record_queried array of query_size records filled by the HD3 query
 * @param error_codes array of query_size per-record status codes
 * @param query_size number of records (0 is allowed and appends nothing)
 * @return int always 0
 * @throws mix_cc::Exception on the first record whose status is not
 *         RD_SUCCESS; values of earlier records stay appended
 */
template <typename T>
int decode_query(std::vector<T>* const vec_data_ptr,
                 HD3Record* const record_queried, int* const error_codes,
                 size_t query_size) {
  for (size_t i = 0; i < query_size; ++i) {
    if (error_codes[i] != RD_SUCCESS) {
      throw mix_cc::Exception(
          error_codes[i],
          "query tag stats failure,error code :" +
              std::to_string(error_codes[i]) + ", time is:" +
              mix_cc::mix_time_t(static_cast<time_t>(record_queried[i].nSec))
                  .to_formatted_time(),
          BOOST_CURRENT_LOCATION);
    }
    // No `template` keyword: it is only valid before an explicit template
    // argument list, and recent Clang rejects it here.
    vec_data_ptr->emplace_back(record_queried[i].NumberValue());
  }
  return 0;
}
/**
 * @brief Fetch one tag's samples and append their values to *vec_data_ptr.
 *
 * Samples are requested at left, left + interval_, left + 2*interval_, ...
 * using HD3 "previous value" interpolation. The tag's time_range left edge is
 * advanced past every requested sample, so a later call continues where this
 * one stopped. When ptr->query_all_ is set, the whole remaining range
 * (get_count_leave) is fetched. Otherwise remain_batch_size records are
 * fetched. Requests are split into chunks whose size shrinks as the total
 * grows.
 * @param ptr query batch holding the tag state and the interval
 * @param vec_data_ptr output buffer; values are appended in time order
 * @param tag_index index into ptr->query_info_units_
 * @return int always 0
 * @throws mix_cc::Exception (code -1, nested over the original cause) on any
 *         failure, including an invalid time range or a per-record error code
 */
template <typename T>
int query_one_tag(QueryBatch<T>* const ptr, std::vector<T>* const vec_data_ptr,
                  int32_t tag_index) {
  auto& unit = ptr->query_info_units_[tag_index];
  try {
    if (!unit.time_range.valid()) {
      throw(Exception(-2, "query time invalid", BOOST_CURRENT_LOCATION));
    }
    const uint64_t count_leave = get_count_leave(ptr, tag_index);
    // Records per API call: smaller chunks for very large queries
    // (default 40000).
    uint64_t predefined_query_size = 40000;
    if (count_leave > 2000000) {
      predefined_query_size = 5000;
    } else if (count_leave > 1000000) {
      predefined_query_size = 10000;
    } else if (count_leave > 500000) {
      predefined_query_size = 20000;
    }
    // In query-all mode the batch is the whole remaining range.
    if (ptr->query_all_) {
      unit.remain_batch_size = count_leave;
    }
    // Loop on the remaining count rather than on "the last chunk was full".
    // This avoids a zero-length query when the total is an exact multiple of
    // the chunk size.
    while (unit.remain_batch_size > 0) {
      const size_t once_query_size = static_cast<size_t>(
          std::min(unit.remain_batch_size, predefined_query_size));
      unit.remain_batch_size -= once_query_size;

      std::vector<HD3Record> records_queried(once_query_size);
      std::memset(records_queried.data(), 0x00,
                  sizeof(HD3Record) * once_query_size);
      std::vector<int32> error_codes(once_query_size, 0);

      // Stamp each record with its sample time, advancing the tag's window.
      for (auto& record : records_queried) {
        const auto hd_time = convert_to_hd_tp(unit.time_range.get_left());
        record.nSec = hd_time.nSec;
        record.nMsec = hd_time.nMsec;
        unit.time_range.set_left(unit.time_range.get_left() + ptr->interval_);
      }
      // NOTE(review): the call's own return value is not checked; only the
      // per-record error codes are inspected (in decode_query).
      ar3_query_interp_records_by_mode(
          // Alternative: HD3_REC_INTERP_QUERY_MODE_LINEAR (disabled).
          HD3_REC_INTERP_QUERY_MODE::HD3_REC_INTERP_QUERY_MODE_PREV,
          unit.tag_id, once_query_size, records_queried.data(),
          error_codes.data());
      decode_query(vec_data_ptr, records_queried.data(), error_codes.data(),
                   once_query_size);
    }
  } catch (...) {
    // Keep the original failure (e.g. the HD3 error code) as the nested cause.
    std::throw_with_nested(Exception(
        -1,
        "UNKNOWN error dur ihd query, please check time param, tag name is " +
            unit.tag_name + " time range is [" +
            mix_time_t(unit.time_range.get_left()).to_formatted_time() + "," +
            mix_time_t(unit.time_range.get_right()).to_formatted_time() + ")",
        BOOST_CURRENT_LOCATION));
  }
  return 0;
}
/**
 * @brief Query every tag over its whole remaining time range.
 *
 * Switches the batch into query-all mode, then runs query_one_tag for each tag
 * in order. Each tag's values are appended to *vec_data_ptr after the previous
 * tag's values.
 * @param ptr query batch
 * @param vec_data_ptr output buffer; values are appended
 * @return uint32_t always 0 (failures propagate as exceptions)
 */
template <typename T>
uint32_t execute_query(QueryBatch<T>* const ptr,
                       std::vector<T>* const vec_data_ptr) {
  ptr->query_all_ = true;
  const size_t tag_count = ptr->query_info_units_.size();
  for (size_t idx = 0; idx != tag_count; ++idx) {
    query_one_tag(ptr, vec_data_ptr, idx);
  }
  return 0;
}
/**
 * @brief Query the next batch_size samples of every tag.
 *
 * Each tag's values are appended to *vec_data_ptr after the previous tag's
 * values. Each tag's time window advances by batch_size intervals.
 * @param ptr query batch
 * @param vec_data_ptr output buffer; values are appended
 * @param batch_size number of samples to fetch per tag
 * @return uint32_t always 0 (failures propagate as exceptions)
 */
template <typename T>
uint32_t execute_query(QueryBatch<T>* const ptr,
                       std::vector<T>* const vec_data_ptr, size_t batch_size) {
  // The constructor and the query-all overload set query_all_. While it is set,
  // query_one_tag replaces remain_batch_size with the whole remaining count,
  // which would silently ignore batch_size. Switch to batch mode explicitly.
  ptr->query_all_ = false;
  for (size_t i = 0; i < ptr->query_info_units_.size(); ++i) {
    ptr->query_info_units_[i].remain_batch_size = batch_size;
    query_one_tag(ptr, vec_data_ptr, i);
  }
  return 0;
}
} // namespace
/**
 * @brief Build a QueryBatch by resolving each tag name to its tag id.
 * @tparam T value type of the query result (default double)
 * @param tag_names tag names to query
 * @param time_range time window shared by all tags
 * @param time_interval sampling interval (default 50 ms)
 * @return QueryBatch<T>, or nothing if tag_names is empty or any tag name
 *         cannot be resolved by read_tag_info_maybe
 */
template <typename T = double>
auto make_query_batch_maybe(const std::vector<std::string>& tag_names,
                            const time_range_t& time_range,
                            TimeDur time_interval = 50ms)
    -> maybe<QueryBatch<T>> {
  if (tag_names.empty()) {
    return {};
  }
  std::vector<std::tuple<int32_t, std::string>> tag_infos;
  tag_infos.reserve(tag_names.size());
  // Each name is copied exactly once here and then moved into tag_infos.
  for (auto tag_name : tag_names) {
    auto tag_maybe = read_tag_info_maybe(tag_name);
    if (!tag_maybe.is_just()) {
      DEBUG_PRINT_MIX_CC(
          "QueryBath make failure because can't find DESIRE TAG NAME");
      return {};
    }
    tag_infos.emplace_back(
        std::make_tuple(tag_maybe.unsafe_get_just().nTagID, std::move(tag_name)));
  }
  return QueryBatch<T>(std::move(tag_infos), time_range, time_interval);
}
/**
 * @brief Query all tags and return the values as a matrix.
 *
 * Column j holds tag j's values and row i is the i-th sample. The matrix is
 * mapped column-major (Eigen's default) over the per-tag concatenated buffer.
 * @param ptr query batch; each tag's time window is advanced by the query
 * @param batch_size samples per tag; 0 queries the whole remaining range
 * @return the value matrix, or nothing when the batch has no tags, the number
 *         of queried values does not match rows * tags, or any error occurs
 *         (errors are printed via DEBUG_PRINT_MIX_CC)
 */
template <typename T>
auto read_data_maybe(QueryBatch<T>* const ptr, size_t batch_size = 0)
    -> maybe<typename QueryBatch<T>::Mat2d> {
  using Mat2d = typename QueryBatch<T>::Mat2d;
  try {
    const size_t col_count = ptr->query_info_units_.size();
    if (col_count == 0) {
      // Nothing to query, and tag 0 (used for the row count) does not exist.
      return {};
    }
    std::vector<T> origin_vector;
    size_t row_count = 0;
    if (batch_size == 0) {
      row_count = get_count_leave(ptr, 0);
      execute_query(ptr, &origin_vector);
    } else {
      row_count = batch_size;
      execute_query(ptr, &origin_vector, row_count);
    }
    // Never map more elements than the buffer holds.
    if (origin_vector.size() != row_count * col_count) {
      DEBUG_PRINT_MIX_CC(
          "queried value count does not match result matrix size");
      return {};
    }
    Mat2d tmp = Eigen::Map<Mat2d>(origin_vector.data(), row_count, col_count);
    return tmp;
  } catch (const std::exception& e) {
    DEBUG_PRINT_MIX_CC(e.what());
  } catch (...) {
    DEBUG_PRINT_MIX_CC("unknown error in ihd query");
  }
  return {};
}
/**
 * @brief Query all tags and return the sample timestamps with the values.
 * @param ptr query batch; each tag's time window is advanced by the query
 * @param batch_size samples per tag; 0 queries the whole remaining range
 * @return (time list, matrix). Column j of the matrix holds tag j's values and
 *         row i pairs with time list entry i, i.e. conceptually
 *           time1, dataA1, dataB1, ..., dataZ1
 *           time2, dataA2, dataB2, ..., dataZ2
 *         The result is nothing when the batch has no tags, the number of
 *         queried values does not match rows * tags, or any error occurs
 *         (errors are printed via DEBUG_PRINT_MIX_CC).
 *         NOTE(review): if batch_size exceeds the remaining range, the time
 *         list is shorter than the matrix.
 */
template <typename T>
auto read_data_with_time_maybe(QueryBatch<T>* const ptr, size_t batch_size = 0)
    -> maybe<typename QueryBatch<T>::MatCtime> {
  using Mat2d = typename QueryBatch<T>::Mat2d;
  using MatCtime = typename QueryBatch<T>::MatCtime;
  try {
    const size_t col_count = ptr->query_info_units_.size();
    if (col_count == 0) {
      return fp::nothing<MatCtime>();
    }
    // Timestamps must be computed before querying, because the query advances
    // each tag's left edge.
    auto time_list = get_query_time(ptr);
    std::vector<T> origin_vector;
    size_t row_count = 0;
    if (batch_size == 0) {
      row_count = get_count_leave(ptr, 0);
      execute_query(ptr, &origin_vector);
    } else {
      row_count = batch_size;
      execute_query(ptr, &origin_vector, row_count);
      // Only the first batch_size samples were queried.
      if (time_list.size() > row_count) {
        time_list.resize(row_count);
      }
    }
    // Never map more elements than the buffer holds.
    if (origin_vector.size() != row_count * col_count) {
      DEBUG_PRINT_MIX_CC(
          "queried value count does not match result matrix size");
      return fp::nothing<MatCtime>();
    }
    Mat2d tmp = Eigen::Map<Mat2d>(origin_vector.data(), row_count, col_count);
    return fp::maybe(std::make_tuple(std::move(time_list), std::move(tmp)));
  } catch (const std::exception& e) {
    DEBUG_PRINT_MIX_CC(e.what());
    return fp::nothing<MatCtime>();
  } catch (...) {
    DEBUG_PRINT_MIX_CC("unknown error in query with time");
    return fp::nothing<MatCtime>();
  }
}
/**
 * @brief Query all tags and return the sample timestamps with the values.
 *
 * Produces exactly the same result as read_data_with_time_maybe, and delegates
 * to it so the two cannot drift apart.
 * NOTE(review): despite the "raw" name, this function's body was a verbatim
 * copy of read_data_with_time_maybe. Like that function, it returns
 * PREV-interpolated samples from query_one_tag. Confirm whether an
 * un-interpolated (raw record) query was intended.
 * @param ptr query batch; each tag's time window is advanced by the query
 * @param batch_size samples per tag; 0 queries the whole remaining range
 * @return (time list, value matrix), or nothing on failure
 */
template <typename T>
auto read_raw_data_with_time_maybe(QueryBatch<T>* const ptr,
                                   size_t batch_size = 0)
    -> maybe<typename QueryBatch<T>::MatCtime> {
  return read_data_with_time_maybe(ptr, batch_size);
}
} // namespace ihd
} // namespace mix_cc