Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ TEST_F(Example_1_Presentation_0_HelloRawPubSub_Udp, main)
const TimePoint msg_deadline = arg.approx_now + 1s;
constexpr cetl::span<const char> message{"Hello, World!"};
//
const std::array<const cetl::span<const cetl::byte>, 1> payload_fragments{
cetl::span<const cetl::byte>{reinterpret_cast<const cetl::byte*>(message.data()), // NOLINT
const std::array<const PayloadFragment, 1> payload_fragments{
PayloadFragment{reinterpret_cast<const cetl::byte*>(message.data()), // NOLINT
message.size()}};
EXPECT_THAT(raw_publisher.publish(msg_deadline, payload_fragments), testing::Eq(cetl::nullopt));
});
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/platform/node_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,16 @@ struct NodeHelpers
TxSession& tx_session,
const TxMetadata& metadata)
{
using traits = typename T::_traits_;
using traits = typename T::_traits_;
using PayloadFragment = libcyphal::transport::PayloadFragment;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

┬─┬ノ(º_ºノ)

Suggested change
using PayloadFragment = libcyphal::transport::PayloadFragment;
using libcyphal::transport::PayloadFragment;


std::array<std::uint8_t, traits::SerializationBufferSizeBytes> buffer{};

const auto data_size = serialize(value, buffer).value();

// NOLINTNEXTLINE
const cetl::span<const cetl::byte> fragment{reinterpret_cast<cetl::byte*>(buffer.data()), data_size};
const std::array<const cetl::span<const cetl::byte>, 1> payload{fragment};
const PayloadFragment fragment{reinterpret_cast<cetl::byte*>(buffer.data()), data_size};
const std::array<const PayloadFragment, 1> payload{fragment};

return tx_session.send(metadata, payload);
}
Expand Down
8 changes: 4 additions & 4 deletions include/libcyphal/presentation/common_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ static auto tryPerformOnSerialized(const Message& message,
return result_size.error();
}

const cetl::span<const cetl::byte> data_span{buffer.data(), result_size.value()};
const std::array<const cetl::span<const cetl::byte>, 1> fragments{data_span};
const transport::PayloadFragment data_span{buffer.data(), result_size.value()};
const std::array<const transport::PayloadFragment, 1> fragments{data_span};

return std::forward<Action>(action)(fragments);
}
Expand Down Expand Up @@ -135,8 +135,8 @@ static auto tryPerformOnSerialized(const Message& message,
return result_size.error();
}

const cetl::span<const cetl::byte> data_span{buffer.get(), result_size.value()};
const std::array<const cetl::span<const cetl::byte>, 1> fragments{data_span};
const transport::PayloadFragment data_span{buffer.get(), result_size.value()};
const std::array<const transport::PayloadFragment, 1> fragments{data_span};

return std::forward<Action>(action)(fragments);
}
Expand Down
40 changes: 24 additions & 16 deletions include/libcyphal/transport/can/can_transport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,8 @@ class TransportImpl final : private TransportDelegate, public ICanTransport
return ArgumentError{};
}

// False positive of clang-tidy - we move `media_array` to the `transport` instance, so can't make it const.
// NOLINTNEXTLINE(misc-const-correctness)
MediaArray media_array = makeMediaArray(memory, media_count, media, tx_capacity);
if (media_array.size() != media_count)
MediaArray media_array{media_count, &memory};
if (!makeMediaArray(media_array, media_count, media, tx_capacity))
{
return MemoryError{};
}
Expand Down Expand Up @@ -514,18 +512,23 @@ class TransportImpl final : private TransportDelegate, public ICanTransport
return tryHandleTransientFailure<Report>(std::move(*failure), media.index(), canardInstance());
}

