417 lines
13 KiB
C++
417 lines
13 KiB
C++
/**
 * @file mix_cc/ihyper_db/query_batch.h
 * @brief Query batch for ihyperDB
 * @author Cat (null.null.null@qq.com)
 * @version 0.1
 * @date 2021-08-18
 *
 * Copyright: Baosight Co. Ltd.
 * DO NOT COPY/USE WITHOUT PERMISSION
 *
 */
|
||
#pragma once
|
||
|
||
#include "mix_cc/debug.h"
|
||
#include "mix_cc/exception.h"
|
||
#include "mix_cc/ihyper_db/connection.h"
|
||
#include "mix_cc/ihyper_db/read_tag_info.h"
|
||
#include "mix_cc/ihyper_db/utility.h"
|
||
#include "mix_cc/type/range.h"
|
||
#include <boost/optional.hpp>
|
||
#include <eigen3/Eigen/Eigen>
|
||
#include <algorithm>
|
||
#include <functional>
|
||
#include <future>
|
||
#include <iostream>
|
||
#include <string>
|
||
#include <tuple>
|
||
#include <utility>
|
||
#include <vector>
|
||
|
||
namespace mix_cc {
|
||
namespace ihd {
|
||
|
||
using namespace std::chrono;
|
||
/**
|
||
* @brief ihdb查询集类,用于执行最终的数据查询
|
||
* @tparam T
|
||
*/
|
||
template <class T>
|
||
struct QueryBatch {
|
||
/**
|
||
* @brief 创建一个查询集合
|
||
* @param tag_infos tagid 和 tag点名(tag点名用于输出错误信息)
|
||
* @param time_range 获得对应的查询时间
|
||
* @param time_interval 查询的时间间隔
|
||
*/
|
||
QueryBatch(std::vector<std::tuple<int32_t, std::string>> tag_infos,
|
||
time_range_t time_range, TimeDur time_interval)
|
||
: interval_(time_interval), query_all_(true) {
|
||
for (auto tag_info : tag_infos) {
|
||
query_info_units_.emplace_back(QueryInfoUnit{
|
||
std::get<0>(tag_info), std::get<1>(tag_info), 0, time_range});
|
||
}
|
||
}
|
||
|
||
QueryBatch(QueryBatch<T>& rhs)
|
||
: query_info_units_(rhs.query_info_units_),
|
||
interval_(rhs.interval_),
|
||
query_all_(rhs.query_all_) {}
|
||
|
||
QueryBatch(QueryBatch<T>&& rhs)
|
||
: query_info_units_(rhs.query_info_units_),
|
||
interval_(rhs.interval_),
|
||
query_all_(rhs.query_all_) {}
|
||
|
||
struct QueryInfoUnit {
|
||
int32_t tag_id;
|
||
std::string tag_name;
|
||
uint64_t remain_batch_size;
|
||
time_range_t time_range;
|
||
};
|
||
|
||
typedef Eigen::Matrix<T, Eigen::Dynamic, Eigen::Dynamic> Mat2d;
|
||
|
||
typedef std::tuple<std::vector<TimePoint>, Mat2d> MatCtime;
|
||
|
||
std::vector<QueryInfoUnit> query_info_units_;
|
||
TimeDur interval_;
|
||
bool query_all_;
|
||
};
|
||
|
||
|
||
namespace {
|
||
|
||
/**
|
||
* @brief Get the count leave
|
||
* @param tag_index tag点索引
|
||
* @return uint64_t
|
||
*/
|
||
template <typename T>
|
||
uint64_t get_count_leave(QueryBatch<T>* const ptr, int tag_index) {
|
||
return duration_cast<milliseconds>(
|
||
ptr->query_info_units_[tag_index].time_range.get_right() -
|
||
ptr->query_info_units_[tag_index].time_range.get_left())
|
||
.count() /
|
||
duration_cast<milliseconds>(ptr->interval_).count();
|
||
}
|
||
|
||
/**
|
||
* @brief 得到指定索引的查询时间
|
||
* @param tag_index default is 0
|
||
* @return std::vector<double>
|
||
*/
|
||
template <typename T>
|
||
std::vector<TimePoint> get_query_time(QueryBatch<T>* const ptr,
|
||
int tag_index = 0) {
|
||
TimePoint begin;
|
||
TimePoint end;
|
||
size_t count = 0;
|
||
try {
|
||
begin = ptr->query_info_units_[tag_index].time_range.get_left();
|
||
end = ptr->query_info_units_[tag_index].time_range.get_right();
|
||
std::vector<TimePoint> ret;
|
||
count = get_count_leave(ptr, tag_index);
|
||
ret.reserve(count);
|
||
for (auto i = 0; begin + i * ptr->interval_ <= (end - ptr->interval_);
|
||
++i) {
|
||
ret.push_back(begin + i * ptr->interval_);
|
||
}
|
||
return ret;
|
||
} catch (...) {
|
||
throw_with_nested(mix_cc::Exception(
|
||
-1,
|
||
"query time calc error, from:" +
|
||
std::to_string(mix_time_t(begin).to_milliseconds()) +
|
||
" , to:" + std::to_string(mix_time_t(end).to_milliseconds()) +
|
||
", count is:" + std::to_string(get_count_leave(ptr, tag_index)),
|
||
BOOST_CURRENT_LOCATION));
|
||
}
|
||
}
|
||
|
||
/**
|
||
* @brief
|
||
* 解析查询,把hd类型的数据转换为stl vector类型
|
||
* @param vec_data_ptr
|
||
* @param record_queried
|
||
* @param query_size
|
||
* @return
|
||
*/
|
||
template <typename T>
|
||
int decode_query(std::vector<T>* const vec_data_ptr,
|
||
HD3Record* const record_queried, int* const error_codes,
|
||
size_t query_size) {
|
||
if (query_size != 0) {
|
||
for (size_t i = 0; i < query_size; i++) {
|
||
if (error_codes[i] != RD_SUCCESS) {
|
||
throw mix_cc::Exception(
|
||
error_codes[i],
|
||
"query tag stats failure,error code :" +
|
||
std::to_string(error_codes[i]) + ", time is:" +
|
||
mix_cc::mix_time_t(time_t(record_queried[i].nSec))
|
||
.to_formatted_time(),
|
||
BOOST_CURRENT_LOCATION);
|
||
}
|
||
vec_data_ptr->template emplace_back(record_queried[i].NumberValue());
|
||
} // for-loop
|
||
}
|
||
return 0;
|
||
}
|
||
|
||
/**
|
||
* @brief 最终查询某个tag点在规定时间内数据的函数
|
||
* @param vec_data_ptr 元数据指针
|
||
* @param tag_index tag点索引
|
||
* @return int
|
||
*/
|
||
template <typename T>
|
||
int query_one_tag(QueryBatch<T>* const ptr, std::vector<T>* const vec_data_ptr,
|
||
int32_t tag_index) {
|
||
// predefined query size(default)
|
||
uint64_t predefined_query_size = 40000;
|
||
HD3Record* records_queried = nullptr;
|
||
int32* error_codes = nullptr;
|
||
try {
|
||
if (!ptr->query_info_units_[tag_index].time_range.valid()) {
|
||
throw(Exception(-2, "query time invalid", BOOST_CURRENT_LOCATION));
|
||
}
|
||
int64_t count_leave = get_count_leave(ptr, tag_index);
|
||
if (count_leave > 2000000) {
|
||
predefined_query_size = 5000;
|
||
} else if (count_leave > 1000000) {
|
||
predefined_query_size = 10000;
|
||
} else if (count_leave > 500000) {
|
||
predefined_query_size = 20000;
|
||
} else if (count_leave > 250000) {
|
||
predefined_query_size = 40000;
|
||
}
|
||
|
||
// if query all data, query_info_units_[tag_index].remain_batch_size would
|
||
// be count leave(just query all)
|
||
if (ptr->query_all_) {
|
||
ptr->query_info_units_[tag_index].remain_batch_size = count_leave;
|
||
}
|
||
|
||
size_t once_query_size = predefined_query_size;
|
||
|
||
while (once_query_size == predefined_query_size) {
|
||
if (ptr->query_info_units_[tag_index].remain_batch_size >
|
||
predefined_query_size) {
|
||
ptr->query_info_units_[tag_index].remain_batch_size -=
|
||
predefined_query_size;
|
||
} else {
|
||
once_query_size = ptr->query_info_units_[tag_index].remain_batch_size;
|
||
ptr->query_info_units_[tag_index].remain_batch_size = 0;
|
||
}
|
||
records_queried = new HD3Record[once_query_size];
|
||
memset(records_queried, 0x00, sizeof(HD3Record) * once_query_size);
|
||
error_codes = new int32[once_query_size];
|
||
memset(error_codes, 0x00, sizeof(int32) * once_query_size);
|
||
for (size_t i = 0; i < once_query_size; i++) {
|
||
auto tmp_HD = convert_to_hd_tp(
|
||
ptr->query_info_units_[tag_index].time_range.get_left());
|
||
records_queried[i].nSec = tmp_HD.nSec;
|
||
records_queried[i].nMsec = tmp_HD.nMsec;
|
||
ptr->query_info_units_[tag_index].time_range.set_left(
|
||
ptr->query_info_units_[tag_index].time_range.get_left() +
|
||
ptr->interval_);
|
||
}
|
||
ar3_query_interp_records_by_mode(
|
||
// HD3_REC_INTERP_QUERY_MODE::HD3_REC_INTERP_QUERY_MODE_LINEAR,
|
||
HD3_REC_INTERP_QUERY_MODE::HD3_REC_INTERP_QUERY_MODE_PREV,
|
||
ptr->query_info_units_[tag_index].tag_id, once_query_size,
|
||
records_queried, error_codes);
|
||
decode_query(vec_data_ptr, records_queried, error_codes, once_query_size);
|
||
delete[] records_queried;
|
||
records_queried = nullptr;
|
||
delete[] error_codes;
|
||
error_codes = nullptr;
|
||
}
|
||
} catch (...) {
|
||
if (records_queried != nullptr) {
|
||
delete[] records_queried;
|
||
}
|
||
if (error_codes != nullptr) {
|
||
delete[] error_codes;
|
||
}
|
||
throw(Exception(
|
||
-1,
|
||
"UNKNOWN error dur ihd query, please check time param, tag name is " +
|
||
ptr->query_info_units_[tag_index].tag_name + " time range is [" +
|
||
mix_time_t(ptr->query_info_units_[tag_index].time_range.get_left())
|
||
.to_formatted_time() +
|
||
"," +
|
||
mix_time_t(ptr->query_info_units_[tag_index].time_range.get_right())
|
||
.to_formatted_time() +
|
||
")",
|
||
BOOST_CURRENT_LOCATION));
|
||
}
|
||
return 0;
|
||
}
|
||
|
||
/**
|
||
* @brief Core Query (All Count)
|
||
* @param vec_data_ptr 元数据指针
|
||
* @return uint32_t
|
||
*/
|
||
template <typename T>
|
||
uint32_t execute_query(QueryBatch<T>* const ptr,
|
||
std::vector<T>* const vec_data_ptr) {
|
||
ptr->query_all_ = true;
|
||
for (size_t i = 0; i < ptr->query_info_units_.size(); ++i) {
|
||
query_one_tag(ptr, vec_data_ptr, i);
|
||
}
|
||
return 0;
|
||
}
|
||
|
||
/**
|
||
* @brief Core Query (Spefic Count)
|
||
* @param vec_data_ptr 元数据指针
|
||
* @param batch_size 查询大小
|
||
* @return uint32_t
|
||
*/
|
||
template <typename T>
|
||
uint32_t execute_query(QueryBatch<T>* const ptr,
|
||
std::vector<T>* const vec_data_ptr, size_t batch_size) {
|
||
for (size_t i = 0; i < ptr->query_info_units_.size(); ++i) {
|
||
ptr->query_info_units_[i].remain_batch_size = batch_size;
|
||
query_one_tag(ptr, vec_data_ptr, i);
|
||
}
|
||
return 0;
|
||
}
|
||
} // namespace
|
||
|
||
/**
|
||
* @brief 根据参数创建一个查询集合
|
||
* @tparam T
|
||
* @param tag_names tag点名
|
||
* @param time_range 时间范围
|
||
* @param time_interval 查询数据的时间间隔(ms)
|
||
* @return QueryBatch<T>
|
||
*/
|
||
template <typename T = double>
|
||
auto make_query_batch_maybe(const std::vector<std::string>& tag_names,
|
||
const time_range_t& time_range,
|
||
TimeDur time_interval = 50ms)
|
||
-> maybe<QueryBatch<T>> {
|
||
auto tag_infos = std::vector<std::tuple<int32_t, std::string>>();
|
||
for (auto tag_name : tag_names) {
|
||
auto tag_maybe = read_tag_info_maybe(tag_name);
|
||
if (tag_maybe.is_just()) {
|
||
tag_infos.emplace_back(
|
||
std::make_tuple(tag_maybe.unsafe_get_just().nTagID, tag_name));
|
||
} else {
|
||
DEBUG_PRINT_MIX_CC(
|
||
"QueryBath make failure because can't find DESIRE TAG NAME");
|
||
return {};
|
||
}
|
||
}
|
||
if (tag_infos.empty()) {
|
||
return {};
|
||
}
|
||
return QueryBatch<T>(tag_infos, time_range, time_interval);
|
||
}
|
||
|
||
/**
|
||
* @brief
|
||
* 以规定的查询大小查询数据(查询大小默认为0,查询所有数据)
|
||
* @param batch_size
|
||
* @return
|
||
*/
|
||
template <typename T>
|
||
auto read_data_maybe(QueryBatch<T>* const ptr, size_t batch_size = 0)
|
||
-> maybe<typename QueryBatch<T>::Mat2d> {
|
||
size_t row_count = 0, col_count = 0;
|
||
try {
|
||
std::vector<T> origin_vector;
|
||
col_count = ptr->query_info_units_.size();
|
||
if (batch_size <= 0) {
|
||
row_count = get_count_leave(ptr, 0);
|
||
execute_query(ptr, &origin_vector);
|
||
} else {
|
||
row_count = batch_size;
|
||
execute_query(ptr, &origin_vector, row_count);
|
||
}
|
||
typename QueryBatch<T>::Mat2d tmp =
|
||
Eigen::Map<typename QueryBatch<T>::Mat2d>(origin_vector.data(),
|
||
row_count, col_count);
|
||
return tmp;
|
||
} catch (const std::exception& e) {
|
||
DEBUG_PRINT_MIX_CC(e.what());
|
||
return {};
|
||
} catch (...) {
|
||
DEBUG_PRINT_MIX_CC("unknown error in ihd query");
|
||
}
|
||
return {};
|
||
}
|
||
|
||
/**
|
||
* @brief
|
||
* 以规定的查询大小查询数据,并返回时间(查询大小默认为0,查询所有数据)
|
||
*插值
|
||
* @return time1, dataA1, dataB1,... ,dataZ1\
|
||
* time2, dataA2, dataB2,... ,dataZ2
|
||
*/
|
||
template <typename T>
|
||
auto read_data_with_time_maybe(QueryBatch<T>* const ptr, size_t batch_size = 0)
|
||
-> maybe<typename QueryBatch<T>::MatCtime> {
|
||
size_t row_count = 0, col_count = 0;
|
||
try {
|
||
std::vector<T> origin_vector;
|
||
auto time_list = get_query_time(ptr);
|
||
col_count = ptr->query_info_units_.size();
|
||
if (batch_size <= 0) {
|
||
row_count = get_count_leave(ptr, 0);
|
||
execute_query(ptr, &origin_vector);
|
||
} else {
|
||
row_count = batch_size;
|
||
execute_query(ptr, &origin_vector, row_count);
|
||
}
|
||
typename QueryBatch<T>::Mat2d tmp =
|
||
Eigen::Map<typename QueryBatch<T>::Mat2d>(&origin_vector.data()[0],
|
||
row_count, col_count);
|
||
return fp::maybe(std::make_tuple(time_list, tmp));
|
||
} catch (const std::exception& e) {
|
||
DEBUG_PRINT_MIX_CC(e.what());
|
||
return fp::nothing<typename QueryBatch<T>::MatCtime>();
|
||
} catch (...) {
|
||
DEBUG_PRINT_MIX_CC("unknown error in query with time");
|
||
return fp::nothing<typename QueryBatch<T>::MatCtime>();
|
||
}
|
||
return fp::nothing<typename QueryBatch<T>::MatCtime>();
|
||
}
|
||
|
||
|
||
template <typename T>
|
||
auto read_raw_data_with_time_maybe(QueryBatch<T>* const ptr, size_t batch_size = 0)
|
||
-> maybe<typename QueryBatch<T>::MatCtime> {
|
||
size_t row_count = 0, col_count = 0;
|
||
try {
|
||
std::vector<T> origin_vector;
|
||
auto time_list = get_query_time(ptr);
|
||
col_count = ptr->query_info_units_.size();
|
||
if (batch_size <= 0) {
|
||
row_count = get_count_leave(ptr, 0);
|
||
execute_query(ptr, &origin_vector);
|
||
} else {
|
||
row_count = batch_size;
|
||
execute_query(ptr, &origin_vector, row_count);
|
||
}
|
||
typename QueryBatch<T>::Mat2d tmp =
|
||
Eigen::Map<typename QueryBatch<T>::Mat2d>(&origin_vector.data()[0],
|
||
row_count, col_count);
|
||
return fp::maybe(std::make_tuple(time_list, tmp));
|
||
} catch (const std::exception& e) {
|
||
DEBUG_PRINT_MIX_CC(e.what());
|
||
return fp::nothing<typename QueryBatch<T>::MatCtime>();
|
||
} catch (...) {
|
||
DEBUG_PRINT_MIX_CC("unknown error in query with time");
|
||
return fp::nothing<typename QueryBatch<T>::MatCtime>();
|
||
}
|
||
return fp::nothing<typename QueryBatch<T>::MatCtime>();
|
||
}
|
||
|
||
|
||
} // namespace ihd
|
||
} // namespace mix_cc
|