eis/third_party/dsf/include/drsdk/drsdk_manager.h

312 lines
13 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/* Filename: drsdk_manager.h
* Copyright: Shanghai Baosight Software Co., Ltd.
*
* Description: Defines DRSdkManager, a singleton that manages the processing of data.
*
* @author: wuzheqiang
* @version 09/10/2024 wuzheqiang Initial Version
**************************************************************/
#ifndef DRSDK_MANAGER_H
#define DRSDK_MANAGER_H
#include "drsdk_common.h"
#include "drsdk_log.h"
#include "drsdk_transport.h"
#include "thread_pool.h"
#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
namespace dsfapi
{
// using Callback = std::function<void(const std::string &strTagName, const std::string &strValue)>;
// using VecFuncs = std::vector<Callback>;
// using MapFuncs = std::unordered_map<std::string, VecFuncs>;
// using Callback = std::function<void(const std::map<std::string, std::string> &mapTagValue)>;
// Promise/future aliases for a result delivered as a std::string.
// DRSdkManager::m_mapRequest stores one SharedPromise/SharedFuture pair per request entry.
using Promise = std::promise<std::string>;
// Shared ownership of a Promise so it can be held by the request map and by other holders.
using SharedPromise = std::shared_ptr<Promise>;
// Copyable future for the same std::string result.
using SharedFuture = std::shared_future<std::string>;
struct RegInfo
{
std::string strRegContent = "";
uint16 nInterval = 0;
uint8 nSendFlag = 0;
uint8 nRegType = 0;
Callback pCallback = nullptr;
JsonCallback pJsonCallback = nullptr;
RegInfo(const std::string &content, uint16 interval, uint8 sendflag, Callback callback)
: strRegContent(content), nInterval(interval), nSendFlag(sendflag), pCallback(callback)
{
nRegType = SDK_REG_TAG;
}
RegInfo(const std::string &content, uint16 interval, uint8 sendflag, JsonCallback callback)
: strRegContent(content), nInterval(interval), nSendFlag(sendflag), pJsonCallback(callback)
{
nRegType = SDK_REG_OBJ_JSON;
}
RegInfo()
{
}
};
// Registration records keyed by an int32 id; DRSdkManager keeps one such map in m_mapRegInfos.
// NOTE(review): the key is presumably the batch id handed out by DrRegisterTag - confirm.
using MapRegInfo = std::unordered_map<int32, RegInfo>;
/**
* @brief Owns the transport(s) to the server, the receive threads, the pending
* request table and the tag registrations. Copying is disabled.
*/
class DRSdkManager
{
public:
DRSdkManager();
~DRSdkManager();
/**
* @brief init DRSdkManager: connect to server, allocate memory and create threads
* @param [in] tDRSdkConnectParam contains server ip, server port, connect timeout
* @param [in] tDRSdkOption contains thread pool size, request timeout
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 Init(const DRSdkConnectParam &tDRSdkConnectParam, const DRSdkOption &tDRSdkOption);
/**
* @brief uninit dsfapi: disconnect from server and join threads
* @return Successfully returned 0, other returned error codes
* @version 2024/09/25 wuzheqiang Initial Version
*/
int32 Uninit();
/**
* @brief get connection status
* @return true if connected else false
* @version 2024/10/09 wuzheqiang Initial Version
*/
bool DrConnectStatus();
/**
* @brief Send control commands
* @param [in] pTagName tag names array
* @param [in] pTagValue tag value array. should fill buffer and length
* @param [in] nTagCount the count of tags
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 DrWrite(const char *pTagName[], const TagValue *const pTagValue, const int32 nTagCount);
/**
* @brief Send control commands, with values given as text
* @param [in] pTagName tag names array
* @param [in] pTagValue tag value array. text value of each tag
* @param [in] nTagCount the count of tags
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 DrWriteText(const char *pTagName[], const char *pTagValue[], const int32 nTagCount);
/**
* @brief save data to storage
* @param [in] pTagName tag names array
* @param [in] pTagValue tag value array. should fill buffer and length
* @param [in] nTagCount the count of tags
* @return Successfully returned 0, other returned error codes
* @version 2024/09/29 wuzheqiang Initial Version
*/
int32 DrSave(const char *pTagName[], const TagValue *const pTagValue, const int32 nTagCount);
/**
* @brief Synchronize data reading
* @param [in] pTagName tag names array
* @param [in] nTagNameCount the count of tags
* @param [out] pTagValue tag value array.
* @param [out] pErrorCode the error code for each tag value
* @param [out] pTagValueCount the count of tag values
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 DrRead(const char *pTagName[], const int32 nTagNameCount, TagValue **pTagValue, int32 **pErrorCode,
int32 *pTagValueCount);
/**
* @brief register tags for async reading
* @param [in] pTagName tag names array
* @param [in] nTagCount the count of tags
* @param [in] nIntervalMs Interval for querying data, unit: ms.
* 0 means push after data update
* interval value should be in {x|0<x<65535} (less than 20 will be treated as 20)
* @param [in] pCallBack callback function (the Callback type is declared outside this header)
* @param [in] nSendFlag 0: direct push, 1: push after changed
* @param [out] pBatchId batch ID after successful registration
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 DrRegisterTag(const char *pTagName[], const int32 nTagCount, const uint16 nIntervalMs,
const Callback &pCallBack, const uint8 nSendFlag, int32 *pBatchId);
/**
* @brief JsonCallback variant of DrRegisterTag; takes the same parameters
* except that the callback is a JsonCallback
* NOTE(review): presumably delivers the data as JSON - confirm against the implementation
* @param [in] pTagName tag names array
* @param [in] nTagCount the count of tags
* @param [in] nIntervalMs interval, see DrRegisterTag
* @param [in] pJsonCallBack callback function (the JsonCallback type is declared outside this header)
* @param [in] nSendFlag send flag, see DrRegisterTag
* @param [out] pBatchId batch ID after successful registration
* @return int32 status code; presumably 0 on success like DrRegisterTag - confirm
*/
int32 DrRegisterTagJson(const char *pTagName[], const int32 nTagCount, const uint16 nIntervalMs,
const JsonCallback &pJsonCallBack, const uint8 nSendFlag, int32 *pBatchId);
/**
* @brief unregister a batch from async reading
* @param [in] nBatchId batch ID returned by DR_Register_Tag
* NOTE(review): int32 here, while the auto/sync variants below take uint16
* @return Successfully returned 0, other returned error codes
* @version 2024/11/11 wuzheqiang Initial Version
*/
int32 DrUnregisterTag(const int32 nBatchId);
/**
* @brief auto unregister tag for async reading
* It's not initiated by user operation, but by dsfapi itself
* @param [in] pTransport transport to use
* @param [in] nBatchId origin batch_id
* @return int32 status code; presumably 0 on success - confirm
* @version 2024/10/18 wuzheqiang Initial Version
*/
int32 DrAutoUnregisterTag(std::shared_ptr<DrSdkTransport> pTransport, const uint16 nBatchId);
/**
* @brief auto register tag for async reading
* It's not initiated by user operation, but by dsfapi itself
* @param [in] pTransport transport to use
* @param [in] nBatchId origin batch_id
* @param [in] tRegInfo register info
* @return int32 status code; presumably 0 on success - confirm
* @version 2024/10/18 wuzheqiang Initial Version
*/
int32 DrAutoRegisterTag(std::shared_ptr<DrSdkTransport> pTransport, const uint16 nBatchId, const RegInfo &tRegInfo);
/**
* @brief automatic register after connected
* @param [in] pTransport transport to use
* @version 2024/10/09 wuzheqiang Initial Version
*/
void DrAutomaticRegister(std::shared_ptr<DrSdkTransport> pTransport);
/**
* @brief sync a register to the other dsf_station when multilink
* @param [in] nBatchId batch_id
* @version 2025/02/20 wuzheqiang Initial Version
*/
void SyncRegister(const uint16 nBatchId);
/**
* @brief sync an unregister to the other dsf_station when multilink
* @param [in] nBatchId batch_id
* @version 2025/02/20 wuzheqiang Initial Version
*/
void SyncUnregister(const uint16 nBatchId);
/**
* @brief get object attribute
* @param [in] pObjectName object name
* @param [out] pTagtypeInfo object tagtype array
* @param [out] pTagtypeInfoCount the count of tagtype
* @return Successfully returned 0, other returned error codes
* @version 2024/09/25 wuzheqiang Initial Version
*/
int32 DrGetTagtypeInfo(const char *const pObjectName, TagtypeInfo **pTagtypeInfo, int32 *pTagtypeInfoCount);
/**
* @brief get object data
* @param [in] pObjectName object name
* @param [out] pTagRecord object tag record array
* @param [out] pTagRecordCount the count of tag record
* @return Successfully returned 0, other returned error codes
* @version 2024/09/25 wuzheqiang Initial Version
*/
int32 DrGetTagtypeValue(const char *const pObjectName, TagRecord **pTagRecord, int32 *pTagRecordCount);
/**
* @brief read model of the object and return its JSON string
* @param [in] pObjectName object name
* @param [out] sModeJsonString json string of the object model
* !!! [should release by Free_Tag_Record]
* @param [out] nLength the length of sModeJsonString
* @return Successfully returned 0, other returned error codes
* @version 2024/10/19 wuzheqiang Initial Version
*/
int32 DrGetModelJson(const char *const pObjectName, char **sModeJsonString, int32 *nLength);
private:
// not copyable
DRSdkManager(const DRSdkManager &) = delete;
DRSdkManager &operator=(const DRSdkManager &) = delete;
/**
* @brief callback function that loops through to get data
* @param [in] pTransport transport ptr
* @param [in] sServiceName service name; NOTE(review): its use is not visible in this header
* @version 09/10/2024 wuzheqiang Initial Version
*/
void RecvThreadCallback(const std::shared_ptr<DrSdkTransport> &pTransport, const std::string &sServiceName);
/**
* @brief Write content into send_buffer, for available connections
* @param [in] tHead msg head
* @param [in] strContent msg content
* @param [in] nLen msg length
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 WriteToSendBuffer(const DataHeader &tHead, const std::string &strContent, const size_t nLen);
/**
* @brief Write content into send_buffer, for a specific connection
* @param [in] pTransport transport ptr
* @param [in] tHead msg head
* @param [in] strContent msg content
* @param [in] nLen msg length
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 WriteToSendBufferForTransport(std::shared_ptr<DrSdkTransport> pTransport, const DataHeader &tHead,
const std::string &strContent, const size_t nLen);
/**
* @brief async get response result
* @param [in] tHead msg head
* @param [out] pRes response result
* @return Successfully returned 0, other returned error codes
* @version 2024/09/23 wuzheqiang Initial Version
*/
int32 AsyncGetResult(const DataHeader &tHead, std::string *pRes);
/**
* @brief check transport available
* @return Successfully returned 0, other returned error codes
* @version 2025/02/18 wuzheqiang Initial Version
*/
int32 CheckTransportAvailable();
private:
std::atomic_bool m_bInit = {false};
std::atomic_bool m_bStop = {false};
DRSdkConnectParam m_tDRSdkConnectParam;
DRSdkOption m_tDRSdkOption;
std::shared_ptr<DrSdkTransport> m_pTransport = nullptr; // transport layer
std::shared_ptr<DrSdkTransport> m_pTransportMain = nullptr; // main
std::shared_ptr<DrSdkTransport> m_pTransportBak = nullptr; // backup
std::atomic_bool m_bMultilink = {false}; // true: redundancy; false: single link
std::atomic<uint8> m_ConnServerType = {CS_UNKNOWN}; // 0:unknown; 1:main; 2:backup
std::unique_ptr<std::thread> m_pRecvThread = nullptr; // thread for recv from DS
std::unique_ptr<std::thread> m_pRecvThreadBak = nullptr; // thread for recv from DS (presumably the backup link - confirm)
std::atomic<uint32> m_nHeadSeq = {0}; // request head seq
std::mutex m_RequestMtx;
// async requests, keyed by uint32; NOTE(review): the meaning of the trailing uint32 is not visible in this header
std::unordered_map<uint32, std::tuple<SharedPromise, SharedFuture, uint32>> m_mapRequest;
std::shared_ptr<char> m_pRecvBuffer = nullptr;
// std::shared_ptr<char[]> m_pSendBuffer = nullptr;
std::shared_ptr<std::vector<char>> m_pSendBuffer = nullptr;
std::atomic<int32> m_nSendBufferLen = {0};
std::mutex m_RecvBufferMtx;
std::mutex m_SendBufferMtx;
// call_back
static std::atomic<uint16> m_nBatchId; // starts from 1; 0 is invalid. Maintains uniqueness within the process
std::unique_ptr<ThreadPool> m_pThreadPool; // thread pool, for async exec of register callbacks
MapRegInfo m_mapRegInfos; // register info
std::mutex m_CallbacksMtx;
};
} // namespace dsfapi
#endif