diff --git a/src/cpp/fastdds/publisher/DataWriterHistory.cpp b/src/cpp/fastdds/publisher/DataWriterHistory.cpp index 963ae6c3c39..844f63c8cfb 100644 --- a/src/cpp/fastdds/publisher/DataWriterHistory.cpp +++ b/src/cpp/fastdds/publisher/DataWriterHistory.cpp @@ -160,6 +160,8 @@ bool DataWriterHistory::prepare_change( if (history_qos_.kind == KEEP_ALL_HISTORY_QOS) { ret = this->mp_writer->try_remove_change(max_blocking_time, lock); + // If change was removed (ret == 1) in KeepAllHistory, it must have been acked + is_acked = ret; } else if (history_qos_.kind == KEEP_LAST_HISTORY_QOS) { diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 4c1ae522a6e..178d9f4473c 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -3476,6 +3476,96 @@ TEST(DDSStatus, entire_history_acked_volatile_unknown_pointer) } } +/*ยก + * Regression Test for 22648: on_unacknowledged_sample_removed callback is called when writer with keep all + * history is used, when the history was full but before max_blocking_time a sample was acknowledged, as is_acked was + * checked before the waiting time, and is not re-checked. This should not happen. + */ +TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call) +{ + auto test_transport = std::make_shared(); + test_transport->drop_data_messages_filter_ = [](eprosima::fastdds::rtps::CDRMessage_t& msg) -> bool + { + static std::vector> delayed_messages; + + uint32_t old_pos = msg.pos; + + // Parse writer ID and sequence number + msg.pos += 2; // flags + msg.pos += 2; // inline QoS + msg.pos += 4; // reader ID + auto writerID = eprosima::fastdds::helpers::cdr_parse_entity_id((char*)&msg.buffer[msg.pos]); + msg.pos += 4; + eprosima::fastdds::rtps::SequenceNumber_t sn; + sn.high = (int32_t)eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]); + msg.pos += 4; + sn.low = eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]); + + // Restore buffer position + msg.pos = old_pos; + + // Delay logic for user endpoints only + if ((writerID.value[3] & 0xC0) == 0) // only user endpoints + { + auto now = std::chrono::steady_clock::now(); + auto it = std::find_if(delayed_messages.begin(), delayed_messages.end(), + [&sn](const auto& pair) + { + return pair.first == sn; + }); + + if (it == delayed_messages.end()) + { + // If the sequence number is encountered for the first time, start the delay + delayed_messages.emplace_back(sn, now + std::chrono::milliseconds(750)); // Add delay + return true; // Start dropping this message + } + else if (now < it->second) + { + // If the delay period has not elapsed, keep dropping the message + return true; + } + else + { + // Once the delay has elapsed, allow the message to proceed + delayed_messages.erase(it); + } + } + return false; // Allow message to proceed + }; + + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, eprosima::fastdds::dds::Duration_t (200, 0)) + .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) + .resource_limits_max_instances(1) + .resource_limits_max_samples(1) + .resource_limits_max_samples_per_instance(1) + .disable_builtin_transport() + .add_user_transport_to_pparams(test_transport) + .init(); + ASSERT_TRUE(writer.isInitialized()); + + reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .init(); + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_helloworld_data_generator(2); + + for (auto sample : data) + { + writer.send_sample(sample); + } + + EXPECT_EQ(writer.times_unack_sample_removed(), 0u); +} + /*! * Test that checks with a writer of each type that having the same listener attached, the notified writer in the * callback is the corresponding writer that has removed a sample unacknowledged.