#ifndef __UT_DDS_ENTITY_HPP__
#define __UT_DDS_ENTITY_HPP__

/*
 * Thin wrappers around the DDS C++ entities (participant, publisher,
 * subscriber, topic, writer, reader and reader listener).
 *
 * NOTE(review): this copy of the file looks damaged - text that originally sat
 * between '<' and '>' appears to have been stripped. Visible symptoms:
 *   - the ten #include directives below carry no header name;
 *   - every "template" keyword has lost its parameter list, and template-ids
 *     have lost their arguments (e.g. "std::shared_ptr;", "std::shared_ptr>;");
 *   - the for-loop header in DdsReaderListener::on_data_available is truncated.
 * The code tokens are left exactly as found; restore them from the original
 * header before building.
 */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

/*
 * Null handle used in this file to initialise and to reset the wrapped native
 * DDS objects (see the constructors/destructors of the template wrappers).
 */
#define __UT_DDS_NULL__ ::dds::core::null

/*
 * dds wait sub/pub matched default time slice.
 * default 50000 us
 *
 * Passed to MicroSleep() by the polling loops in this file, and used as the
 * amount subtracted from the remaining wait budget in DdsWriter::WaitReader.
 */
#define __UT_DDS_WAIT_MATCHED_TIME_SLICE 50000

/*
 * NOTE(review): a using-directive at header scope is visible to every
 * translation unit that includes this file.
 */
using namespace org::eclipse::cyclonedds;

namespace unitree
{
namespace common
{
/*
 * @brief: DdsLogger
 *
 * Common base of the wrappers in this file; gives derived classes access to a
 * Logger pointer. Constructor and destructor are only declared here, so how
 * mLogger is obtained is not visible in this header.
 */
class DdsLogger
{
public:
    DdsLogger();
    virtual ~DdsLogger();

protected:
    // Handed to UT_DDS_EXCEPTION_CATCH and LOG_WARNING by derived classes.
    Logger* mLogger;
};

/*
 * @brief: DdsParticipant
 *
 * Wrapper owning a native ::dds::domain::DomainParticipant. Member functions
 * are declared here and defined elsewhere.
 */
class DdsParticipant : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::domain::DomainParticipant;

    /*
     * @param domainId  DDS domain id.
     * @param qos       participant QoS wrapper.
     * @param config    optional configuration string, empty by default
     *                  (its exact meaning is not visible here - TODO confirm).
     */
    explicit DdsParticipant(uint32_t domainId, const DdsParticipantQos& qos, const std::string& config = "");
    ~DdsParticipant();

    // Read-only access to the wrapped native participant.
    const NATIVE_TYPE& GetNative() const;

private:
    NATIVE_TYPE mNative;
};

// NOTE(review): the template argument of std::shared_ptr is missing in this copy.
using DdsParticipantPtr = std::shared_ptr;

/*
 * @brief: DdsPublisher
 *
 * Wrapper owning a native ::dds::pub::Publisher created from a participant.
 * Member functions are declared here and defined elsewhere.
 */
class DdsPublisher : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::pub::Publisher;

    /*
     * @param participant  participant wrapper the publisher belongs to.
     * @param qos          publisher QoS wrapper.
     */
    explicit DdsPublisher(const DdsParticipantPtr& participant, const DdsPublisherQos& qos);
    ~DdsPublisher();

    // Read-only access to the wrapped native publisher.
    const NATIVE_TYPE& GetNative() const;

private:
    NATIVE_TYPE mNative;
};

// NOTE(review): the template argument of std::shared_ptr is missing in this copy.
using DdsPublisherPtr = std::shared_ptr;

/*
 * @brief: DdsSubscriber
 *
 * Wrapper owning a native ::dds::sub::Subscriber created from a participant.
 * Member functions are declared here and defined elsewhere.
 */
class DdsSubscriber : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::sub::Subscriber;

    /*
     * @param participant  participant wrapper the subscriber belongs to.
     * @param qos          subscriber QoS wrapper.
     */
    explicit DdsSubscriber(const DdsParticipantPtr& participant, const DdsSubscriberQos& qos);
    ~DdsSubscriber();

    // Read-only access to the wrapped native subscriber.
    const NATIVE_TYPE& GetNative() const;

private:
    NATIVE_TYPE mNative;
};

// NOTE(review): the template argument of std::shared_ptr is missing in this copy.
using DdsSubscriberPtr = std::shared_ptr;

/*
 * @brief: DdsTopic
 *
 * Wrapper owning a native ::dds::topic::Topic.
 *
 * NOTE(review): the template parameter list and the argument of
 * ::dds::topic::Topic are missing in this copy; presumably the class is
 * parameterised on the message type (the sibling classes refer to it as MSG).
 */
