Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

intra_process: message frequency lower than expected, zero-copy? #399

Closed
EgalYue opened this issue Oct 17, 2019 · 5 comments
Closed

intra_process: message frequency lower than expected, zero-copy? #399

EgalYue opened this issue Oct 17, 2019 · 5 comments
Assignees
Labels
more-information-needed Further information is required

Comments

@EgalYue
Copy link

EgalYue commented Oct 17, 2019

Hi,
I'm not sure, whether this problem is related to #289 or ros2/rclcpp#504 (comment). If it is,then I apologize and please just close this issue.

Bug report

Required Info:

  • Operating System:
    • Ubuntu 18.04.
  • Installation type:
    • binaries
  • Version or commit hash:
    • Dashing

Steps to reproduce issue

Using Intra_process communication:
One node published Pointcloud message(The size of Pointcloud is only 430KB,) with the frequency of 10Hz.
One node acted as subscriber and subscribed the topic.

Expected behavior

The subscriber should receive 10 messages per second. The frequency of this topic should be 10Hz.

Actual behavior

There had big latency(5-6 messages per second). The frequency of this topic was much lower than 10Hz.

Additional information

So I made further test, I just modified two_node_pipeline demo, read 1MB data from a txt file and publish this 1MB message(std_msgs::msg::String) .

I found that:

  1. the frequency of this topic is only 8-9Hz.
  2. If there are more than one subscriber, the address of unique_ptr is not same, only one of the subscriber is same as publisher.

Code

// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

#include <fstream>
#include "std_msgs/msg/string.hpp"
#include <ctime>
#include <iostream>

using namespace std::chrono_literals;

std::string read_datafile(std::string message_filename){
  std::ifstream ifs(message_filename.c_str());
  if(ifs.fail()) {
        std::cerr << "data_*byte.txt do not exist.\n";
        exit(0);
  }
  std::string bytedata;
  getline(ifs, bytedata);

  return bytedata;
}


// Node that produces messages.
struct Producer : public rclcpp::Node
{
  Producer(const std::string & name, const std::string & output, const std::string & bytedata)//const std::string & bytedata
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a publisher on the output topic.
    pub_ = this->create_publisher<std_msgs::msg::String>(output, 10);
    std::weak_ptr<std::remove_pointer<decltype(pub_.get())>::type> captured_pub = pub_;

    // Create a timer which publishes on the output topic at ~1Hz.
    auto callback = [captured_pub,bytedata]() -> void {
        auto pub_ptr = captured_pub.lock();
        if (!pub_ptr) {
          return;
        }

        std_msgs::msg::String::UniquePtr msg_str(new std_msgs::msg::String());
        
        //clock_t startTime,endTime;
        //startTime = clock();
        //std::string bytedata = read_datafile("/home/nv09/Desktop/ros2_simple/src/simple_test_talker/src/byte_data/data_1Mbyte.txt");
        //endTime = clock();
        //std::cout << "The run time is: " <<(double)(endTime - startTime) / CLOCKS_PER_SEC << "s" << std::endl;

        msg_str->data = bytedata;
        printf(
          "Published message with value: , and address: 0x%" PRIXPTR "\n",
          reinterpret_cast<std::uintptr_t>(msg_str.get())); // msg_str->data.c_str()
        pub_ptr->publish(std::move(msg_str));
      };
    timer_ = this->create_wall_timer(0.1s, callback);
  }

  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_;
  rclcpp::TimerBase::SharedPtr timer_;
};

// Node that consumes messages.
struct Consumer : public rclcpp::Node
{
  Consumer(const std::string & name, const std::string & input)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a subscription on the input topic which prints on receipt of new messages.
    sub_ = this->create_subscription<std_msgs::msg::String>(
      input,
      10,
      [](std_msgs::msg::String::UniquePtr msg) {
        printf(
          " Received message with value:, and address: 0x%" PRIXPTR "\n",
          reinterpret_cast<std::uintptr_t>(msg.get()));
      });
  }

  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
};


// Node that consumes messages.
struct Consumer2 : public rclcpp::Node
{
  Consumer2(const std::string & name, const std::string & input)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a subscription on the input topic which prints on receipt of new messages.
    sub_ = this->create_subscription<std_msgs::msg::String>(
      input,
      10,
      [](std_msgs::msg::String::UniquePtr msg) {
        printf(
          " 2 Received message with value:, and address: 0x%" PRIXPTR "\n",
          reinterpret_cast<std::uintptr_t>(msg.get()));
      });
  }

  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
};

// Node that consumes messages.
struct Consumer3 : public rclcpp::Node
{
  Consumer3(const std::string & name, const std::string & input)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a subscription on the input topic which prints on receipt of new messages.
    sub_ = this->create_subscription<std_msgs::msg::String>(
      input,
      10,
      [](std_msgs::msg::String::UniquePtr msg) {
        printf(
          " 3 Received message with value:, and address: 0x%" PRIXPTR "\n",
          reinterpret_cast<std::uintptr_t>(msg.get()));
      });
  }

  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
};

int main(int argc, char * argv[])
{
  setvbuf(stdout, NULL, _IONBF, BUFSIZ);
  rclcpp::init(argc, argv);
  rclcpp::executors::SingleThreadedExecutor executor;


  std::string bytedata = read_datafile("/home/nv09/Desktop/ros2_intra/intra_process_demo/src/two_node_pipeline/byte_data/data_1Mbyte.txt");
    
  auto producer = std::make_shared<Producer>("producer", "test_str", bytedata);
  //auto producer = std::make_shared<Producer>("producer", "test_str");
  auto consumer = std::make_shared<Consumer>("consumer", "test_str");
  auto consumer2 = std::make_shared<Consumer2>("consumer2", "test_str");
  auto consumer3 = std::make_shared<Consumer3>("consumer3", "test_str");

  executor.add_node(producer);
  executor.add_node(consumer);
  executor.add_node(consumer2);
  executor.add_node(consumer3);
  executor.spin();

  rclcpp::shutdown();

  return 0;
}

Terminal screenshot

Screenshot from 2019-10-17 20-17-29

@claireyywang claireyywang self-assigned this Nov 21, 2019
@claireyywang claireyywang removed their assignment Feb 18, 2021
@mjeronimo
Copy link

After some digging I found that the IntraProcessManager only provides the original unique_ptr to the last subscription and copies the message for the others: https://github.com/ros2/rclcpp/blob/master/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp#L402-L412

In both cases, the message is delivered in-process, but only the last is not a copy.

Perhaps @wjwwood has a suggestion about whether there is anything that can be done to allow all subscribers to receive the original message without a copy.

@wjwwood
Copy link
Member

wjwwood commented Mar 23, 2021

Perhaps @wjwwood has a suggestion about whether there is anything that can be done to allow all subscribers to receive the original message without a copy.

You would have to subscribe with a const reference. If more than one thing subscribes with unique_ptr<msg> then we have to make a copy for all but the last...

@wjwwood
Copy link
Member

wjwwood commented Mar 23, 2021

Also, this issue is really old, and using Dashing, so consider that.

I don't have an explanation for the latency, but I also haven't tried it with rolling or foxy.

@clalancette
Copy link
Contributor

@EgalYue This is very old now. Would you mind retrying on the latest Foxy or Galactic to see if you still have the same problem?

@clalancette clalancette added the more-information-needed Further information is required label Feb 3, 2022
@clalancette
Copy link
Contributor

No response in a very long time, so I'm going to close this out. Feel free to reopen if you are still having problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
more-information-needed Further information is required
Projects
None yet
Development

No branches or pull requests

5 participants