eis/third_party/dsf/include/drsdk/drsdk_manager.h

312 lines
13 KiB
C
Raw Normal View History

/* Filename: drsdk_manager.h
* Copyright: Shanghai Baosight Software Co., Ltd.
*
* Description: Defines DRSdkManager, a singleton that manages the processing of data.
*
* @author: wuzheqiang
* @version 09/10/2024 wuzheqiang Initial Version
**************************************************************/
#ifndef DRSDK_MANAGER_H
#define DRSDK_MANAGER_H
#include "drsdk_common.h"
#include "drsdk_log.h"
#include "drsdk_transport.h"
#include "thread_pool.h"
#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
namespace dsfapi
{
// using Callback = std::function<void(const std::string &strTagName, const std::string &strValue)>;
// using VecFuncs = std::vector<Callback>;
// using MapFuncs = std::unordered_map<std::string, VecFuncs>;
// using Callback = std::function<void(const std::map<std::string, std::string> &mapTagValue)>;
using Promise = std::promise<std::string>;
using SharedPromise = std::shared_ptr<Promise>;
using SharedFuture = std::shared_future<std::string>;
struct RegInfo
{
std::string strRegContent = "";
uint16 nInterval = 0;
uint8 nSendFlag = 0;
uint8 nRegType = 0;
Callback pCallback = nullptr;
JsonCallback pJsonCallback = nullptr;
RegInfo(const std::string &content, uint16 interval, uint8 sendflag, Callback callback)
: strRegContent(content), nInterval(interval), nSendFlag(sendflag), pCallback(callback)
{
nRegType = SDK_REG_TAG;
}
RegInfo(const std::string &content, uint16 interval, uint8 sendflag, JsonCallback callback)
: strRegContent(content), nInterval(interval), nSendFlag(sendflag), pJsonCallback(callback)
{
nRegType = SDK_REG_OBJ_JSON;
}
RegInfo()
{
}
};
using MapRegInfo = std::unordered_map<int32, RegInfo>;
/**
* @brief Manager of a dsfapi SDK session.
*
* Declares initialization/teardown, tag write/save/read, tag registration with
* callbacks, automatic (SDK-initiated) registration, and object/model queries.
* Copy construction and copy assignment are deleted.
* All methods are only declared here; their definitions are not part of this header.
*/
class DRSdkManager
{
public:
DRSdkManager();
~DRSdkManager();
/**
* @brief init DRSdkManager: connect to server, alloc memory and create threads
* @param [in] tDRSdkConnectParam contains server ip, server port, connect timeout
* @param [in] tDRSdkOption contains thread pool size, request timeout
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 Init(const DRSdkConnectParam &tDRSdkConnectParam, const DRSdkOption &tDRSdkOption);
/**
* @brief uninit dsfapi: disconnect from server and join threads
* @return Successfully returned 0, other returned error codes
* @version 2024/09/25 wuzheqiang Initial Version
*/
int32 Uninit();
/**
* @brief get connection status
* @return true if connected else false
* @version 2024/10/09 wuzheqiang Initial Version
*/
bool DrConnectStatus();
/**
* @brief Send control commands
* @param [in] pTagName tag names array
* @param [in] pTagValue tag value array. should fill buffer and length
* @param [in] nTagCount the count of tags
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 DrWrite(const char *pTagName[], const TagValue *const pTagValue, const int32 nTagCount);
/**
* @brief Send control commands, with tag values given as text
* @param [in] pTagName tag names array
* @param [in] pTagValue tag value array. text value of each tag
* @param [in] nTagCount the count of tags
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 DrWriteText(const char *pTagName[], const char *pTagValue[], const int32 nTagCount);
/**
* @brief save data to storage
* @param [in] pTagName tag names array
* @param [in] pTagValue tag value array. should fill buffer and length
* @param [in] nTagCount the count of tags
* @return Successfully returned 0, other returned error codes
* @version 2024/09/29 wuzheqiang Initial Version
*/
int32 DrSave(const char *pTagName[], const TagValue *const pTagValue, const int32 nTagCount);
/**
* @brief Synchronize data reading
* @param [in] pTagName tag names array
* @param [in] nTagNameCount the count of tag names
* @param [out] pTagValue tag value array.
* @param [out] pErrorCode the error code for each tag value
* @param [out] pTagValueCount the count of tag values
* @return Successfully returned 0, other returned error codes
* @note NOTE(review): pTagValue and pErrorCode are double pointers, so the arrays are
* presumably allocated by the callee; how they are released is not stated here - confirm.
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 DrRead(const char *pTagName[], const int32 nTagNameCount, TagValue **pTagValue, int32 **pErrorCode,
int32 *pTagValueCount);
/**
* @brief register tag for async reading
* @param [in] pTagName tag names array
* @param [in] nTagCount the count of tags
* @param [in] nIntervalMs Interval for querying data, unit:ms.
* 0 means push after data update
* interval value should in {x|0<x<65535} (Less than 20 will be treated as 20)
* @param [in] pCallBack callback function; the Callback type is not declared in this header
* (NOTE(review): presumably declared in drsdk_common.h - confirm)
* @param [in] nSendFlag 0: direct push, 1: push after changed
* @param [out] pBatchId batch ID after successful registration
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 DrRegisterTag(const char *pTagName[], const int32 nTagCount, const uint16 nIntervalMs,
const Callback &pCallBack, const uint8 nSendFlag, int32 *pBatchId);
/**
* @brief register tag for async reading, delivering data through a JsonCallback
* Same parameter list as DrRegisterTag except for the callback type.
* @param [in] pTagName tag names array
* @param [in] nTagCount the count of tags
* @param [in] nIntervalMs NOTE(review): presumably the same interval rules as DrRegisterTag - confirm
* @param [in] pJsonCallBack callback function of type JsonCallback
* @param [in] nSendFlag NOTE(review): presumably the same meaning as in DrRegisterTag - confirm
* @param [out] pBatchId NOTE(review): presumably the batch ID after successful registration - confirm
* @return NOTE(review): presumably 0 on success, other values are error codes - confirm
*/
int32 DrRegisterTagJson(const char *pTagName[], const int32 nTagCount, const uint16 nIntervalMs,
const JsonCallback &pJsonCallBack, const uint8 nSendFlag, int32 *pBatchId);
/**
* @brief unregister BatchId async reading
* @param [in] nBatchId batch ID return by DR_Register_Tag
* @return Successfully returned 0, other returned error codes
* @version 2024/11/11 wuzheqiang Initial Version
*/
int32 DrUnregisterTag(const int32 nBatchId);
/**
* @brief auto unregister tag for async reading
* It's not initiated by user operation, but by dsfapi itself
* @param [in] pTransport transport ptr the request is issued on
* @param [in] nBatchId origin batch_id
* @return NOTE(review): not documented by the author; presumably 0 on success - confirm
* @version 2024/10/18 wuzheqiang Initial Version
*/
int32 DrAutoUnregisterTag(std::shared_ptr<DrSdkTransport> pTransport, const uint16 nBatchId);
/**
* @brief auto register tag for async reading
* It's not initiated by user operation, but by dsfapi itself
* @param [in] pTransport transport ptr the request is issued on
* @param [in] nBatchId origin batch_id
* @param [in] tRegInfo register info
* @return NOTE(review): not documented by the author; presumably 0 on success - confirm
* @version 2024/10/18 wuzheqiang Initial Version
*/
int32 DrAutoRegisterTag(std::shared_ptr<DrSdkTransport> pTransport, const uint16 nBatchId, const RegInfo &tRegInfo);
/**
* @brief automatic register after connected
* @param [in] pTransport transport ptr of the connection
* @version 2024/10/09 wuzheqiang Initial Version
*/
void DrAutomaticRegister(std::shared_ptr<DrSdkTransport> pTransport);
/**
* @brief sync a registration to other dsf_station when multilink
* @param [in] nBatchId batch_id
* @version 2025/02/20 wuzheqiang Initial Version
*/
void SyncRegister(const uint16 nBatchId);
/**
* @brief sync an unregistration to other dsf_station when multilink
* @param [in] nBatchId batch_id
* @version 2025/02/20 wuzheqiang Initial Version
*/
void SyncUnregister(const uint16 nBatchId);
/**
* @brief get object attribute
* @param [in] pObjectName object name (a single C string)
* @param [out] pTagtypeInfo object tagtype array
* @param [out] pTagtypeInfoCount the count of tagtype
* @return Successfully returned 0, other returned error codes
* @version 2024/09/25 wuzheqiang Initial Version
*/
int32 DrGetTagtypeInfo(const char *const pObjectName, TagtypeInfo **pTagtypeInfo, int32 *pTagtypeInfoCount);
/**
* @brief get object data
* @param [in] pObjectName object name (a single C string)
* @param [out] pTagRecord object tag record array
* @param [out] pTagRecordCount the count of tag record
* @return Successfully returned 0, other returned error codes
* @version 2024/09/25 wuzheqiang Initial Version
*/
int32 DrGetTagtypeValue(const char *const pObjectName, TagRecord **pTagRecord, int32 *pTagRecordCount);
/**
* @brief read model of the object and return its JSON string
* @param [in] pObjectName object name (a single C string)
* @param [out] sModeJsonString json string of the object model
* !!! [should release by Free_Tag_Record]
* @param [out] nLength the length of sModeJsonString
* @return Successfully returned 0, other returned error codes
* @version 2024/10/19 wuzheqiang Initial Version
*/
int32 DrGetModelJson(const char *const pObjectName, char **sModeJsonString, int32 *nLength);
private:
// Non-copyable: copy construction and copy assignment are deleted.
DRSdkManager(const DRSdkManager &) = delete;
DRSdkManager &operator=(const DRSdkManager &) = delete;
/**
* @brief callback function that loops through to get data
* @param [in] pTransport transport ptr
* @param [in] sServiceName service name; NOTE(review): presumably identifies the link
* (main/backup) this receive loop serves - confirm
* @version 09/10/2024 wuzheqiang Initial Version
*/
void RecvThreadCallback(const std::shared_ptr<DrSdkTransport> &pTransport, const std::string &sServiceName);
/**
* @brief Write content into send_buffer, for available connections
* @param [in] tHead msg head
* @param [in] strContent msg content
* @param [in] nLen msg length
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 WriteToSendBuffer(const DataHeader &tHead, const std::string &strContent, const size_t nLen);
/**
* @brief Write content into send_buffer, for specific connections
* @param [in] pTransport transport ptr
* @param [in] tHead msg head
* @param [in] strContent msg content
* @param [in] nLen msg length
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 WriteToSendBufferForTransport(std::shared_ptr<DrSdkTransport> pTransport, const DataHeader &tHead,
const std::string &strContent, const size_t nLen);
/**
* @brief async get response result
* @param [in] tHead msg head
* @param [out] pRes response result
* @return Successfully returned 0, other returned error codes
* @version 2024/09/23 wuzheqiang Initial Version
*/
int32 AsyncGetResult(const DataHeader &tHead, std::string *pRes);
/**
* @brief check transport available
* @return Successfully returned 0, other returned error codes
* @version 2025/02/18 wuzheqiang Initial Version
*/
int32 CheckTransportAvailable();
private:
// NOTE: members are constructed in declaration order and destroyed in reverse;
// keep the order stable when adding fields.
std::atomic_bool m_bInit = {false};
std::atomic_bool m_bStop = {false};
DRSdkConnectParam m_tDRSdkConnectParam;
DRSdkOption m_tDRSdkOption;
std::shared_ptr<DrSdkTransport> m_pTransport = nullptr; // transport layer
std::shared_ptr<DrSdkTransport> m_pTransportMain = nullptr; // main
std::shared_ptr<DrSdkTransport> m_pTransportBak = nullptr; // backup
std::atomic_bool m_bMultilink = {false}; // true: redundancy; false: single link
std::atomic<uint8> m_ConnServerType = {CS_UNKNOWN}; // 0:unknown; 1:main; 2:backup
std::unique_ptr<std::thread> m_pRecvThread = nullptr; // thread for recv from DS
std::unique_ptr<std::thread> m_pRecvThreadBak = nullptr; // thread for recv from DS
std::atomic<uint32> m_nHeadSeq = {0}; // request head seq
std::mutex m_RequestMtx;
// Pending requests keyed by a uint32; NOTE(review): key is presumably the head seq and the
// trailing uint32 of the tuple is not documented - confirm against the implementation.
std::unordered_map<uint32, std::tuple<SharedPromise, SharedFuture, uint32>> m_mapRequest; // async requests
std::shared_ptr<char> m_pRecvBuffer = nullptr;
// std::shared_ptr<char[]> m_pSendBuffer = nullptr;
std::shared_ptr<std::vector<char>> m_pSendBuffer = nullptr;
std::atomic<int32> m_nSendBufferLen = {0};
std::mutex m_RecvBufferMtx;
std::mutex m_SendBufferMtx;
// call_back
static std::atomic<uint16> m_nBatchId; // start from 1 ; 0 is invalid. Maintain uniqueness within the process
std::unique_ptr<ThreadPool> m_pThreadPool; // thread pool,for async exec register callback
MapRegInfo m_mapRegInfos; // register info
std::mutex m_CallbacksMtx;
};
} // namespace dsfapi
#endif