template class DdsTopic : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::topic::Topic;

    /*
     * Creates the native topic on the given participant.
     *
     * Starts from the participant's default topic QoS, lets `qos` overwrite it
     * through CopyToNativeQos(), then constructs the native topic under `name`.
     * mNative is null until that assignment succeeds.
     *
     * The body is bracketed by the UT_DDS_EXCEPTION_TRY / UT_DDS_EXCEPTION_CATCH
     * macros, which are defined elsewhere. NOTE(review): the second macro
     * argument is `true` in constructors and `false` in DdsWriter::Write;
     * presumably it selects whether the exception is rethrown - confirm against
     * the macro definition.
     */
    explicit DdsTopic(const DdsParticipantPtr& participant, const std::string& name, const DdsTopicQos& qos) :
        mNative(__UT_DDS_NULL__)
    {
        UT_DDS_EXCEPTION_TRY

        auto topicQos = participant->GetNative().default_topic_qos();
        qos.CopyToNativeQos(topicQos);

        mNative = NATIVE_TYPE(participant->GetNative(), name, topicQos);

        UT_DDS_EXCEPTION_CATCH(mLogger, true)
    }

    // Resets the native handle to null explicitly before member destruction.
    ~DdsTopic()
    {
        mNative = __UT_DDS_NULL__;
    }

    // Read-only access to the wrapped native topic.
    const NATIVE_TYPE& GetNative() const
    {
        return mNative;
    }

private:
    NATIVE_TYPE mNative;
};

// NOTE(review): template parameter list and shared_ptr argument are damaged in this copy.
template using DdsTopicPtr = std::shared_ptr>;

/*
 * @brief: DdsWriter
 *
 * Wrapper owning a native ::dds::pub::DataWriter and offering a Write() that
 * can optionally wait for a matched reader first.
 *
 * NOTE(review): the template parameter list and the argument of
 * ::dds::pub::DataWriter are missing in this copy; the body uses MSG as the
 * message type.
 */
template class DdsWriter : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::pub::DataWriter;

    /*
     * Creates the native writer for `topic` on `publisher`.
     *
     * Starts from the publisher's default data-writer QoS, lets `qos` overwrite
     * it through CopyToNativeQos(), then constructs the native writer.
     *
     * NOTE(review): `publisher` is taken as a const value here, whereas the
     * other constructors in this file take their pointer arguments by const
     * reference.
     */
    explicit DdsWriter(const DdsPublisherPtr publisher, const DdsTopicPtr& topic, const DdsWriterQos& qos) :
        mNative(__UT_DDS_NULL__)
    {
        UT_DDS_EXCEPTION_TRY

        auto writerQos = publisher->GetNative().default_datawriter_qos();
        qos.CopyToNativeQos(writerQos);

        mNative = NATIVE_TYPE(publisher->GetNative(), topic->GetNative(), writerQos);

        UT_DDS_EXCEPTION_CATCH(mLogger, true)
    }

    // Resets the native handle to null explicitly before member destruction.
    ~DdsWriter()
    {
        mNative = __UT_DDS_NULL__;
    }

    // Read-only access to the wrapped native writer.
    const NATIVE_TYPE& GetNative() const
    {
        return mNative;
    }

    /*
     * Writes one message through the native writer.
     *
     * @param message       message to write.
     * @param waitMicrosec  when > 0, first wait (see WaitReader) for at least
     *                      one matched reader; when <= 0, write immediately.
     * @return true if the native write() call returned normally; false if the
     *         wait ended without a matched reader, or if control reaches the
     *         statement after UT_DDS_EXCEPTION_CATCH (presumably after a
     *         caught exception - confirm against the macro definition).
     */
    bool Write(const MSG& message, int64_t waitMicrosec)
    {
        if (waitMicrosec > 0 && !WaitReader(waitMicrosec))
        {
            return false;
        }

        UT_DDS_EXCEPTION_TRY
        {
            mNative.write(message);
            return true;
        }
        UT_DDS_EXCEPTION_CATCH(mLogger, false)

        return false;
    }

private:
    /*
     * Polls the writer's publication-matched status until its current count is
     * non-zero.
     *
     * Each iteration sleeps one __UT_DDS_WAIT_MATCHED_TIME_SLICE and subtracts
     * that slice from the budget, so the effective wait is the budget rounded
     * up to a whole number of slices.
     *
     * @param waitMicrosec  remaining wait budget (presumably microseconds, as
     *                      the name and MicroSleep suggest).
     * @return true once a match is observed, false once the budget is used up
     *         while the count is still zero.
     */
    bool WaitReader(int64_t waitMicrosec)
    {
        while (mNative.publication_matched_status().current_count() == 0)
        {
            if (waitMicrosec <= 0)
            {
                return false;
            }

            MicroSleep(__UT_DDS_WAIT_MATCHED_TIME_SLICE);
            waitMicrosec -=__UT_DDS_WAIT_MATCHED_TIME_SLICE;
        }

        return true;
    }

