/** * @file mix_cc/ihyper_db/query_batch.h * @brief ihyperDB的查询集 * @author Cat (null.null.null@qq.com) * @version 0.1 * @date 2021-08-18 * * Copyright: Baosight Co. Ltd. * DO NOT COPY/USE WITHOUT PERMISSION * */ #pragma once #include "mix_cc/debug.h" #include "mix_cc/exception.h" #include "mix_cc/ihyper_db/connection.h" #include "mix_cc/ihyper_db/read_tag_info.h" #include "mix_cc/ihyper_db/utility.h" #include "mix_cc/type/range.h" #include #include #include #include #include #include #include #include #include #include namespace mix_cc { namespace ihd { using namespace std::chrono; /** * @brief ihdb查询集类,用于执行最终的数据查询 * @tparam T */ template struct QueryBatch { /** * @brief 创建一个查询集合 * @param tag_infos tagid 和 tag点名(tag点名用于输出错误信息) * @param time_range 获得对应的查询时间 * @param time_interval 查询的时间间隔 */ QueryBatch(std::vector> tag_infos, time_range_t time_range, TimeDur time_interval) : interval_(time_interval), query_all_(true) { for (auto tag_info : tag_infos) { query_info_units_.emplace_back(QueryInfoUnit{ std::get<0>(tag_info), std::get<1>(tag_info), 0, time_range}); } } QueryBatch(QueryBatch& rhs) : query_info_units_(rhs.query_info_units_), interval_(rhs.interval_), query_all_(rhs.query_all_) {} QueryBatch(QueryBatch&& rhs) : query_info_units_(rhs.query_info_units_), interval_(rhs.interval_), query_all_(rhs.query_all_) {} struct QueryInfoUnit { int32_t tag_id; std::string tag_name; uint64_t remain_batch_size; time_range_t time_range; }; typedef Eigen::Matrix Mat2d; typedef std::tuple, Mat2d> MatCtime; std::vector query_info_units_; TimeDur interval_; bool query_all_; }; namespace { /** * @brief Get the count leave * @param tag_index tag点索引 * @return uint64_t */ template uint64_t get_count_leave(QueryBatch* const ptr, int tag_index) { return duration_cast( ptr->query_info_units_[tag_index].time_range.get_right() - ptr->query_info_units_[tag_index].time_range.get_left()) .count() / duration_cast(ptr->interval_).count(); } /** * @brief 得到指定索引的查询时间 * @param tag_index default is 0 * @return std::vector */ template std::vector get_query_time(QueryBatch* const ptr, int tag_index = 0) { TimePoint begin; TimePoint end; size_t count = 0; try { begin = ptr->query_info_units_[tag_index].time_range.get_left(); end = ptr->query_info_units_[tag_index].time_range.get_right(); std::vector ret; count = get_count_leave(ptr, tag_index); ret.reserve(count); for (auto i = 0; begin + i * ptr->interval_ <= (end - ptr->interval_); ++i) { ret.push_back(begin + i * ptr->interval_); } return ret; } catch (...) { throw_with_nested(mix_cc::Exception( -1, "query time calc error, from:" + std::to_string(mix_time_t(begin).to_milliseconds()) + " , to:" + std::to_string(mix_time_t(end).to_milliseconds()) + ", count is:" + std::to_string(get_count_leave(ptr, tag_index)), BOOST_CURRENT_LOCATION)); } } /** * @brief * 解析查询,把hd类型的数据转换为stl vector类型 * @param vec_data_ptr * @param record_queried * @param query_size * @return */ template int decode_query(std::vector* const vec_data_ptr, HD3Record* const record_queried, int* const error_codes, size_t query_size) { if (query_size != 0) { for (size_t i = 0; i < query_size; i++) { if (error_codes[i] != RD_SUCCESS) { throw mix_cc::Exception( error_codes[i], "query tag stats failure,error code :" + std::to_string(error_codes[i]) + ", time is:" + mix_cc::mix_time_t(time_t(record_queried[i].nSec)) .to_formatted_time(), BOOST_CURRENT_LOCATION); } vec_data_ptr->template emplace_back(record_queried[i].NumberValue()); } // for-loop } return 0; } /** * @brief 最终查询某个tag点在规定时间内数据的函数 * @param vec_data_ptr 元数据指针 * @param tag_index tag点索引 * @return int */ template int query_one_tag(QueryBatch* const ptr, std::vector* const vec_data_ptr, int32_t tag_index) { // predefined query size(default) uint64_t predefined_query_size = 40000; HD3Record* records_queried = nullptr; int32* error_codes = nullptr; try { if (!ptr->query_info_units_[tag_index].time_range.valid()) { throw(Exception(-2, "query time invalid", BOOST_CURRENT_LOCATION)); } int64_t count_leave = get_count_leave(ptr, tag_index); if (count_leave > 2000000) { predefined_query_size = 5000; } else if (count_leave > 1000000) { predefined_query_size = 10000; } else if (count_leave > 500000) { predefined_query_size = 20000; } else if (count_leave > 250000) { predefined_query_size = 40000; } // if query all data, query_info_units_[tag_index].remain_batch_size would // be count leave(just query all) if (ptr->query_all_) { ptr->query_info_units_[tag_index].remain_batch_size = count_leave; } size_t once_query_size = predefined_query_size; while (once_query_size == predefined_query_size) { if (ptr->query_info_units_[tag_index].remain_batch_size > predefined_query_size) { ptr->query_info_units_[tag_index].remain_batch_size -= predefined_query_size; } else { once_query_size = ptr->query_info_units_[tag_index].remain_batch_size; ptr->query_info_units_[tag_index].remain_batch_size = 0; } records_queried = new HD3Record[once_query_size]; memset(records_queried, 0x00, sizeof(HD3Record) * once_query_size); error_codes = new int32[once_query_size]; memset(error_codes, 0x00, sizeof(int32) * once_query_size); for (size_t i = 0; i < once_query_size; i++) { auto tmp_HD = convert_to_hd_tp( ptr->query_info_units_[tag_index].time_range.get_left()); records_queried[i].nSec = tmp_HD.nSec; records_queried[i].nMsec = tmp_HD.nMsec; ptr->query_info_units_[tag_index].time_range.set_left( ptr->query_info_units_[tag_index].time_range.get_left() + ptr->interval_); } ar3_query_interp_records_by_mode( // HD3_REC_INTERP_QUERY_MODE::HD3_REC_INTERP_QUERY_MODE_LINEAR, HD3_REC_INTERP_QUERY_MODE::HD3_REC_INTERP_QUERY_MODE_PREV, ptr->query_info_units_[tag_index].tag_id, once_query_size, records_queried, error_codes); decode_query(vec_data_ptr, records_queried, error_codes, once_query_size); delete[] records_queried; records_queried = nullptr; delete[] error_codes; error_codes = nullptr; } } catch (...) { if (records_queried != nullptr) { delete[] records_queried; } if (error_codes != nullptr) { delete[] error_codes; } throw(Exception( -1, "UNKNOWN error dur ihd query, please check time param, tag name is " + ptr->query_info_units_[tag_index].tag_name + " time range is [" + mix_time_t(ptr->query_info_units_[tag_index].time_range.get_left()) .to_formatted_time() + "," + mix_time_t(ptr->query_info_units_[tag_index].time_range.get_right()) .to_formatted_time() + ")", BOOST_CURRENT_LOCATION)); } return 0; } /** * @brief Core Query (All Count) * @param vec_data_ptr 元数据指针 * @return uint32_t */ template uint32_t execute_query(QueryBatch* const ptr, std::vector* const vec_data_ptr) { ptr->query_all_ = true; for (size_t i = 0; i < ptr->query_info_units_.size(); ++i) { query_one_tag(ptr, vec_data_ptr, i); } return 0; } /** * @brief Core Query (Spefic Count) * @param vec_data_ptr 元数据指针 * @param batch_size 查询大小 * @return uint32_t */ template uint32_t execute_query(QueryBatch* const ptr, std::vector* const vec_data_ptr, size_t batch_size) { for (size_t i = 0; i < ptr->query_info_units_.size(); ++i) { ptr->query_info_units_[i].remain_batch_size = batch_size; query_one_tag(ptr, vec_data_ptr, i); } return 0; } } // namespace /** * @brief 根据参数创建一个查询集合 * @tparam T * @param tag_names tag点名 * @param time_range 时间范围 * @param time_interval 查询数据的时间间隔(ms) * @return QueryBatch */ template auto make_query_batch_maybe(const std::vector& tag_names, const time_range_t& time_range, TimeDur time_interval = 50ms) -> maybe> { auto tag_infos = std::vector>(); for (auto tag_name : tag_names) { auto tag_maybe = read_tag_info_maybe(tag_name); if (tag_maybe.is_just()) { tag_infos.emplace_back( std::make_tuple(tag_maybe.unsafe_get_just().nTagID, tag_name)); } else { DEBUG_PRINT_MIX_CC( "QueryBath make failure because can't find DESIRE TAG NAME"); return {}; } } if (tag_infos.empty()) { return {}; } return QueryBatch(tag_infos, time_range, time_interval); } /** * @brief * 以规定的查询大小查询数据(查询大小默认为0,查询所有数据) * @param batch_size * @return */ template auto read_data_maybe(QueryBatch* const ptr, size_t batch_size = 0) -> maybe::Mat2d> { size_t row_count = 0, col_count = 0; try { std::vector origin_vector; col_count = ptr->query_info_units_.size(); if (batch_size <= 0) { row_count = get_count_leave(ptr, 0); execute_query(ptr, &origin_vector); } else { row_count = batch_size; execute_query(ptr, &origin_vector, row_count); } typename QueryBatch::Mat2d tmp = Eigen::Map::Mat2d>(origin_vector.data(), row_count, col_count); return tmp; } catch (const std::exception& e) { DEBUG_PRINT_MIX_CC(e.what()); return {}; } catch (...) { DEBUG_PRINT_MIX_CC("unknown error in ihd query"); } return {}; } /** * @brief * 以规定的查询大小查询数据,并返回时间(查询大小默认为0,查询所有数据) *插值 * @return time1, dataA1, dataB1,... ,dataZ1\ * time2, dataA2, dataB2,... ,dataZ2 */ template auto read_data_with_time_maybe(QueryBatch* const ptr, size_t batch_size = 0) -> maybe::MatCtime> { size_t row_count = 0, col_count = 0; try { std::vector origin_vector; auto time_list = get_query_time(ptr); col_count = ptr->query_info_units_.size(); if (batch_size <= 0) { row_count = get_count_leave(ptr, 0); execute_query(ptr, &origin_vector); } else { row_count = batch_size; execute_query(ptr, &origin_vector, row_count); } typename QueryBatch::Mat2d tmp = Eigen::Map::Mat2d>(&origin_vector.data()[0], row_count, col_count); return fp::maybe(std::make_tuple(time_list, tmp)); } catch (const std::exception& e) { DEBUG_PRINT_MIX_CC(e.what()); return fp::nothing::MatCtime>(); } catch (...) { DEBUG_PRINT_MIX_CC("unknown error in query with time"); return fp::nothing::MatCtime>(); } return fp::nothing::MatCtime>(); } template auto read_raw_data_with_time_maybe(QueryBatch* const ptr, size_t batch_size = 0) -> maybe::MatCtime> { size_t row_count = 0, col_count = 0; try { std::vector origin_vector; auto time_list = get_query_time(ptr); col_count = ptr->query_info_units_.size(); if (batch_size <= 0) { row_count = get_count_leave(ptr, 0); execute_query(ptr, &origin_vector); } else { row_count = batch_size; execute_query(ptr, &origin_vector, row_count); } typename QueryBatch::Mat2d tmp = Eigen::Map::Mat2d>(&origin_vector.data()[0], row_count, col_count); return fp::maybe(std::make_tuple(time_list, tmp)); } catch (const std::exception& e) { DEBUG_PRINT_MIX_CC(e.what()); return fp::nothing::MatCtime>(); } catch (...) { DEBUG_PRINT_MIX_CC("unknown error in query with time"); return fp::nothing::MatCtime>(); } return fp::nothing::MatCtime>(); } } // namespace ihd } // namespace mix_cc