/*
 * File: Go2Py/deploy/dds_bridge/include/unitree/common/dds/dds_entity.hpp
 * (C++ header, 407 lines, 8.9 KiB; revision dated 2024-01-04 03:45:40 +08:00)
 */
#ifndef __UT_DDS_ENTITY_HPP__
#define __UT_DDS_ENTITY_HPP__
#include <atomic>

#include <dds/dds.hpp>
#include <unitree/common/log/log.hpp>
#include <unitree/common/block_queue.hpp>
#include <unitree/common/thread/thread.hpp>
#include <unitree/common/time/time_tool.hpp>
#include <unitree/common/time/sleep.hpp>
#include <unitree/common/dds/dds_exception.hpp>
#include <unitree/common/dds/dds_callback.hpp>
#include <unitree/common/dds/dds_qos.hpp>
#include <unitree/common/dds/dds_traits.hpp>
#define __UT_DDS_NULL__ ::dds::core::null
/*
 * Default polling time slice, in microseconds, used while waiting
 * (for example, for a publication to be matched by a subscription).
 * Default: 50000 us.
 */
#define __UT_DDS_WAIT_MATCHED_TIME_SLICE 50000
using namespace org::eclipse::cyclonedds;
namespace unitree
{
namespace common
{
/*
* @brief: DdsLogger
* Common base class of the DDS entity wrappers in this header. It carries the
* logger handle that the derived classes hand to the exception and logging
* macros (UT_DDS_EXCEPTION_CATCH, LOG_WARNING).
*/
class DdsLogger
{
public:
// Constructor and destructor are only declared here; their definitions are
// not part of the visible header.
DdsLogger();
virtual ~DdsLogger();
protected:
// Logger handle used by derived classes.
// NOTE(review): presumably initialised by the constructor; ownership is not
// visible from this header - confirm in the implementation file.
Logger* mLogger;
};
/*
* @brief: DdsParticipant
* Wrapper holding a native ::dds::domain::DomainParticipant. Only the
* interface is declared here; the member functions are defined elsewhere.
*/
class DdsParticipant : public DdsLogger
{
public:
// Native DDS type wrapped by this class.
using NATIVE_TYPE = ::dds::domain::DomainParticipant;
/*
* @param domainId: DDS domain id.
* @param qos: participant QoS settings.
* @param config: optional configuration string, empty by default.
* NOTE(review): whether this is inline configuration or a path is not visible
* from this header - confirm against the implementation.
*/
explicit DdsParticipant(uint32_t domainId, const DdsParticipantQos& qos, const std::string& config = "");
~DdsParticipant();
// Read-only access to the wrapped native participant.
const NATIVE_TYPE& GetNative() const;
private:
// The wrapped native participant.
NATIVE_TYPE mNative;
};
// Shared-pointer alias used to pass participants to the other wrappers.
using DdsParticipantPtr = std::shared_ptr<DdsParticipant>;
/*
* @brief: DdsPublisher
* Wrapper holding a native ::dds::pub::Publisher. Only the interface is
* declared here; the member functions are defined elsewhere.
*/
class DdsPublisher : public DdsLogger
{
public:
// Native DDS type wrapped by this class.
using NATIVE_TYPE = ::dds::pub::Publisher;
/*
* @param participant: participant wrapper the publisher is created from.
* @param qos: publisher QoS settings.
*/
explicit DdsPublisher(const DdsParticipantPtr& participant, const DdsPublisherQos& qos);
~DdsPublisher();
// Read-only access to the wrapped native publisher.
const NATIVE_TYPE& GetNative() const;
private:
// The wrapped native publisher.
NATIVE_TYPE mNative;
};
// Shared-pointer alias used to pass publishers to DdsWriter.
using DdsPublisherPtr = std::shared_ptr<DdsPublisher>;
/*
* @brief: DdsSubscriber
* Wrapper holding a native ::dds::sub::Subscriber. Only the interface is
* declared here; the member functions are defined elsewhere.
*/
class DdsSubscriber : public DdsLogger
{
public:
// Native DDS type wrapped by this class.
using NATIVE_TYPE = ::dds::sub::Subscriber;
/*
* @param participant: participant wrapper the subscriber is created from.
* @param qos: subscriber QoS settings.
*/
explicit DdsSubscriber(const DdsParticipantPtr& participant, const DdsSubscriberQos& qos);
~DdsSubscriber();
// Read-only access to the wrapped native subscriber.
const NATIVE_TYPE& GetNative() const;
private:
// The wrapped native subscriber.
NATIVE_TYPE mNative;
};
// Shared-pointer alias used to pass subscribers to DdsReader.
using DdsSubscriberPtr = std::shared_ptr<DdsSubscriber>;
/*
* @brief: DdsTopic
*/
template<typename MSG>
class DdsTopic : public DdsLogger
{
public:
using NATIVE_TYPE = ::dds::topic::Topic<MSG>;
explicit DdsTopic(const DdsParticipantPtr& participant, const std::string& name, const DdsTopicQos& qos) :
mNative(__UT_DDS_NULL__)
{
UT_DDS_EXCEPTION_TRY
auto topicQos = participant->GetNative().default_topic_qos();
qos.CopyToNativeQos(topicQos);
mNative = NATIVE_TYPE(participant->GetNative(), name, topicQos);
UT_DDS_EXCEPTION_CATCH(mLogger, true)
}
~DdsTopic()
{
mNative = __UT_DDS_NULL__;
}
const NATIVE_TYPE& GetNative() const
{
return mNative;
}
private:
NATIVE_TYPE mNative;
};
template<typename MSG>
using DdsTopicPtr = std::shared_ptr<DdsTopic<MSG>>;
/*
 * @brief: DdsWriter
 * Typed wrapper around a native ::dds::pub::DataWriter<MSG>.
 */
template<typename MSG>
class DdsWriter : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::pub::DataWriter<MSG>;

