Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[22648] Unacknowledged sample removed in KeepAll mode (backport #5618) #5624

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ bool DataWriterHistory::prepare_change(
if (history_qos_.kind == KEEP_ALL_HISTORY_QOS)
{
ret = this->mp_writer->try_remove_change(max_blocking_time, lock);
// If change was removed (ret == 1) in KeepAllHistory, it must have been acked
is_acked = ret;
}
else if (history_qos_.kind == KEEP_LAST_HISTORY_QOS)
{
Expand Down
90 changes: 90 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3476,6 +3476,96 @@ TEST(DDSStatus, entire_history_acked_volatile_unknown_pointer)
}
}

/*¡
* Regression Test for 22648: on_unacknowledged_sample_removed callback is called when writer with keep all
* history is used, when the history was full but before max_blocking_time a sample was acknowledged, as is_acked was
* checked before the waiting time, and is not re-checked. This should not happen.
*/
TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call)
{
auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
test_transport->drop_data_messages_filter_ = [](eprosima::fastdds::rtps::CDRMessage_t& msg) -> bool
{
static std::vector<std::pair<eprosima::fastdds::rtps::SequenceNumber_t,
std::chrono::steady_clock::time_point>> delayed_messages;

uint32_t old_pos = msg.pos;

// Parse writer ID and sequence number
msg.pos += 2; // flags
msg.pos += 2; // inline QoS
msg.pos += 4; // reader ID
auto writerID = eprosima::fastdds::helpers::cdr_parse_entity_id((char*)&msg.buffer[msg.pos]);
msg.pos += 4;
eprosima::fastdds::rtps::SequenceNumber_t sn;
sn.high = (int32_t)eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);
msg.pos += 4;
sn.low = eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);

// Restore buffer position
msg.pos = old_pos;

// Delay logic for user endpoints only
if ((writerID.value[3] & 0xC0) == 0) // only user endpoints
{
auto now = std::chrono::steady_clock::now();
auto it = std::find_if(delayed_messages.begin(), delayed_messages.end(),
[&sn](const auto& pair)
{
return pair.first == sn;
});

if (it == delayed_messages.end())
{
// If the sequence number is encountered for the first time, start the delay
delayed_messages.emplace_back(sn, now + std::chrono::milliseconds(750)); // Add delay
return true; // Start dropping this message
}
else if (now < it->second)
{
// If the delay period has not elapsed, keep dropping the message
return true;
}
else
{
// Once the delay has elapsed, allow the message to proceed
delayed_messages.erase(it);
}
}
return false; // Allow message to proceed
};

PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, eprosima::fastdds::dds::Duration_t (200, 0))
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
.resource_limits_max_instances(1)
.resource_limits_max_samples(1)
.resource_limits_max_samples_per_instance(1)
.disable_builtin_transport()
.add_user_transport_to_pparams(test_transport)
.init();
ASSERT_TRUE(writer.isInitialized());

reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.init();
ASSERT_TRUE(reader.isInitialized());

// Wait for discovery
writer.wait_discovery();
reader.wait_discovery();

auto data = default_helloworld_data_generator(2);

for (auto sample : data)
{
writer.send_sample(sample);
}

EXPECT_EQ(writer.times_unack_sample_removed(), 0u);
}

/*!
* Test that checks with a writer of each type that having the same listener attached, the notified writer in the
* callback is the corresponding writer that has removed a sample unacknowledged.
Expand Down
Loading