private:
    NATIVE_TYPE mNative;
};

// NOTE(review): template parameter list and shared_ptr argument are damaged in this copy.
template using DdsWriterPtr = std::shared_ptr>;

/*
 * @brief: DdsReaderListener
 *
 * Listener that takes samples in on_data_available() and hands every valid
 * one to a DdsReaderCallback, either directly from on_data_available() or
 * through a bounded queue that is drained by a thread created in SetQueue().
 *
 * DdsLogger is a private base here (no access specifier in the base list of a
 * `class`).
 *
 * NOTE(review): the template parameter list and the arguments of
 * NoOpDataReaderListener, DataReaderListener, std::shared_ptr, BlockQueue,
 * DataReader and LoanedSamples are missing in this copy; the body uses MSG as
 * the message type.
 */
template class DdsReaderListener : public ::dds::sub::NoOpDataReaderListener, DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::sub::DataReaderListener;
    using MSG_PTR = std::shared_ptr;

    // Starts with no queue, an empty status mask and a last-data time of 0.
    explicit DdsReaderListener() :
        mHasQueue(false), mQuit(false), mMask(::dds::core::status::StatusMask::none()), mLastDataAvailableTime(0)
    {}

    /*
     * When a queue was set up, asks the drain loop to stop, interrupts the
     * queue and waits on the thread object (Interrupt() presumably wakes a
     * blocked Get(), and Wait() presumably joins - confirm).
     *
     * NOTE(review): the first loop of the thread function in SetQueue() does
     * not look at mQuit, so if no callback with a message handler was ever
     * set, the thread function never returns and Wait() would presumably block.
     */
    ~DdsReaderListener()
    {
        if (mHasQueue)
        {
            mQuit = true;
            mDataQueuePtr->Interrupt(false);
            mDataQueueThreadPtr->Wait();
        }
    }

    /*
     * Stores a copy of `cb`. If it has a message handler, data_available is
     * added to the status mask later reported by GetStatusMask().
     */
    void SetCallback(const DdsReaderCallback& cb)
    {
        if (cb.HasMessageHandler())
        {
            mMask |= ::dds::core::status::StatusMask::data_available();
        }

        mCallbackPtr.reset(new DdsReaderCallback(cb));
    }

    /*
     * Enables queued delivery.
     *
     * @param len  queue length handed to BlockQueue; values <= 0 leave the
     *             listener in direct-dispatch mode and do nothing.
     *
     * Creates the queue and a thread named "rlsnr" (CPU id UT_CPU_ID_NONE)
     * running queueThreadFunc.
     */
    void SetQueue(int32_t len)
    {
        if (len <= 0)
        {
            return;
        }

        mHasQueue = true;
        mDataQueuePtr.reset(new BlockQueue(len));

        auto queueThreadFunc = [this]() {
            // Hold off until a callback with a message handler is installed,
            // polling once per time slice.
            while (true)
            {
                if (mCallbackPtr && mCallbackPtr->HasMessageHandler())
                {
                    break;
                }
                else
                {
                    MicroSleep(__UT_DDS_WAIT_MATCHED_TIME_SLICE);
                }
            }

            // Drain loop: forward every non-null queued message to the
            // callback until the destructor sets mQuit.
            while (!mQuit)
            {
                MSG_PTR dataPtr;
                if (mDataQueuePtr->Get(dataPtr))
                {
                    if (dataPtr)
                    {
                        mCallbackPtr->OnDataAvailable(dataPtr.get());
                    }
                }
            }

            return 0;
        };

        mDataQueueThreadPtr = CreateThreadEx("rlsnr", UT_CPU_ID_NONE, queueThreadFunc);
    }

    /*
     * @return the value GetCurrentMonotonicTimeNanosecond() returned for the
     *         most recent valid sample, or 0 if none has been seen yet.
     */
    int64_t GetLastDataAvailableTime() const
    {
        return mLastDataAvailableTime;
    }

    /*
     * @return this object viewed as the native listener type.
     *
     * NOTE(review): the C-style cast also drops the const of `this` in this
     * const member function.
     */
    NATIVE_TYPE* GetNative() const
    {
        return (NATIVE_TYPE*)this;
    }

    // Status mask built up by SetCallback().
    const ::dds::core::status::StatusMask& GetStatusMask() const
    {
        return mMask;
    }