    /*
     * Creates the native data writer.
     * @param publisher: publisher wrapper that owns the writer.
     * @param topic: topic wrapper the writer publishes on.
     * @param qos: wrapper QoS, copied over the publisher's default writer QoS.
     * Failures are routed through UT_DDS_EXCEPTION_TRY / UT_DDS_EXCEPTION_CATCH.
     */
    explicit DdsWriter(const DdsPublisherPtr& publisher, const DdsTopicPtr<MSG>& topic, const DdsWriterQos& qos) :
        mNative(__UT_DDS_NULL__)
    {
        UT_DDS_EXCEPTION_TRY
        // Start from the publisher defaults, then overlay the caller's QoS.
        auto writerQos = publisher->GetNative().default_datawriter_qos();
        qos.CopyToNativeQos(writerQos);
        mNative = NATIVE_TYPE(publisher->GetNative(), topic->GetNative(), writerQos);
        UT_DDS_EXCEPTION_CATCH(mLogger, true)
    }

    /* Drops this wrapper's reference to the native writer. */
    ~DdsWriter()
    {
        mNative = __UT_DDS_NULL__;
    }

    /* @return: read-only access to the wrapped native writer. */
    const NATIVE_TYPE& GetNative() const
    {
        return mNative;
    }

    /*
     * Writes one message.
     * @param message: sample to publish.
     * @param waitMicrosec: if > 0, wait up to this many microseconds for at
     *        least one matched reader before writing; if <= 0, write at once.
     * @return: true when the sample was handed to the native writer; false
     *          when no reader matched within the wait budget. False is also
     *          returned after the catch macro if it does not rethrow.
     */
    bool Write(const MSG& message, int64_t waitMicrosec)
    {
        if (waitMicrosec > 0 && !WaitReader(waitMicrosec))
        {
            return false;
        }
        UT_DDS_EXCEPTION_TRY
        {
            mNative.write(message);
            return true;
        }
        UT_DDS_EXCEPTION_CATCH(mLogger, false)
        return false;
    }

private:
    /*
     * Polls the publication-matched status until a reader is matched or the
     * wait budget is used up.
     * @param waitMicrosec: remaining wait budget in microseconds.
     * @return: true as soon as at least one reader is matched, false on timeout.
     */
    bool WaitReader(int64_t waitMicrosec)
    {
        while (mNative.publication_matched_status().current_count() == 0)
        {
            if (waitMicrosec <= 0)
            {
                return false;
            }
            // Never sleep longer than the remaining budget, so the call does
            // not overshoot the timeout requested by the caller.
            const int64_t slice = (waitMicrosec < __UT_DDS_WAIT_MATCHED_TIME_SLICE)
                ? waitMicrosec
                : static_cast<int64_t>(__UT_DDS_WAIT_MATCHED_TIME_SLICE);
            MicroSleep(slice);
            waitMicrosec -= slice;
        }
        return true;
    }

private:
    NATIVE_TYPE mNative;
};

/* Shared-pointer alias for DdsWriter<MSG>. */
template<typename MSG>
using DdsWriterPtr = std::shared_ptr<DdsWriter<MSG>>;
/*
 * @brief: DdsReaderListener
 * Data-reader listener that forwards every valid sample to a DdsReaderCallback.
 * Samples are delivered either directly from on_data_available(), or, once
 * SetQueue() was called with a positive length, through a bounded queue that
 * is drained by a dedicated worker thread.
 *
 * The worker thread captures `this`, so the listener is not copyable.
 */
template<typename MSG>
class DdsReaderListener : public ::dds::sub::NoOpDataReaderListener<MSG>, DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::sub::DataReaderListener<MSG>;
    using MSG_PTR = std::shared_ptr<MSG>;