CETL_NODISCARD static MediaArray makeMediaArray(cetl::pmr::memory_resource& memory,
const std::size_t media_count,
const cetl::span<IMedia*> media_interfaces,
const std::size_t tx_capacity)
CETL_NODISCARD static bool makeMediaArray(MediaArray& media_array,
const std::size_t media_count,
const cetl::span<IMedia*> media_interfaces,
const std::size_t tx_capacity)
{
MediaArray media_array{media_count, &memory};

// Reserve the space for the whole array (to avoid reallocations).
// Capacity will be less than requested in case of out of memory.
media_array.reserve(media_count);
if (media_array.capacity() >= media_count)
#if defined(__cpp_exceptions)
try
{
#endif
// Reserve the space for the whole array (to avoid reallocations).
// Capacity will be less than requested in case of out of memory.
media_array.reserve(media_count);
if (media_array.capacity() < media_count)
{
return false;
}

std::size_t index = 0;
for (IMedia* const media_interface : media_interfaces)
{
Expand All @@ -538,9 +541,14 @@ class TransportImpl final : private TransportDelegate, public ICanTransport
}
CETL_DEBUG_ASSERT(index == media_count, "");
CETL_DEBUG_ASSERT(media_array.size() == media_count, "");
}
return true;

return media_array;
#if defined(__cpp_exceptions)
} catch (const std::bad_alloc&)
{
return false;
}
#endif
}

static void flushCanardTxQueue(CanardTxQueue& canard_tx_queue, const CanardInstance& canard_instance)
Expand Down
26 changes: 22 additions & 4 deletions include/libcyphal/transport/can/delegate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ class CanardMemory final : public ScatteredBuffer::IStorage
return bytes_to_copy;
}

void observeFragments(ScatteredBuffer::IFragmentsObserver& observer) const override
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be cool if we had something like IInvokable<...> defined in CETL and implemented by cetl::function, to enhance composability. The user could then either use a custom implementation for this interface, or use a cetl function.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internally at CETL we already have something similar, see

namespace detail
{
template <typename Result, typename... Args>
class function_handler : public rtti_helper<function_handler_typeid_t>
{
public:
    virtual Result operator()(Args...) = 0;

};  // function_handler
...

but it has its issues which sooner or later we might need try to resolve. I'm talking about Args... args arguments which currently could be only copied but not moved (but I can't make it forward-able Args&&... args) - not sure yet how to address it, so it's a limitation currently).

{
if ((buffer_ != nullptr) && (payload_size_ > 0))
{
observer.onNext({buffer_, payload_size_});
}
}

private:
// MARK: Data members:

Expand Down Expand Up @@ -471,13 +479,23 @@ class TransportDelegate
// Now we know that we have at least one active port,
// so we need preallocate temp memory for the total number of active ports.
//
filters.reserve(total_active_ports);
if (filters.capacity() < total_active_ports)
#if defined(__cpp_exceptions)
try
{
#endif
filters.reserve(total_active_ports);
if (filters.capacity() < total_active_ports)
{
// This is out of memory situation.
return false;
}

#if defined(__cpp_exceptions)
} catch (const std::bad_alloc&)
{
// This is out of memory situation.
return false;
}

#endif
// `ports_count` counting is just for the sake of debug verification.
std::size_t ports_count = 0;

