eis/mix_cc/ihyper_db/query_batch.h

417 lines
13 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file mix_cc/ihyper_db/query_batch.h
* @brief Query batch (query set) for ihyperDB
* @author Cat (null.null.null@qq.com)
* @version 0.1
* @date 2021-08-18
*
* Copyright: Baosight Co. Ltd.
* DO NOT COPY/USE WITHOUT PERMISSION
*
*/
#pragma once
#include "mix_cc/debug.h"
#include "mix_cc/exception.h"
#include "mix_cc/ihyper_db/connection.h"
#include "mix_cc/ihyper_db/read_tag_info.h"
#include "mix_cc/ihyper_db/utility.h"
#include "mix_cc/type/range.h"
#include <boost/optional.hpp>
#include <eigen3/Eigen/Eigen>
#include <algorithm>
#include <functional>
#include <future>
#include <iostream>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
namespace mix_cc {
namespace ihd {
using namespace std::chrono;
/**
 * @brief Query batch for ihdb: holds the per-tag query state that the free
 *        functions in this header use to run the actual data query.
 * @tparam T element type of the result matrix
 */
template <class T>
struct QueryBatch {
  /**
   * @brief Query state of a single tag.
   */
  struct QueryInfoUnit {
    int32_t tag_id;              ///< tag id handed to the query API
    std::string tag_name;        ///< tag name, used in error messages
    uint64_t remain_batch_size;  ///< number of samples still to be queried
    time_range_t time_range;     ///< remaining range; its left bound is
                                 ///< advanced as samples are queried
  };
  using Mat2d = Eigen::Matrix<T, Eigen::Dynamic, Eigen::Dynamic>;
  using MatCtime = std::tuple<std::vector<TimePoint>, Mat2d>;

  /**
   * @brief Create a query batch.
   * @param tag_infos (tag id, tag name) pairs; the name is used for error
   *                  messages
   * @param time_range time range to query, shared by every tag
   * @param time_interval sampling interval of the query
   */
  QueryBatch(std::vector<std::tuple<int32_t, std::string>> tag_infos,
             time_range_t time_range, TimeDur time_interval)
      : interval_(time_interval), query_all_(true) {
    query_info_units_.reserve(tag_infos.size());
    // tag_infos is our own by-value copy, so the names can be moved out of it.
    for (auto& tag_info : tag_infos) {
      query_info_units_.emplace_back(
          QueryInfoUnit{std::get<0>(tag_info),
                        std::move(std::get<1>(tag_info)), 0, time_range});
    }
  }

  /// Copy constructor; takes a const reference so const batches can be copied.
  QueryBatch(const QueryBatch& rhs) = default;
  /// Move constructor; moves the members instead of copying them.
  QueryBatch(QueryBatch&& rhs) = default;

