Skip to content

Commit

Permalink
Towards fixing tests
Browse files Browse the repository at this point in the history
Signed-off-by: Jesper Smith <[email protected]>
  • Loading branch information
jespersmith committed Nov 14, 2024
1 parent d01b2c3 commit f453fdf
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 63 deletions.
112 changes: 54 additions & 58 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rosbag2_cpp/readers/sequential_reader.hpp"

#include <memory>
#include <stdexcept>
#include <string>
Expand All @@ -20,10 +22,7 @@

#include "rcpputils/asserts.hpp"
#include "rcpputils/filesystem_helper.hpp"

#include "rosbag2_cpp/logging.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"


namespace rosbag2_cpp
{
Expand All @@ -40,8 +39,7 @@ std::vector<std::string> resolve_relative_paths(
base_path = rcpputils::fs::path(base_folder).parent_path();
}

rcpputils::require_true(
base_path.exists(), "base folder does not exist: " + base_folder);
rcpputils::require_true(base_path.exists(), "base folder does not exist: " + base_folder);
rcpputils::require_true(
base_path.is_directory(), "base folder has to be a directory: " + base_folder);

Expand All @@ -65,13 +63,11 @@ SequentialReader::SequentialReader(
converter_(nullptr),
metadata_io_(std::move(metadata_io)),
converter_factory_(std::move(converter_factory))
{}

SequentialReader::~SequentialReader()
{
close();
}

SequentialReader::~SequentialReader() {close();}

void SequentialReader::close()
{
if (storage_) {
Expand Down Expand Up @@ -123,14 +119,12 @@ void SequentialReader::open(
// Set the current storage serialization format to the output serialization format
storage_serialization_format = converter_options.output_serialization_format;

// Currently a bag file can only be played if all topics have the same serialization format.
check_topics_serialization_formats(topics);
check_converter_serialization_format(
converter_options.output_serialization_format,
storage_serialization_format);
converter_options.output_serialization_format, storage_serialization_format);

// Fill topics metadata. The storage serialization format is known by this point, so only supported
// topics will be added
// Fill topics metadata. The storage serialization format is known by this point,
// so only supported topics will be added
fill_topics_metadata();

// Set initial filter to read all supported topics.
Expand Down Expand Up @@ -181,30 +175,35 @@ std::vector<rosbag2_storage::TopicMetadata> SequentialReader::get_all_topics_and
return topics_metadata_;
}

void SequentialReader::set_filter(
const rosbag2_storage::StorageFilter & storage_filter)
void SequentialReader::set_filter(const rosbag2_storage::StorageFilter & storage_filter)
{
// Clear the current topics_filter
topics_filter_ = {};

// Create a new filter that is the intersection of the storage filter and the topics metadata.
if(storage_filter.topics.empty()) { // Empty filter. Add all topics with a supported serialization format.
for(const auto & topic : topics_metadata_) {
if (storage_filter.topics.empty())
{
// Empty filter. Add all topics with a supported serialization format.
for (const auto & topic : topics_metadata_) {
if (topic.serialization_format != storage_serialization_format) {
topics_filter_.topics.push_back(topic.name);
}
}
} else { // Non-empty filter. Add all requested topics with a supported serialization format.
for (const auto& topic : storage_filter.topics) {
auto it = std::find_if(topics_metadata_.begin(), topics_metadata_.end(),
[&topic](const auto& topic_metadata) { return topic_metadata.name == topic; });
} else {
// Non-empty filter. Add all requested topics with a supported serialization format.
for (const auto & topic : storage_filter.topics) {
auto it = std::find_if(
topics_metadata_.begin(), topics_metadata_.end(),
[&topic](const auto & topic_metadata) {return topic_metadata.name == topic;});
if (it != topics_metadata_.end()) {
topics_filter_.topics.push_back(topic);
} else {
ROSBAG2_CPP_LOG_WARN("Requested topic %s not found or has unsupported serialization format.", topic.c_str());
ROSBAG2_CPP_LOG_WARN(
"Requested topic %s not found or has unsupported serialization format.", topic.c_str());
}

// Edge case: we cannot find any supported topic. To avoid reading all messages, throw an error.
// Edge case: we cannot find any supported topic.
// To avoid reading all messages, throw an error.
if (topics_filter_.topics.empty()) {
throw std::runtime_error("No topics found that match the filter.");
}
Expand All @@ -215,14 +214,10 @@ void SequentialReader::set_filter(
storage_->set_filter(topics_filter_);
return;
}
throw std::runtime_error(
"Bag is not open. Call open() before setting filter.");
throw std::runtime_error("Bag is not open. Call open() before setting filter.");
}

void SequentialReader::reset_filter()
{
set_filter(rosbag2_storage::StorageFilter());
}
void SequentialReader::reset_filter() {set_filter(rosbag2_storage::StorageFilter());}

void SequentialReader::seek(const rcutils_time_point_value_t & timestamp)
{
Expand All @@ -233,8 +228,7 @@ void SequentialReader::seek(const rcutils_time_point_value_t & timestamp)
load_current_file();
return;
}
throw std::runtime_error(
"Bag is not open. Call open() before seeking time.");
throw std::runtime_error("Bag is not open. Call open() before seeking time.");
}

bool SequentialReader::has_next_file() const
Expand Down Expand Up @@ -272,10 +266,7 @@ void SequentialReader::load_next_file()
callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info);
}

std::string SequentialReader::get_current_file() const
{
return *current_file_iterator_;
}
std::string SequentialReader::get_current_file() const {return *current_file_iterator_;}

std::string SequentialReader::get_current_uri() const
{
Expand All @@ -284,26 +275,30 @@ std::string SequentialReader::get_current_uri() const
return current_uri.string();
}


void SequentialReader::check_topics_serialization_formats(
const std::vector<rosbag2_storage::TopicInformation> & topics)
{

// Check if we have any messages stored in the output serialization format. If that's the case, we don't need to check all converter plugins.
// Check if we have any messages stored in the output serialization format.
// If that's the case, we don't need to check all converter plugins.
bool need_topic_conversion = true;
for (const auto & topic: topics) {
if(topic.topic_metadata.serialization_format == storage_serialization_format) {
for (const auto & topic : topics) {
if (topic.topic_metadata.serialization_format == storage_serialization_format) {
need_topic_conversion = false;
break;
}
}

// If we haven't found any messages in the output serialization format, lets see if we have any serialization format we can convert from.
if(need_topic_conversion) {
// If we haven't found any messages in the output serialization format,
// lets see if we have any serialization format we can convert from.
if (need_topic_conversion) {
bool found_topic_to_convert = false;
storage_serialization_format = ""; // Clarify that we haven't found any serialization format to convert from.
for (const auto& topic : topics) {
if (converter_factory_->load_deserializer(topic.topic_metadata.serialization_format) != nullptr) {
// Clarify that we haven't found any serialization format to convert from.
storage_serialization_format = "";
for (const auto & topic : topics) {
if (
converter_factory_->load_deserializer(topic.topic_metadata.serialization_format) !=
nullptr)
{
storage_serialization_format = topic.topic_metadata.serialization_format;
found_topic_to_convert = true;
break;
Expand All @@ -323,34 +318,36 @@ void SequentialReader::check_converter_serialization_format(
const std::string & converter_serialization_format,
const std::string & storage_serialization_format)
{
if (converter_serialization_format.empty()) {return;}
if (converter_serialization_format.empty()) {
return;
}

if (converter_serialization_format != storage_serialization_format) {
converter_ = std::make_unique<Converter>(
storage_serialization_format,
converter_serialization_format,
converter_factory_);
storage_serialization_format, converter_serialization_format, converter_factory_);
auto topics = storage_->get_all_topics_and_types();
for (const auto & topic_with_type : topics) {
converter_->add_topic(topic_with_type.name, topic_with_type.type);
}
}
}

void SequentialReader::fill_topics_metadata() {
void SequentialReader::fill_topics_metadata()
{
rcpputils::check_true(storage_ != nullptr, "Bag is not open. Call open() before reading.");

// Add only topics with the same serialization format as the storage serialization format
topics_metadata_.clear();
topics_metadata_.reserve(metadata_.topics_with_message_count.size());
for (const auto& topic_information : metadata_.topics_with_message_count) {
for (const auto & topic_information : metadata_.topics_with_message_count) {
if (topic_information.topic_metadata.serialization_format == storage_serialization_format) {
topics_metadata_.push_back(topic_information.topic_metadata);
} else {
ROSBAG2_CPP_LOG_WARN("Topic %s with serialization format %s doesn't match the storage format %s.",
topic_information.topic_metadata.name.c_str(),
topic_information.topic_metadata.serialization_format.c_str(),
storage_serialization_format.c_str());
ROSBAG2_CPP_LOG_WARN(
"Topic %s with serialization format %s doesn't match the storage format %s.",
topic_information.topic_metadata.name.c_str(),
topic_information.topic_metadata.serialization_format.c_str(),
storage_serialization_format.c_str());
}
}
}
Expand All @@ -359,8 +356,7 @@ void SequentialReader::add_event_callbacks(const bag_events::ReaderEventCallback
{
if (callbacks.read_split_callback) {
callback_manager_.add_event_callback(
callbacks.read_split_callback,
bag_events::BagEvent::READ_SPLIT);
callbacks.read_split_callback, bag_events::BagEvent::READ_SPLIT);
}
}

Expand Down
13 changes: 8 additions & 5 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,14 @@ class SequentialReaderTest : public Test
TEST_F(SequentialReaderTest, read_next_uses_converters_to_convert_serialization_format) {
std::string output_format = "rmw2_format";

auto format1_converter = std::make_unique<StrictMock<MockConverter>>();
auto format2_converter = std::make_unique<StrictMock<MockConverter>>();
EXPECT_CALL(*format1_converter, deserialize(_, _, _)).Times(1);
EXPECT_CALL(*format2_converter, serialize(_, _, _)).Times(1);

EXPECT_CALL(*converter_factory_, load_deserializer(storage_serialization_format_))
.WillOnce(Return(ByMove(std::move(format1_converter))));
.WillRepeatedly(
[](const std::string &) {
return std::make_unique<StrictMock<MockConverter>>();
});
EXPECT_CALL(*converter_factory_, load_serializer(output_format))
.WillOnce(Return(ByMove(std::move(format2_converter))));

Expand All @@ -145,9 +146,11 @@ TEST_F(SequentialReaderTest, read_next_uses_converters_to_convert_serialization_
TEST_F(SequentialReaderTest, open_throws_error_if_converter_plugin_does_not_exist) {
std::string output_format = "rmw2_format";

auto format1_converter = std::make_unique<StrictMock<MockConverter>>();
EXPECT_CALL(*converter_factory_, load_deserializer(storage_serialization_format_))
.WillOnce(Return(ByMove(std::move(format1_converter))));
.WillRepeatedly(
[](const std::string &) {
return std::make_unique<StrictMock<MockConverter>>();
});
EXPECT_CALL(*converter_factory_, load_serializer(output_format))
.WillOnce(Return(ByMove(nullptr)));

Expand Down

0 comments on commit f453fdf

Please sign in to comment.