Expand Down
8 changes: 3 additions & 5 deletions include/libcyphal/transport/contiguous_payload.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,11 @@ class ContiguousPayload final
, payload_size_{0}
, allocated_buffer_{nullptr}
{
using Fragment = cetl::span<const cetl::byte>;

// Count fragments skipping empty ones. Also keep tracking of the total payload size
// and pointer to the last non-empty fragment (which will be in use for the optimization).
//
const auto total_non_empty_fragments =
std::count_if(payload_fragments.begin(), payload_fragments.end(), [this](const Fragment frag) {
std::count_if(payload_fragments.begin(), payload_fragments.end(), [this](const PayloadFragment frag) {
if (frag.empty())
{
return false;
Expand All @@ -68,9 +66,9 @@ class ContiguousPayload final
if (cetl::byte* const buffer = allocated_buffer_)
{
std::size_t offset = 0;
for (const Fragment frag : payload_fragments)
for (const PayloadFragment frag : payload_fragments)
{
// Next nolint is unavoidable: we need offset from the beginning of the buffer.
// Next nolint is unavoidable: we need to offset from the beginning of the buffer.
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
(void) std::memmove(&buffer[offset], frag.data(), frag.size());
offset += frag.size();
Expand Down
4 changes: 3 additions & 1 deletion include/libcyphal/transport/media_payload.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#ifndef LIBCYPHAL_TRANSPORT_MEDIA_PAYLOAD_HPP_INCLUDED
#define LIBCYPHAL_TRANSPORT_MEDIA_PAYLOAD_HPP_INCLUDED

#include "types.hpp"

#include <cetl/pf17/cetlpf.hpp>
#include <cetl/pf20/cetlpf.hpp>

Expand Down Expand Up @@ -125,7 +127,7 @@ class MediaPayload final
///
/// Returns an empty (`{nullptr, 0}`) span if the payload is moved, released or reset.
///
cetl::span<const cetl::byte> getSpan() const noexcept
PayloadFragment getSpan() const noexcept
{
return {ownership_.data, ownership_.size};
}
Expand Down
13 changes: 13 additions & 0 deletions include/libcyphal/transport/msg_sessions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define LIBCYPHAL_TRANSPORT_MSG_SESSIONS_HPP_INCLUDED

#include "errors.hpp"
#include "scattered_buffer.hpp"
#include "session.hpp"
#include "types.hpp"

Expand Down Expand Up @@ -34,6 +35,18 @@ struct MessageTxParams final
PortId subject_id{};
};

struct MessageRxMetadata final
{
TransferRxMetadata rx_meta{};
cetl::optional<NodeId> publisher_node_id;
};

struct MessageRxTransfer final
{
MessageRxMetadata metadata{};
ScatteredBuffer payload;
};

/// @brief Defines an abstract interface of a transport layer receive session for message subscription.
///
/// Use transport's `makeMessageRxSession` factory function to create an instance of this interface.
Expand Down
39 changes: 39 additions & 0 deletions include/libcyphal/transport/scattered_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define LIBCYPHAL_TRANSPORT_SCATTERED_BUFFER_HPP_INCLUDED

#include "libcyphal/config.hpp"
#include "types.hpp"

#include <cetl/pf17/cetlpf.hpp>
#include <cetl/rtti.hpp>
Expand All @@ -33,6 +34,26 @@ class ScatteredBuffer final
///
static constexpr std::size_t StorageVariantFootprint = config::Transport::ScatteredBuffer_StorageVariantFootprint();

/// @brief Defines interface for observing internal fragments of the scattered buffer.
///
class IFragmentsObserver
{
public:
IFragmentsObserver(const IFragmentsObserver&) = delete;
IFragmentsObserver& operator=(const IFragmentsObserver&) = delete;
IFragmentsObserver& operator=(IFragmentsObserver&&) noexcept = delete;
IFragmentsObserver(IFragmentsObserver&&) noexcept = delete;

/// @brief Notifies the observer about the next fragment of the scattered buffer.
///
virtual void onNext(const PayloadFragment fragment) = 0;

protected:
IFragmentsObserver() = default;
~IFragmentsObserver() = default;

}; // IFragmentsObserver

/// @brief Defines storage interface for the scattered buffer.
///
/// @see ScatteredBuffer::ScatteredBuffer(AnyStorage&& any_storage)
Expand Down Expand Up @@ -72,6 +93,12 @@ class ScatteredBuffer final
cetl::byte* const destination,
const std::size_t length_bytes) const = 0;

/// @brief Reports the internal fragments of the storage to the specified observer.
///
/// @param observer The observer will be called (by `onNext` method) for each fragment of the storage.
///
virtual void observeFragments(IFragmentsObserver& observer) const = 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't actually an "observer" is it? Typically I reserve that term for things that are called back asynchronously and over time. This is more of a visitor pattern. It's basically foreach_fragment from what I can tell. If so; if this is similar to foreach, we should just use an std::function<void(PayloadFragment)> argument to make this consistent and easier to use.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surely you mean cetl::function?

Copy link
Copy Markdown
Contributor Author

@serges147 serges147 Mar 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thirtytwobits Scott, I'll rename it to forEachFragment, but about making it cetl::function I have concerns. Surely, it was my initial thought to make it as cetl::function, but then I had second thoughts:

  • cetl::function is not very cheap. my performance measurements showed that it's relatively cheap to call it, but creation is not so:
    • unbounded_variant behind the scene needs to be created (with several type-erasing "handler" function pointers being initialized)
    • cetl::rtti again behind the scene will be used to obtain internal callable interface
  • normally, ScatteredBuffer will hold just couple of fragments; if it would be a lot of fragments then mentioned above creation overhead probably won't be noticeable, but in case of just a few fragments (1 for CAN, and N = data_size / UDP_MTU which is also like 1 or 2, depending on data_size of cause) it feels like a waste
  • having IFragmentsObserver (or if you would like IFragmentVisitor) makes cost is just one virtual call to already existing visitor object.

What also concerns me in general is that in a lot of places of libcyphal we go extra mile for trying to achieve "zero-copy" of transferred data, or limit dynamic allocation, BUT at the same time we do a lot of copying and moving of various implicit unbounded_variants here and there, and IMO just move pressure from heap to stack. I know, heap fragmentation and etc., but still food for thoughts in my opinion.

So, what do you think about still having the interface after all?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. let's so some renaming and you can keep the interface.
As for zero-copy in libcyphal; the use case we care most about is the ability to keep data in specific memory rather than requiring it to be moved into general program ram for libcyphal to use it. For example, when receiving frames that are hardware-managed buffers we want to keep the memory in these buffers until/unless the program specifically uses it to avoid the cost of moving for messages that are discarded (this allows low-cost, application-level message rate decimation). Another example is keeping all message data in a section of memory with a different cache policy than the program RAM. etc. As the project evolves we'll probably see snooping optimizations enabled which allows applications to filter messages based on an outer-data type's values without requiring full deserialization. All of this requires an architecture that ensures message memory is treated differently then general program RAM, heap, stack, or otherwise.


// MARK: RTTI

static constexpr cetl::type_id _get_type_id_() noexcept
Expand Down Expand Up @@ -201,6 +228,18 @@ class ScatteredBuffer final
return storage_->copy(offset_bytes, static_cast<cetl::byte*>(destination), length_bytes);
}

/// @brief Reports the internal fragments of the buffer to the specified observer.
///
/// @param observer The observer will be called (by `onNext` method) for each fragment of the buffer.
///
void observeFragments(IFragmentsObserver& observer) const
{
if (const auto* const storage = storage_)
{
storage->observeFragments(observer);
}
}

private:
cetl::unbounded_variant<StorageVariantFootprint, false, true> storage_variant_;
const IStorage* storage_;
Expand Down
19 changes: 19 additions & 0 deletions include/libcyphal/transport/svc_sessions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define LIBCYPHAL_TRANSPORT_SVC_SESSION_HPP_INCLUDED

#include "errors.hpp"
#include "scattered_buffer.hpp"
#include "session.hpp"
#include "types.hpp"

Expand Down Expand Up @@ -46,6 +47,24 @@ struct ResponseTxParams final
PortId service_id{};
};

struct ServiceTxMetadata final
{
TransferTxMetadata tx_meta{};
NodeId remote_node_id{};
};

struct ServiceRxMetadata final
{
TransferRxMetadata rx_meta{};
NodeId remote_node_id{};
};

struct ServiceRxTransfer final
{
ServiceRxMetadata metadata{};
ScatteredBuffer payload;
};

/// @brief Defines an abstract interface of a transport layer receive session for service.
///
/// @see IRxSession, ISession
Expand Down
7 changes: 7 additions & 0 deletions include/libcyphal/transport/transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ namespace libcyphal
namespace transport
{

struct ProtocolParams final
{
TransferId transfer_id_modulo{};
std::size_t mtu_bytes{};
NodeId max_nodes{};
};

/// @brief Interface for a transport layer.
///
class ITransport
Expand Down
Loading
Loading