/* Filename: drsdk_manager.h * Copyright: Shanghai Baosight Software Co., Ltd. * * Description: Define DRSdkManager, It is a singleton, managing the processing of data * * @author: wuzheqiang * @version 09/10/2024 wuzheqiang Initial Version **************************************************************/ #ifndef DRSDK_MANAGER_H #define DRSDK_MANAGER_H #include "drsdk_common.h" #include "drsdk_log.h" #include "drsdk_transport.h" #include "thread_pool.h" #include #include #include #include #include #include #include namespace dsfapi { // using Callback = std::function; // using VecFuncs = std::vector; // using MapFuncs = std::unordered_map; // using Callback = std::function &mapTagValue)>; using Promise = std::promise; using SharedPromise = std::shared_ptr; using SharedFuture = std::shared_future; struct RegInfo { std::string strRegContent = ""; uint16 nInterval = 0; uint8 nSendFlag = 0; uint8 nRegType = 0; Callback pCallback = nullptr; JsonCallback pJsonCallback = nullptr; RegInfo(const std::string &content, uint16 interval, uint8 sendflag, Callback callback) : strRegContent(content), nInterval(interval), nSendFlag(sendflag), pCallback(callback) { nRegType = SDK_REG_TAG; } RegInfo(const std::string &content, uint16 interval, uint8 sendflag, JsonCallback callback) : strRegContent(content), nInterval(interval), nSendFlag(sendflag), pJsonCallback(callback) { nRegType = SDK_REG_OBJ_JSON; } RegInfo() { } }; using MapRegInfo = std::unordered_map; class DRSdkManager { public: DRSdkManager(); ~DRSdkManager(); /** * @brief init DRSdkManager:connect to server, alloc memery and create threads * @param [in] tDRSdkConnectParam contains server ip, server port, connect timeout * @param [in] tDRSdkOption contains thread pool size, request timeout * @return Successfully returned 0, other returned error codes * @version 09/10/2024 wuzheqiang Initial Version */ int32 Init(const DRSdkConnectParam &tDRSdkConnectParam, const DRSdkOption &tDRSdkOption); /** * @brief uninit dsfapi: disconnect from server and join threads * @return Successfully returned 0, other returned error codes * @version 2024/09/25 wuzheqiang Initial Version */ int32 Uninit(); /** * @brief get connection status * @return true if connected else false * @version 2024/10/09 wuzheqiang Initial Version */ bool DrConnectStatus(); /** * @brief Send control commands * @param [in] pTagName tag names array * @param [in] pTagValue tag value array. should fill buffer and length * @param [in] nTagCount the count of tag * @return Successfully returned 0, other returned error codes * @version 09/10/2024 wuzheqiang Initial Version */ int32 DrWrite(const char *pTagName[], const TagValue *const pTagValue, const int32 nTagCount); /** * @brief Send control commands * @param [in] pTagName tag names array * @param [in] pTagValue tag value array. text value of tag * @param [in] nTagCount the count of tag * @return Successfully returned 0, other returned error codes * @version 09/10/2024 wuzheqiang Initial Version */ int32 DrWriteText(const char *pTagName[], const char *pTagValue[], const int32 nTagCount); /** * @brief save data to storage * @param [in] pTagName tag names array * @param [in] pTagValue tag value array. should fill buffer and length * @param [in] nTagCount the count of tag * @return Successfully returned 0, other returned error codes * @version 2024/09/29 wuzheqiang Initial Version */ int32 DrSave(const char *pTagName[], const TagValue *const pTagValue, const int32 nTagCount); /** /** * @brief Synchronize data reading * @param [in] pTagName tag names array * @param [in] nTagNameCount the cout of tag * @param [out] pTagValue tag value array. * @param [out] pErrorCode the error code for each tag value * @param [out] pTagValueCount the count of tag value * @return Successfully returned 0, other returned error codes * @version 09/10/2024 wuzheqiang Initial Version */ int32 DrRead(const char *pTagName[], const int32 nTagNameCount, TagValue **pTagValue, int32 **pErrorCode, int32 *pTagValueCount); /** * @brief register tag for async reading * @param [in] pContext dsfapi context handle * @param [in] pTagName tag names array * @param [in] nTagCount the count of tag * @param [in] nIntervalMs Interval for querying data, unit:ms. * 0 means push after data update * interval value should in {x|0 pTransport, const uint16 nBatchId); /** * @brief auto register tag for async reading * It's not initiated by user operation, but by dsfapi itself * @param [in] nBatchId origin batch_id * @param [in] tRegInfo register info * @return * @version 2024/10/18 wuzheqiang Initial Version */ int32 DrAutoRegisterTag(std::shared_ptr pTransport, const uint16 nBatchId, const RegInfo &tRegInfo); /** * @brief automatic register after connectd * @version 2024/10/09 wuzheqiang Initial Version */ void DrAutomaticRegister(std::shared_ptr pTransport); /** * @brief sync to other dsf_station when multilink * @param [in] nBatchId batch_id * @version 2025/02/20 wuzheqiang Initial Version */ void SyncRegister(const uint16 nBatchId); /** * @brief sync to other dsf_station when multilink * @param [in] nBatchId batch_id * @version 2025/02/20 wuzheqiang Initial Version */ void SyncUnregister(const uint16 nBatchId); /** * @brief get object attribute * @param [in] pContext dsfapi handle * @param [in] pObjectName object names array * @param [out] pTagtypeInfo object tagtype array * @param [out] pTagtypeInfoCount the count of tagtype * @return Successfully returned 0, other returned error codes * @version 2024/09/25 wuzheqiang Initial Version */ int32 DrGetTagtypeInfo(const char *const pObjectName, TagtypeInfo **pTagtypeInfo, int32 *pTagtypeInfoCount); /** * @brief get object data * @param [in] pContext dsfapi handle * @param [in] pObjectName object names array * @param [out] TagRecord object tag record array * @param [out] pTagRecordCount the count of tag record * @return Successfully returned 0, other returned error codes * @version 2024/09/25 wuzheqiang Initial Version */ int32 DrGetTagtypeValue(const char *const pObjectName, TagRecord **pTagRecord, int32 *pTagRecordCount); /** * @brief read model of the object and return its JSON string * @param [in] pContext dsfapi handle * @param [in] pObjectName object names array * @param [out] sModeJsonString json string of the object model * !!! [should release by Free_Tag_Record] * @param [out] length the count of sModeJsonString * @return Successfully returned 0, other returned error codes * @version 2024/10/19 wuzheqiang Initial Version */ int32 DrGetModelJson(const char *const pObjectName, char **sModeJsonString, int32 *nLength); private: DRSdkManager(const DRSdkManager &) = delete; DRSdkManager &operator=(const DRSdkManager &) = delete; /** * @brief callback function that loops through to get data * @param [in] pTransport transport ptr * @version 09/10/2024 wuzheqiang Initial Version */ void RecvThreadCallback(const std::shared_ptr &pTransport, const std::string &sServiceName); /** * @brief Write content into send_buffer, for available connections * @param [in] tHead msg head * @param [in] strContent msg content * @param [in] nLen msg length * @return Successfully returned 0, other returned error codes * @version 09/10/2024 wuzheqiang Initial Version */ int32 WriteToSendBuffer(const DataHeader &tHead, const std::string &strContent, const size_t nLen); /** * @brief Write content into send_buffer, for specific connections * @param [in] pTransport transport ptr * @param [in] tHead msg head * @param [in] strContent msg content * @param [in] nLen msg length * @return Successfully returned 0, other returned error codes * @version 09/10/2024 wuzheqiang Initial Version */ int32 WriteToSendBufferForTransport(std::shared_ptr pTransport, const DataHeader &tHead, const std::string &strContent, const size_t nLen); /** * @brief async get response result * @param [in] tHead msg head * @param [out] pRes response result * @return Successfully returned 0, other returned error codes * @version 2024/09/23 wuzheqiang Initial Version */ int32 AsyncGetResult(const DataHeader &tHead, std::string *pRes); /** * @brief check transport available * @return Successfully returned 0, other returned error codes * @version 2025/02/18 wuzheqiang Initial Version */ int32 CheckTransportAvailable(); private: std::atomic_bool m_bInit = {false}; std::atomic_bool m_bStop = {false}; DRSdkConnectParam m_tDRSdkConnectParam; DRSdkOption m_tDRSdkOption; std::shared_ptr m_pTransport = nullptr; // transport layer std::shared_ptr m_pTransportMain = nullptr; // main std::shared_ptr m_pTransportBak = nullptr; // backup std::atomic_bool m_bMultilink = {false}; // true: redundancy; false: single link std::atomic m_ConnServerType = {CS_UNKNOWN}; // 0:unknown; 1:main; 2:backup std::unique_ptr m_pRecvThread = nullptr; // thread for recv from DS std::unique_ptr m_pRecvThreadBak = nullptr; // thread for recv from DS std::atomic m_nHeadSeq = {0}; // request head seq std::mutex m_RequestMtx; std::unordered_map> m_mapRequest; // async requests std::shared_ptr m_pRecvBuffer = nullptr; // std::shared_ptr m_pSendBuffer = nullptr; std::shared_ptr> m_pSendBuffer = nullptr; std::atomic m_nSendBufferLen = {0}; std::mutex m_RecvBufferMtx; std::mutex m_SendBufferMtx; // call_back static std::atomic m_nBatchId; // start from 1 ; 0 is invalid. Maintain uniqueness within the process std::unique_ptr m_pThreadPool; // thread pool,for async exec register callback MapRegInfo m_mapRegInfos; // register info std::mutex m_CallbacksMtx; }; } // namespace dsfapi #endif