eis/third_party/dsf/include/drsdk/drsdk_transport_socket.h

218 lines
7.5 KiB
C
Raw Normal View History

/**
* Filename drsdk_transport_socket.h
* Copyright Shanghai Baosight Software Co., Ltd.
* Description Defines DrSdkTransportSocket, a client that uses boost::asio to communicate with DataService.
*
* Author wuzheqiang
* Version 09/20/2024 wuzheqiang Initial Version
**************************************************************/
#ifndef DRSDK_TRANSPORT_SOCKET_H
#define DRSDK_TRANSPORT_SOCKET_H
#include "drsdk_transport.h"
#include <atomic>
#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp> // use deadline_timer
#include <chrono>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <tuple>
#include <vector>
namespace dsfapi
{
/**
* @brief Socket-based DrSdkTransport: a client that uses boost::asio to communicate with DataService.
*
* Derives from std::enable_shared_from_this so that the object can hand out
* shared_ptr references to itself (see getSharedFromThis()).
*/
class DrSdkTransportSocket : public DrSdkTransport, public std::enable_shared_from_this<DrSdkTransportSocket>
{
public:
using DataTuple = std::tuple<std::shared_ptr<char>, int32>; // (buffer, length)
using DataTupleSend = std::tuple<std::shared_ptr<char>, int32, uint64>; // (buffer, length, timestamp in ms)
DrSdkTransportSocket();
~DrSdkTransportSocket();
/**
* @brief Get a shared_ptr that refers to this object
* @return the result of shared_from_this()
* @note The object must already be owned by a std::shared_ptr when this is called;
* otherwise shared_from_this() fails (it throws std::bad_weak_ptr since C++17).
*/
std::shared_ptr<DrSdkTransportSocket> getSharedFromThis()
{
return shared_from_this();
}
/**
* @brief Init DrSdkTransportSocket: connect to DataService
* @param [in] tDRSdkConnectParam contains server ip, server port, connect timeout
* @param [in] bIsMain defaults to true; presumably selects the main rather than the
* backup (redundant) service -- TODO confirm against the implementation
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 Init(const DRSdkConnectParam &tDRSdkConnectParam, const bool bIsMain = true);
/**
* @brief Close socket
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 Close();
/**
* @brief Send data to DataService
* @param [in] pSendBuf buffer to send
* @param [in] nLenBuf buffer length
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 SendData(const char *pSendBuf, int32 nLenBuf);
/**
* @brief Receive data from DataService
* @param [out] pRecvBuf buffer holding the received data
* @param [out] pBufLen received length
* @return Successfully returned 0, other returned error codes
* @version 09/10/2024 wuzheqiang Initial Version
*/
int32 RecvData(std::shared_ptr<char> &pRecvBuf, int32 &pBufLen);
/**
* @brief Get connection status
* @return true if connected else false
* @version 2024/10/09 wuzheqiang Initial Version
*/
bool GetConnectStatus() const;
/**
* @brief Stop read thread and close connection
* @version 2024/10/18 wuzheqiang Initial Version
*/
void Stop();
/**
* @brief Set the connected callback, used to automatically register the async read callback
* @param [in] pConnectedCallback callable taking a std::shared_ptr<DrSdkTransport>
* @version 2024/10/09 wuzheqiang Initial Version
*/
void SetConnectedCallback(const std::function<void(std::shared_ptr<DrSdkTransport>)> &pConnectedCallback);
/**
* @brief Get the stored RM status
* @return current value of m_RmStatus
* @note NOTE(review): m_RmStatus is a plain (non-atomic) member, unlike the atomic flags
* in this class -- confirm it is not read and written concurrently.
*/
SDKRMStatus GetRmStatus() const
{
return m_RmStatus;
}
/**
* @brief Get the availability flag
* @return value atomically loaded from m_bAvailable
*/
bool GetAvailable() const
{
return m_bAvailable.load();
}
/**
* @brief Get the service name
* @return const reference to m_sServiceName; valid only while this object is alive
*/
const std::string &GetServiceName() const
{
return m_sServiceName;
}
private:
/**
* @brief Start connecting to DataService
* @version 2024/09/23 wuzheqiang Initial Version
*/
void StartConnect();
/**
* @brief After connected, start read data
* @param [in] error error code (taken by value here, unlike the other handlers)
* @version 2024/09/23 wuzheqiang Initial Version
*/
void HandleConnect(const boost::system::error_code error);
/**
* @brief Start read data
* @version 2024/09/23 wuzheqiang Initial Version
*/
void StartRead();
/**
* @brief Read data head
* @param [in] error error code
* @param [in] bytes_transferred length of the data read
* @version 2024/09/23 wuzheqiang Initial Version
*/
void HandleReadHead(const boost::system::error_code &error, size_t bytes_transferred);
/**
* @brief Read data body
* @param [in] error error code
* @param [in] bytes_transferred length of the data read
* @version 2024/09/23 wuzheqiang Initial Version
*/
void HandleReadBody(const boost::system::error_code &error, size_t bytes_transferred);
/**
* @brief Handle send data result
* @param [in] error error code
* @param [in] bytes_transferred sent data length
* @version 2024/09/23 wuzheqiang Initial Version
*/
void HandleSend(const boost::system::error_code &error, size_t bytes_transferred);
// NOTE(review): undocumented in the original; presumably sends pending entries of
// m_SendQueue -- confirm against the implementation.
void SendInternal();
/**
* @brief Start the reconnection process
* @version 2024/09/23 wuzheqiang Initial Version
*/
void StartReconnect();
/**
* @brief Handle reconnection result
* @param [in] error error code
* @version 2024/09/23 wuzheqiang Initial Version
*/
void HandleReconnect(const boost::system::error_code &error);
/**
* @brief Put data into the queue
* @param [in] tData (buffer, length) tuple; presumably stored in m_RecvQueue, the only
* queue of DataTuple declared here -- TODO confirm
* @return bool; meaning not visible in this header (presumably true on success) -- TODO confirm
* @version 2024/09/23 wuzheqiang Initial Version
*/
bool Enqueue(DataTuple &tData);
/**
* @brief Get data from the queue
* @param [out] tData (buffer, length) tuple taken from the queue
* @return bool; meaning not visible in this header (presumably true on success) -- TODO confirm
* @version 2024/09/23 wuzheqiang Initial Version
*/
bool Dequeue(DataTuple &tData);
/**
* @brief Deal with a heart beat
* @param [in] tData (buffer, length) tuple
* @version 2025/02/18 wuzheqiang Initial Version
*/
void DealHeartBeat(const DataTuple &tData);
/**
* @brief Check heart beat timeout
* @param [in] error error code
* @version 2025/02/18 wuzheqiang Initial Version
*/
void CheckHeartBeatTimeOut(const boost::system::error_code &error);
private:
// NOTE: non-static data members are constructed in declaration order; m_IoService is
// declared before the resolver and timers below, so keep that order when editing.
DRSdkConnectParam m_tDRSdkConnectParam;
std::string m_sServiceName = "main"; // for log; redundancy [bak]
std::string m_sServerIp = "";
std::string m_sServerPort = "";
boost::asio::io_service m_IoService; // use io_service
std::unique_ptr<boost::asio::ip::tcp::socket> m_pSocket = nullptr;
boost::asio::ip::tcp::resolver m_tResolver;
std::atomic_bool m_bConnected = {false};
boost::asio::deadline_timer m_HeartBeatTimer; // Timer for heartbeat
boost::asio::deadline_timer m_ReconnectTimer; // Timer for delayed reconnection
boost::asio::deadline_timer m_TimeoutTimer; // Timer for connect timeout
std::function<void(std::shared_ptr<DrSdkTransport>)> m_pConnectedCallback = nullptr; // Callback when connected
std::shared_ptr<char> m_RecvBufHead = nullptr;
// std::shared_ptr<char[]> m_RecvBufHead = nullptr;
// std::shared_ptr<char[]> m_RecvBufBody = nullptr;
std::vector<char> m_RecvBufBody = {};
size_t m_RecvBufBodyLen = 0;
std::queue<DataTuple> m_RecvQueue; // recv socket buf and send data to drsdk_manager
std::mutex m_Mutex;
std::condition_variable m_Cv;
std::atomic_bool m_bStop = {false};
uint64 m_nLastTransportTimeMs = 0; // updated when connect/send/recv
// uint64 m_nLastHeartBeatTimeMs = 0; // last received heart beat time
// NOTE: despite the Ms suffix, this member is a steady_clock time_point, not a millisecond count.
std::chrono::steady_clock::time_point m_nLastHeartBeatTimeMs =
std::chrono::steady_clock::now(); // last received heart beat time
SDKRMStatus m_RmStatus = SDKRMStatus::SDKRM_STATUS_INACTIVE;
std::atomic_bool m_bAvailable = {false}; // transport is available, for rm_status and connect timeout
std::atomic_bool m_bSendFlag = {false}; // sending flag
std::queue<DataTupleSend> m_SendQueue; // for send data to io_service
std::mutex m_SendQueueMutex;
};
} // namespace dsfapi
#endif