  std::vector<QueryInfoUnit> query_info_units_;  ///< one entry per tag
  TimeDur interval_;                             ///< sampling interval
  bool query_all_;  ///< true: query the whole remaining range of each tag
};
namespace {
/**
 * @brief Number of samples left in the time range of one tag, i.e. the
 *        remaining range divided by the query interval (both truncated to
 *        whole milliseconds).
 * @param ptr query batch
 * @param tag_index index of the tag inside the batch
 * @return uint64_t remaining sample count; 0 when the right bound is not
 *         after the left bound
 * @throws mix_cc::Exception when the interval is shorter than 1 ms, which
 *         would otherwise be an integer division by zero
 */
template <typename T>
uint64_t get_count_leave(QueryBatch<T>* const ptr, int tag_index) {
  const auto step_ms = duration_cast<milliseconds>(ptr->interval_).count();
  if (step_ms <= 0) {
    throw mix_cc::Exception(-2,
                            "query interval must be at least 1 millisecond",
                            BOOST_CURRENT_LOCATION);
  }
  auto& range = ptr->query_info_units_[tag_index].time_range;
  const auto span_ms =
      duration_cast<milliseconds>(range.get_right() - range.get_left())
          .count();
  // A negative span would wrap around to a huge unsigned value.
  if (span_ms <= 0) {
    return 0;
  }
  return static_cast<uint64_t>(span_ms / step_ms);
}
/**
 * @brief Build the list of query timestamps for the tag at the given index:
 *        left bound, left bound + interval, ... for as long as a full
 *        interval still fits before the right bound.
 * @param ptr query batch
 * @param tag_index index of the tag inside the batch, default is 0
 * @return std::vector<TimePoint> the timestamps in ascending order
 * @throws mix_cc::Exception (nesting the original exception) when the
 *         calculation fails
 */
template <typename T>
std::vector<TimePoint> get_query_time(QueryBatch<T>* const ptr,
                                      int tag_index = 0) {
  // Declared outside the try block so the handler can report them.
  TimePoint begin;
  TimePoint end;
  size_t count = 0;
  try {
    auto& window = ptr->query_info_units_[tag_index].time_range;
    begin = window.get_left();
    end = window.get_right();
    std::vector<TimePoint> stamps;
    count = get_count_leave(ptr, tag_index);
    stamps.reserve(count);
    int step = 0;
    while (begin + step * ptr->interval_ <= (end - ptr->interval_)) {
      stamps.push_back(begin + step * ptr->interval_);
      ++step;
    }
    return stamps;
  } catch (...) {
    throw_with_nested(mix_cc::Exception(
        -1,
        "query time calc error, from:" +
            std::to_string(mix_time_t(begin).to_milliseconds()) +
            " , to:" + std::to_string(mix_time_t(end).to_milliseconds()) +
            ", count is:" + std::to_string(get_count_leave(ptr, tag_index)),
        BOOST_CURRENT_LOCATION));
  }
}
/**
 * @brief Decode a query result: append the NumberValue() of every queried hd
 *        record to an stl vector.
 * @param vec_data_ptr destination vector, values are appended at the end
 * @param record_queried array of query_size queried records
 * @param error_codes array of query_size per-record error codes
 * @param query_size number of records to decode
 * @return int always 0
 * @throws mix_cc::Exception on the first record whose error code is not
 *         RD_SUCCESS; values decoded before it stay in the vector
 */
template <typename T>
int decode_query(std::vector<T>* const vec_data_ptr,
                 HD3Record* const record_queried, int* const error_codes,
                 size_t query_size) {
  for (size_t i = 0; i < query_size; ++i) {
    if (error_codes[i] != RD_SUCCESS) {
      throw mix_cc::Exception(
          error_codes[i],
          "query tag stats failure,error code :" +
              std::to_string(error_codes[i]) + ", time is:" +
              mix_cc::mix_time_t(time_t(record_queried[i].nSec))
                  .to_formatted_time(),
          BOOST_CURRENT_LOCATION);
    }
    // No `template` disambiguator here: without an explicit template
    // argument list it is ill-formed and rejected by newer compilers.
    vec_data_ptr->emplace_back(record_queried[i].NumberValue());
  }
  return 0;
}
/**
 * @brief Query the data of one tag, chunk by chunk, and append the decoded
 *        values to the output vector.
 *
 * When query_all_ is set, the whole remaining time range of the tag is
 * queried; otherwise exactly remain_batch_size samples are queried. The left
 * bound of the tag's time range is advanced by one interval per queried
 * sample, so a later call continues where this one stopped.
 *
 * @param ptr query batch
 * @param vec_data_ptr destination vector for the decoded values
 * @param tag_index index of the tag inside the batch
 * @return int always 0
 * @throws Exception (nesting the original exception) on any failure
 */
template <typename T>
int query_one_tag(QueryBatch<T>* const ptr, std::vector<T>* const vec_data_ptr,
                  int32_t tag_index) {
  auto& unit = ptr->query_info_units_[tag_index];
  try {
    if (!unit.time_range.valid()) {
      throw(Exception(-2, "query time invalid", BOOST_CURRENT_LOCATION));
    }
    const int64_t count_leave = get_count_leave(ptr, tag_index);
    // Chunk size per API call (default 40000); the more samples are left,
    // the smaller the chunk.
    uint64_t predefined_query_size = 40000;
    if (count_leave > 2000000) {
      predefined_query_size = 5000;
    } else if (count_leave > 1000000) {
      predefined_query_size = 10000;
    } else if (count_leave > 500000) {
      predefined_query_size = 20000;
    }
    // When querying all data, the remaining batch size is the remaining
    // sample count of the whole range.
    if (ptr->query_all_) {
      unit.remain_batch_size = count_leave;
    }
    // The buffers are owned by vectors so they are released on every exit
    // path, and they are reused by all chunks.
    const size_t buffer_len = static_cast<size_t>(
        std::min<uint64_t>(unit.remain_batch_size, predefined_query_size));
    std::vector<HD3Record> records_queried(buffer_len);
    std::vector<int32> error_codes(buffer_len);
    // Looping on the remaining count avoids a trailing zero-length query
    // when the count is an exact multiple of the chunk size (or zero).
    while (unit.remain_batch_size > 0) {
      const size_t once_query_size = static_cast<size_t>(
          std::min<uint64_t>(unit.remain_batch_size, predefined_query_size));
      unit.remain_batch_size -= once_query_size;
      memset(records_queried.data(), 0x00,
             sizeof(HD3Record) * once_query_size);
      memset(error_codes.data(), 0x00, sizeof(int32) * once_query_size);
      // One record per sample time; each step moves the left bound forward.
      for (size_t i = 0; i < once_query_size; ++i) {
        auto tmp_HD = convert_to_hd_tp(unit.time_range.get_left());
        records_queried[i].nSec = tmp_HD.nSec;
        records_queried[i].nMsec = tmp_HD.nMsec;
        unit.time_range.set_left(unit.time_range.get_left() + ptr->interval_);
      }
      // NOTE(review): the return value of the query call is not checked,
      // only the per-record error codes are - confirm that is sufficient.
      ar3_query_interp_records_by_mode(
          // HD3_REC_INTERP_QUERY_MODE::HD3_REC_INTERP_QUERY_MODE_LINEAR,
          HD3_REC_INTERP_QUERY_MODE::HD3_REC_INTERP_QUERY_MODE_PREV,
          unit.tag_id, once_query_size, records_queried.data(),
          error_codes.data());
      decode_query(vec_data_ptr, records_queried.data(), error_codes.data(),
                   once_query_size);
    }
  } catch (...) {
    // Keep the root cause attached instead of discarding it.
    throw_with_nested(Exception(
        -1,
        "UNKNOWN error dur ihd query, please check time param, tag name is " +
            unit.tag_name + " time range is [" +
            mix_time_t(unit.time_range.get_left()).to_formatted_time() + "," +
            mix_time_t(unit.time_range.get_right()).to_formatted_time() + ")",
        BOOST_CURRENT_LOCATION));
  }
  return 0;
}
/**
 * @brief Core query (all data): switches the batch to query-all mode and
 *        queries every tag in order, appending to the same output vector.
 * @param ptr query batch
 * @param vec_data_ptr destination vector for the decoded values
 * @return uint32_t always 0
 */
template <typename T>
uint32_t execute_query(QueryBatch<T>* const ptr,
                       std::vector<T>* const vec_data_ptr) {
  ptr->query_all_ = true;
  size_t tag_index = 0;
  while (tag_index < ptr->query_info_units_.size()) {
    query_one_tag(ptr, vec_data_ptr, tag_index);
    ++tag_index;
  }
  return 0;
}
/**
 * @brief Core query (specific count): queries batch_size samples for every
 *        tag in order, appending to the same output vector.
 * @param ptr query batch
 * @param vec_data_ptr destination vector for the decoded values
 * @param batch_size number of samples to query per tag
 * @return uint32_t always 0
 */
template <typename T>
uint32_t execute_query(QueryBatch<T>* const ptr,
                       std::vector<T>* const vec_data_ptr, size_t batch_size) {
  // Leave query-all mode; otherwise query_one_tag overwrites the requested
  // batch size with the full remaining count.
  ptr->query_all_ = false;
  for (size_t i = 0; i < ptr->query_info_units_.size(); ++i) {
    ptr->query_info_units_[i].remain_batch_size = batch_size;
    query_one_tag(ptr, vec_data_ptr, i);
  }
  return 0;
}
} // namespace
/**
 * @brief Create a query batch from tag names: every name is looked up with
 *        read_tag_info_maybe and paired with the tag id that was found.
 * @tparam T element type of the result matrix
 * @param tag_names tag names
 * @param time_range time range to query
 * @param time_interval sampling interval of the query (default 50ms)
 * @return maybe<QueryBatch<T>> the batch, or nothing when a tag name cannot
 *         be resolved or no tag name was given
 */
template <typename T = double>
auto make_query_batch_maybe(const std::vector<std::string>& tag_names,
                            const time_range_t& time_range,
                            TimeDur time_interval = 50ms)
    -> maybe<QueryBatch<T>> {
  std::vector<std::tuple<int32_t, std::string>> resolved_tags;
  for (auto name : tag_names) {
    auto lookup = read_tag_info_maybe(name);
    if (!lookup.is_just()) {
      // A single unknown tag aborts the whole batch.
      DEBUG_PRINT_MIX_CC(
          "QueryBath make failure because can't find DESIRE TAG NAME");
      return {};
    }
    resolved_tags.emplace_back(
        std::make_tuple(lookup.unsafe_get_just().nTagID, name));
  }
  if (resolved_tags.empty()) {
    return {};
  }
  return QueryBatch<T>(resolved_tags, time_range, time_interval);
}
/**
 * @brief Query data with the given batch size. The default batch size 0
 *        queries all remaining data.
 * @param ptr query batch
 * @param batch_size number of samples per tag, 0 for all remaining data
 * @return maybe<Mat2d> matrix with one column per tag and at most the
 *         requested number of rows, or nothing when the batch has no tags or
 *         the query fails
 */
template <typename T>
auto read_data_maybe(QueryBatch<T>* const ptr, size_t batch_size = 0)
    -> maybe<typename QueryBatch<T>::Mat2d> {
  try {
    const size_t col_count = ptr->query_info_units_.size();
    if (col_count == 0) {
      // Nothing to query; also keeps tag 0 from being indexed below.
      DEBUG_PRINT_MIX_CC("ihd query batch has no tags");
      return {};
    }
    std::vector<T> origin_vector;
    size_t row_count = 0;
    if (batch_size == 0) {
      row_count = get_count_leave(ptr, 0);
      execute_query(ptr, &origin_vector);
    } else {
      row_count = batch_size;
      execute_query(ptr, &origin_vector, row_count);
    }
    // The values of each tag are stored back to back, so the number of
    // samples per tag is the stride between two columns. Mapping with that
    // stride and keeping the top rows never reads past the end of the vector
    // and keeps every column aligned with its tag.
    const size_t rows_per_tag = origin_vector.size() / col_count;
    row_count = std::min(row_count, rows_per_tag);
    typename QueryBatch<T>::Mat2d tmp =
        Eigen::Map<typename QueryBatch<T>::Mat2d>(origin_vector.data(),
                                                  rows_per_tag, col_count)
            .topRows(row_count);
    return tmp;
  } catch (const std::exception& e) {
    DEBUG_PRINT_MIX_CC(e.what());
    return {};
  } catch (...) {
    DEBUG_PRINT_MIX_CC("unknown error in ihd query");
  }
  return {};
}
/**
 * @brief Query data with the given batch size and return it together with
 *        the query timestamps. The default batch size 0 queries all
 *        remaining data. Values are interpolated by the query API.
 * @param ptr query batch
 * @param batch_size number of samples per tag, 0 for all remaining data
 * @return maybe<MatCtime> timestamps plus a matrix with one row per
 *         timestamp and one column per tag:
 *           time1, dataA1, dataB1,... ,dataZ1
 *           time2, dataA2, dataB2,... ,dataZ2
 *         or nothing when the batch has no tags or the query fails
 */
template <typename T>
auto read_data_with_time_maybe(QueryBatch<T>* const ptr, size_t batch_size = 0)
    -> maybe<typename QueryBatch<T>::MatCtime> {
  try {
    const size_t col_count = ptr->query_info_units_.size();
    if (col_count == 0) {
      // Nothing to query; also keeps tag 0 from being indexed below.
      DEBUG_PRINT_MIX_CC("ihd query batch has no tags");
      return fp::nothing<typename QueryBatch<T>::MatCtime>();
    }
    std::vector<T> origin_vector;
    // Must be computed before the query, which advances the time range.
    auto time_list = get_query_time(ptr);
    size_t row_count = 0;
    if (batch_size == 0) {
      row_count = get_count_leave(ptr, 0);
      execute_query(ptr, &origin_vector);
    } else {
      row_count = batch_size;
      execute_query(ptr, &origin_vector, row_count);
    }
    // The values of each tag are stored back to back, so the number of
    // samples per tag is the stride between two columns.
    const size_t rows_per_tag = origin_vector.size() / col_count;
    // Return only rows that have both data and a timestamp, so that row i of
    // the matrix always belongs to time_list[i].
    row_count = std::min({row_count, rows_per_tag, time_list.size()});
    time_list.resize(row_count);
    typename QueryBatch<T>::Mat2d tmp =
        Eigen::Map<typename QueryBatch<T>::Mat2d>(origin_vector.data(),
                                                  rows_per_tag, col_count)
            .topRows(row_count);
    return fp::maybe(std::make_tuple(time_list, tmp));
  } catch (const std::exception& e) {
    DEBUG_PRINT_MIX_CC(e.what());
    return fp::nothing<typename QueryBatch<T>::MatCtime>();
  } catch (...) {
    DEBUG_PRINT_MIX_CC("unknown error in query with time");
    return fp::nothing<typename QueryBatch<T>::MatCtime>();
  }
  return fp::nothing<typename QueryBatch<T>::MatCtime>();
}
/**
 * @brief Query data together with the query timestamps.
 *
 * The implementation is exactly that of read_data_with_time_maybe, so it
 * delegates to it instead of repeating the body.
 * NOTE(review): the name suggests raw (non-interpolated) records, but no
 * separate raw query path exists in this file - confirm the intent.
 *
 * @param ptr query batch
 * @param batch_size number of samples per tag, 0 for all remaining data
 * @return maybe<MatCtime> same result as read_data_with_time_maybe
 */
template <typename T>
auto read_raw_data_with_time_maybe(QueryBatch<T>* const ptr, size_t batch_size = 0)
    -> maybe<typename QueryBatch<T>::MatCtime> {
  return read_data_with_time_maybe(ptr, batch_size);
}
} // namespace ihd
} // namespace mix_cc