    explicit DdsReaderListener() :
        mHasQueue(false), mQuit(false), mMask(::dds::core::status::StatusMask::none()), mLastDataAvailableTime(0)
    {}

    // A copy would share the queue and worker thread of the original while the
    // worker keeps calling into the original object.
    DdsReaderListener(const DdsReaderListener&) = delete;
    DdsReaderListener& operator=(const DdsReaderListener&) = delete;

    /* Stops and joins the worker thread, if one was started by SetQueue(). */
    ~DdsReaderListener()
    {
        if (mHasQueue)
        {
            mQuit = true;
            // Wake the worker in case it is blocked on the queue.
            mDataQueuePtr->Interrupt(false);
            mDataQueueThreadPtr->Wait();
        }
    }

    /*
     * Stores a copy of the callback. If it has a message handler, the
     * data_available bit is added to the status mask.
     * NOTE(review): not synchronised with the worker thread; call it before
     * SetQueue(), as DdsReader::SetListener() does.
     */
    void SetCallback(const DdsReaderCallback& cb)
    {
        if (cb.HasMessageHandler())
        {
            mMask |= ::dds::core::status::StatusMask::data_available();
        }
        mCallbackPtr.reset(new DdsReaderCallback(cb));
    }

    /*
     * Enables queued delivery: creates a queue of `len` entries and starts the
     * worker thread that drains it into the callback.
     * @param len: queue length; values <= 0 leave delivery unqueued.
     * A repeated call is ignored: replacing the queue under a running worker
     * would race with it and orphan the first thread.
     */
    void SetQueue(int32_t len)
    {
        if (len <= 0 || mHasQueue)
        {
            return;
        }
        mHasQueue = true;
        mDataQueuePtr.reset(new BlockQueue<MSG_PTR>(len));
        auto queueThreadFunc = [this]() {
            // Wait until a message handler is installed. The loop also watches
            // mQuit so the destructor cannot block forever when no handler is
            // ever set.
            while (!mQuit && !(mCallbackPtr && mCallbackPtr->HasMessageHandler()))
            {
                MicroSleep(__UT_DDS_WAIT_MATCHED_TIME_SLICE);
            }
            while (!mQuit)
            {
                MSG_PTR dataPtr;
                if (mDataQueuePtr->Get(dataPtr) && dataPtr)
                {
                    mCallbackPtr->OnDataAvailable(dataPtr.get());
                }
            }
            return 0;
        };
        mDataQueueThreadPtr = CreateThreadEx("rlsnr", UT_CPU_ID_NONE, queueThreadFunc);
    }