private:
    /*
     * Takes all pending samples from `reader` and, for each one whose info is
     * valid, records the current time and delivers the data: a heap copy goes
     * into the queue when one is configured, otherwise the callback is invoked
     * directly with a pointer to the sample data.
     *
     * NOTE(review): the for-loop header below is truncated in this copy
     * ("iterdata();"); the loop condition, the increment, the opening brace of
     * the loop body and the declaration of `m` are missing, which leaves the
     * braces of this function unbalanced as written.
     *
     * NOTE(review): the direct-dispatch branch dereferences mCallbackPtr
     * without a null check.
     */
    void on_data_available(::dds::sub::DataReader& reader)
    {
        ::dds::sub::LoanedSamples samples;
        samples = reader.take();

        if (samples.length() <= 0)
        {
            return;
        }

        typename ::dds::sub::LoanedSamples::const_iterator iter;
        for (iter=samples.begin(); iterdata();
            if (iter->info().valid())
            {
                mLastDataAvailableTime = GetCurrentMonotonicTimeNanosecond();

                if (mHasQueue)
                {
                    // Per the log text, a false return from Put() presumably
                    // means an older queued message was dropped to make room
                    // (TODO confirm against BlockQueue).
                    if (!mDataQueuePtr->Put(MSG_PTR(new MSG(m)), true))
                    {
                        // NOTE(review): the string literal below is broken
                        // across two physical lines in this copy; it is kept
                        // exactly as found.
                        LOG_WARNING(mLogger, "earliest mesage was evicted. 
type:", DdsGetTypeName(MSG));
                    }
                }
                else
                {
                    mCallbackPtr->OnDataAvailable((const void*)&m);
                }
            }
        }
    }

private:
    // True once SetQueue() was called with a positive length.
    bool mHasQueue;
    // Stop flag: written by the destructor, read by the drain loop in SetQueue().
    // NOTE(review): volatile is not a synchronisation primitive; if the drain
    // loop runs on another thread (as CreateThreadEx suggests), an atomic flag
    // would be the usual choice.
    volatile bool mQuit;
    // Statuses the listener is interested in; see SetCallback().
    ::dds::core::status::StatusMask mMask;
    // Written in on_data_available(), read by GetLastDataAvailableTime().
    int64_t mLastDataAvailableTime;
    DdsReaderCallbackPtr mCallbackPtr;
    BlockQueuePtr mDataQueuePtr;
    ThreadPtr mDataQueueThreadPtr;
};

// NOTE(review): template parameter list and shared_ptr argument are damaged in this copy.
template using DdsReaderListenerPtr = std::shared_ptr>;

/*
 * @brief: DdsReader
 *
 * Wrapper owning a native ::dds::sub::DataReader together with the
 * DdsReaderListener that receives its data.
 *
 * NOTE(review): the template parameter list and the arguments of
 * ::dds::sub::DataReader and DdsReaderListener are missing in this copy.
 */
template class DdsReader : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::sub::DataReader;

    /*
     * Creates the native reader for `topic` on `subscriber`.
     *
     * Starts from the subscriber's default data-reader QoS, lets `qos`
     * overwrite it through CopyToNativeQos(), then constructs the native
     * reader. No listener is attached until SetListener() is called.
     */
    explicit DdsReader(const DdsSubscriberPtr& subscriber, const DdsTopicPtr& topic, const DdsReaderQos& qos) :
        mNative(__UT_DDS_NULL__)
    {
        UT_DDS_EXCEPTION_TRY

        auto readerQos = subscriber->GetNative().default_datareader_qos();
        qos.CopyToNativeQos(readerQos);

        mNative = NATIVE_TYPE(subscriber->GetNative(), topic->GetNative(), readerQos);

        UT_DDS_EXCEPTION_CATCH(mLogger, true)
    }

    // Resets the native handle to null explicitly before member destruction.
    ~DdsReader()
    {
        mNative = __UT_DDS_NULL__;
    }

    // Read-only access to the wrapped native reader.
    const NATIVE_TYPE& GetNative() const
    {
        return mNative;
    }

    /*
     * Configures the embedded listener and attaches it to the native reader.
     *
     * @param cb    callback to deliver samples to.
     * @param qlen  queue length forwarded to DdsReaderListener::SetQueue();
     *              values <= 0 keep direct dispatch.
     *
     * The listener is attached last, with the status mask it reports after
     * the callback has been set.
     */
    void SetListener(const DdsReaderCallback& cb, int32_t qlen)
    {
        mListener.SetCallback(cb);
        mListener.SetQueue(qlen);
        mNative.listener(mListener.GetNative(), mListener.GetStatusMask());
    }

    // Forwards to the listener's GetLastDataAvailableTime().
    int64_t GetLastDataAvailableTime() const
    {
        return mListener.GetLastDataAvailableTime();
    }

private:
    NATIVE_TYPE mNative;
    DdsReaderListener mListener;
};

// NOTE(review): template parameter list and shared_ptr argument are damaged in this copy.
template using DdsReaderPtr = std::shared_ptr>;

}
}

#endif//__UT_DDS_ENTITY_HPP__