.. _program_listing_file_include_agnocast_agnocast_publisher.hpp: Program Listing for File agnocast_publisher.hpp =============================================== |exhale_lsh| :ref:`Return to documentation for file ` (``include/agnocast/agnocast_publisher.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #pragma once #include "agnocast/agnocast_ioctl.hpp" #include "agnocast/agnocast_mq.hpp" #include "agnocast/agnocast_smart_pointer.hpp" #include "agnocast/agnocast_utils.hpp" #include "rclcpp/rclcpp.hpp" #include "tracetools/tracetools.h" #include #include #include #include #include #include #include #include #include #include #include #include #include namespace agnocast { // These are cut out of the class for information hiding. topic_local_id_t initialize_publisher( const std::string & topic_name, const std::string & node_name, const rclcpp::QoS & qos); union ioctl_publish_args publish_core( [[maybe_unused]] const void * publisher_handle, /* for CARET */ const std::string & topic_name, const topic_local_id_t publisher_id, const uint64_t msg_virtual_address, std::unordered_map> & opened_mqs); uint32_t get_subscription_count_core(const std::string & topic_name); void increment_borrowed_publisher_num(); void decrement_borrowed_publisher_num(); extern int agnocast_fd; extern "C" uint32_t agnocast_get_borrowed_publisher_num(); struct PublisherOptions { // For transient local. If true, publish() does both Agnocast publish and ROS 2 publish, // regardless of the existence of ROS 2 subscriptions. bool do_always_ros2_publish = true; // No overrides allowed by default. rclcpp::QosOverridingOptions qos_overriding_options; }; template class Publisher { topic_local_id_t id_ = -1; std::string topic_name_; std::unordered_map> opened_mqs_; PublisherOptions options_; // ROS2 publish related variables typename rclcpp::Publisher::SharedPtr ros2_publisher_; mqd_t ros2_publish_mq_; std::string ros2_publish_mq_name_; std::queue> ros2_message_queue_; std::thread ros2_publish_thread_; std::mutex ros2_publish_mtx_; public: using SharedPtr = std::shared_ptr>; Publisher( rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos, const PublisherOptions & options) : topic_name_(node->get_node_topics_interface()->resolve_topic_name(topic_name)) { rclcpp::PublisherOptions pub_options; pub_options.qos_overriding_options = options.qos_overriding_options; ros2_publisher_ = node->create_publisher(topic_name_, qos, pub_options); auto actual_qos = ros2_publisher_->get_actual_qos(); #ifdef TRACETOOLS_LTTNG_ENABLED TRACEPOINT( agnocast_publisher_init, static_cast(this), static_cast( node->get_node_base_interface()->get_shared_rcl_node_handle().get()), topic_name_.c_str(), actual_qos.depth()); #endif if (actual_qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) { options_.do_always_ros2_publish = options.do_always_ros2_publish; } else { options_.do_always_ros2_publish = false; } id_ = initialize_publisher(topic_name_, node->get_fully_qualified_name(), actual_qos); ros2_publish_mq_name_ = create_mq_name_for_ros2_publish(topic_name_, id_); const int mq_mode = 0666; struct mq_attr attr = {}; attr.mq_flags = 0; attr.mq_maxmsg = 1; attr.mq_msgsize = sizeof(MqMsgROS2Publish); attr.mq_curmsgs = 0; ros2_publish_mq_ = mq_open(ros2_publish_mq_name_.c_str(), O_CREAT | O_WRONLY | O_NONBLOCK, mq_mode, &attr); if (ros2_publish_mq_ == -1) { RCLCPP_ERROR(logger, "mq_open failed: %s", strerror(errno)); close(agnocast_fd); exit(EXIT_FAILURE); } ros2_publish_thread_ = std::thread([this]() { do_ros2_publish(); }); } ~Publisher() { MqMsgROS2Publish mq_msg = {}; mq_msg.should_terminate = true; if (mq_send(ros2_publish_mq_, reinterpret_cast(&mq_msg), sizeof(mq_msg), 0) == -1) { RCLCPP_ERROR(logger, "mq_send failed: %s", strerror(errno)); } ros2_publish_thread_.join(); if (mq_close(ros2_publish_mq_) == -1) { RCLCPP_ERROR(logger, "mq_close failed: %s", strerror(errno)); } if (mq_unlink(ros2_publish_mq_name_.c_str()) == -1) { RCLCPP_ERROR(logger, "mq_unlink failed: %s", strerror(errno)); } } void do_ros2_publish() { mqd_t mq = mq_open(ros2_publish_mq_name_.c_str(), O_RDONLY); if (mq == -1) { RCLCPP_ERROR(logger, "mq_open failed: %s", strerror(errno)); close(agnocast_fd); exit(EXIT_FAILURE); } while (true) { MqMsgROS2Publish mq_msg = {}; auto ret = mq_receive(mq, reinterpret_cast(&mq_msg), sizeof(mq_msg), nullptr); if (ret == -1) { RCLCPP_ERROR(logger, "mq_receive failed: %s", strerror(errno)); close(agnocast_fd); exit(EXIT_FAILURE); } if (mq_msg.should_terminate) { break; } while (true) { ipc_shared_ptr message; { std::scoped_lock lock(ros2_publish_mtx_); if (ros2_message_queue_.empty()) { break; } message = std::move(ros2_message_queue_.front()); ros2_message_queue_.pop(); } ros2_publisher_->publish(*message.get()); } } if (mq_close(mq) == -1) { RCLCPP_ERROR(logger, "mq_close failed: %s", strerror(errno)); } } ipc_shared_ptr borrow_loaned_message() { increment_borrowed_publisher_num(); MessageT * ptr = new MessageT(); return ipc_shared_ptr(ptr, topic_name_.c_str(), id_); } void publish(ipc_shared_ptr && message) { if (!message || topic_name_ != message.get_topic_name()) { RCLCPP_ERROR(logger, "Invalid message to publish."); close(agnocast_fd); exit(EXIT_FAILURE); } decrement_borrowed_publisher_num(); const union ioctl_publish_args publish_args = publish_core(this, topic_name_, id_, reinterpret_cast(message.get()), opened_mqs_); message.set_entry_id(publish_args.ret_entry_id); for (uint32_t i = 0; i < publish_args.ret_released_num; i++) { MessageT * release_ptr = reinterpret_cast(publish_args.ret_released_addrs[i]); delete release_ptr; } if (options_.do_always_ros2_publish || ros2_publisher_->get_subscription_count() > 0) { { std::lock_guard lock(ros2_publish_mtx_); ros2_message_queue_.push(std::move(message)); } MqMsgROS2Publish mq_msg = {}; mq_msg.should_terminate = false; if (mq_send(ros2_publish_mq_, reinterpret_cast(&mq_msg), sizeof(mq_msg), 0) == -1) { // If it returns EAGAIN, it means mq_send has already been executed, but the ros2 publish // thread hasn't received it yet. Thus, there's no need to send it again since the // notification has already been sent. if (errno != EAGAIN) { RCLCPP_ERROR(logger, "mq_send failed: %s", strerror(errno)); } } } else { message.reset(); } } uint32_t get_subscription_count() const { return ros2_publisher_->get_subscription_count() + get_subscription_count_core(topic_name_); } }; } // namespace agnocast