    /*
     * @return: value of GetCurrentMonotonicTimeNanosecond() recorded for the
     *          most recent valid sample, or 0 if none has been received yet.
     */
    int64_t GetLastDataAvailableTime() const
    {
        return mLastDataAvailableTime.load();
    }

    /* @return: this object as a pointer to the native listener interface. */
    NATIVE_TYPE* GetNative() const
    {
        // The native API takes a non-const listener pointer.
        return static_cast<NATIVE_TYPE*>(const_cast<DdsReaderListener*>(this));
    }

    /* @return: status mask built up by SetCallback(). */
    const ::dds::core::status::StatusMask& GetStatusMask() const
    {
        return mMask;
    }

private:
    /*
     * Listener callback: takes all available samples from the reader and
     * forwards each valid one, either into the queue or straight to the callback.
     */
    void on_data_available(::dds::sub::DataReader<MSG>& reader)
    {
        ::dds::sub::LoanedSamples<MSG> samples = reader.take();
        if (samples.length() == 0)
        {
            return;
        }
        typename ::dds::sub::LoanedSamples<MSG>::const_iterator iter;
        for (iter = samples.begin(); iter != samples.end(); ++iter)
        {
            if (!iter->info().valid())
            {
                continue;
            }
            const MSG& m = iter->data();
            mLastDataAvailableTime = GetCurrentMonotonicTimeNanosecond();
            if (mHasQueue)
            {
                // The loan ends with this call, so the queue gets its own copy.
                if (!mDataQueuePtr->Put(std::make_shared<MSG>(m), true))
                {
                    LOG_WARNING(mLogger, "earliest mesage was evicted. type:", DdsGetTypeName(MSG));
                }
            }
            else if (mCallbackPtr)
            {
                mCallbackPtr->OnDataAvailable((const void*)&m);
            }
        }
    }

private:
    bool mHasQueue;
    // Set by the destructor, polled by the worker thread.
    std::atomic<bool> mQuit;
    ::dds::core::status::StatusMask mMask;
    // Written from on_data_available(), read through GetLastDataAvailableTime().
    std::atomic<int64_t> mLastDataAvailableTime;
    DdsReaderCallbackPtr mCallbackPtr;
    BlockQueuePtr<MSG_PTR> mDataQueuePtr;
    ThreadPtr mDataQueueThreadPtr;
};

/* Shared-pointer alias for DdsReaderListener<MSG>. */
template<typename MSG>
using DdsReaderListenerPtr = std::shared_ptr<DdsReaderListener<MSG>>;
/*
* @brief: DdsReader
*/
template<typename MSG>
class DdsReader : public DdsLogger
{
public:
using NATIVE_TYPE = ::dds::sub::DataReader<MSG>;
explicit DdsReader(const DdsSubscriberPtr& subscriber, const DdsTopicPtr<MSG>& topic, const DdsReaderQos& qos) :
mNative(__UT_DDS_NULL__)
{
UT_DDS_EXCEPTION_TRY
auto readerQos = subscriber->GetNative().default_datareader_qos();
qos.CopyToNativeQos(readerQos);
mNative = NATIVE_TYPE(subscriber->GetNative(), topic->GetNative(), readerQos);
UT_DDS_EXCEPTION_CATCH(mLogger, true)
}
~DdsReader()
{
mNative = __UT_DDS_NULL__;
}
const NATIVE_TYPE& GetNative() const
{
return mNative;
}
void SetListener(const DdsReaderCallback& cb, int32_t qlen)
{
mListener.SetCallback(cb);
mListener.SetQueue(qlen);
mNative.listener(mListener.GetNative(), mListener.GetStatusMask());
}
int64_t GetLastDataAvailableTime() const
{
return mListener.GetLastDataAvailableTime();
}
private:
NATIVE_TYPE mNative;
DdsReaderListener<MSG> mListener;
};
template<typename MSG>
using DdsReaderPtr = std::shared_ptr<DdsReader<MSG>>;
}
}
#endif//__UT_DDS_ENTITY_HPP__