Skip to content

Commit

Permalink
[core] ported CMsgPublisher/Subscriber to v6 (#1944)
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky authored Jan 27, 2025
1 parent f36ac33 commit fc0b30f
Show file tree
Hide file tree
Showing 53 changed files with 258 additions and 905 deletions.
4 changes: 2 additions & 2 deletions app/mon/mon_cli/src/ecal_mon_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ void ProcEcho(const std::string& topic_name, int msg_count)
eCAL::string::CSubscriber<std::string> sub(topic_name);
std::atomic<int> cnt(msg_count);
auto msg_cb = [&cnt](const std::string& msg_) { if (cnt != 0) { std::cout << msg_ << std::endl; if (cnt > 0) cnt--; } };
sub.AddReceiveCallback(std::bind(msg_cb, std::placeholders::_2));
sub.SetReceiveCallback(std::bind(msg_cb, std::placeholders::_2));

while(eCAL::Ok() && (cnt != 0))
{
Expand All @@ -357,7 +357,7 @@ void ProcProto(const std::string& topic_name, int msg_count)
eCAL::protobuf::CDynamicSubscriber sub(topic_name);
std::atomic<int> cnt(msg_count);
auto msg_cb = [&cnt](const std::shared_ptr<google::protobuf::Message>& msg_) { if (cnt != 0) { std::cout << msg_->DebugString() << std::endl; if (cnt > 0) cnt--; } };
sub.AddReceiveCallback(std::bind(msg_cb, std::placeholders::_2));
sub.SetReceiveCallback(std::bind(msg_cb, std::placeholders::_2));

// enter main loop
while(eCAL::Ok() && (cnt != 0))
Expand Down
16 changes: 8 additions & 8 deletions app/mon/mon_plugins/capnproto_reflection/src/plugin_widget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ PluginWidget::PluginWidget(const QString& topic_name, const QString& topic_type,
ui_.publish_timestamp_warning_label->setVisible(false);

// Add eCAL Callbacks
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.AddErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));

// Button connections
connect(ui_.expand_button, &QPushButton::clicked, [this]() { tree_view_->expandAll(); });
Expand Down Expand Up @@ -102,8 +102,8 @@ PluginWidget::~PluginWidget() noexcept
qDebug().nospace() << "[" << metaObject()->className() << "]: Deleting Widget for topic " << topic_name_;
#endif // NDEBUG

subscriber_.RemReceiveCallback();
//subscriber_.RemErrorCallback();
subscriber_.RemoveReceiveCallback();
//subscriber_.RemoveErrorCallback();

{
std::lock_guard<std::mutex> lock(capnproto_message_mutex_);
Expand Down Expand Up @@ -322,14 +322,14 @@ void PluginWidget::onUpdate()
void PluginWidget::onResume()
{
// Add eCAL Callbacks
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.AddErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
}

void PluginWidget::onPause()
{
subscriber_.RemReceiveCallback();
subscriber_.RemErrorCallback();
subscriber_.RemoveReceiveCallback();
subscriber_.RemoveErrorCallback();
}

QWidget* PluginWidget::getWidget()
Expand Down
16 changes: 8 additions & 8 deletions app/mon/mon_plugins/protobuf_reflection/src/plugin_widget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ PluginWidget::PluginWidget(const QString& topic_name, const QString& topic_type,
ui_.publish_timestamp_warning_label->setVisible(false);

// Add eCAL Callbacks
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.AddErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));

// Button connections
connect(ui_.expand_button, &QPushButton::clicked, [this]() { tree_view_->expandAll(); });
Expand Down Expand Up @@ -105,8 +105,8 @@ PluginWidget::~PluginWidget()
qDebug().nospace() << "[" << PluginWidget::metaObject()->className() << "]: Deleting Widget for topic " << topic_name_;
#endif // NDEBUG

subscriber_.RemReceiveCallback();
subscriber_.RemErrorCallback();
subscriber_.RemoveReceiveCallback();
subscriber_.RemoveErrorCallback();

{
std::lock_guard<std::mutex> lock(proto_message_mutex_);
Expand Down Expand Up @@ -319,14 +319,14 @@ void PluginWidget::onUpdate()
void PluginWidget::onResume()
{
// Add eCAL Callbacks
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.AddErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
}

void PluginWidget::onPause()
{
subscriber_.RemReceiveCallback();
subscriber_.RemErrorCallback();
subscriber_.RemoveReceiveCallback();
subscriber_.RemoveErrorCallback();
}

QWidget* PluginWidget::getWidget()
Expand Down
8 changes: 4 additions & 4 deletions app/mon/mon_plugins/signals_plotting/src/plugin_widget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ PluginWidget::PluginWidget(const QString& topic_name, const QString& topic_type,
ui_.publish_timestamp_warning_label->setVisible(false);

// Add eCAL Callbacks
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.AddErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));

// Button connections
connect(ui_.expand_button, &QPushButton::clicked, [this]() { tree_view_->expandAll(); });
Expand Down Expand Up @@ -127,8 +127,8 @@ PluginWidget::~PluginWidget()
qDebug().nospace() << "[" << PluginWidget::metaObject()->className() << "]: Deleting Widget for topic " << topic_name_;
#endif // NDEBUG

subscriber_.RemReceiveCallback();
subscriber_.RemErrorCallback();
subscriber_.RemoveReceiveCallback();
subscriber_.RemoveErrorCallback();

{
std::lock_guard<std::mutex> lock(proto_message_mutex_);
Expand Down
8 changes: 4 additions & 4 deletions app/mon/mon_plugins/string_reflection/src/plugin_widget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ PluginWidget::PluginWidget(const QString& topic_name, const QString&, QWidget* p
ui_.content_layout->addWidget(text_edit_);

// Connect the eCAL Subscriber
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::ecalMessageReceivedCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::ecalMessageReceivedCallback, this, std::placeholders::_2, std::placeholders::_3));
}

PluginWidget::~PluginWidget()
{
subscriber_.RemReceiveCallback();
subscriber_.RemoveReceiveCallback();
}

void PluginWidget::ecalMessageReceivedCallback(const std::string& message, long long publish_timestamp_usecs)
Expand Down Expand Up @@ -115,12 +115,12 @@ void PluginWidget::onUpdate()

void PluginWidget::onResume()
{
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::ecalMessageReceivedCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::ecalMessageReceivedCallback, this, std::placeholders::_2, std::placeholders::_3));
}

void PluginWidget::onPause()
{
subscriber_.RemReceiveCallback();
subscriber_.RemoveReceiveCallback();
}

void PluginWidget::updateStringMessageView()
Expand Down
2 changes: 1 addition & 1 deletion app/mon/mon_tui/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ int main(int argc, char** argv)
config.logging.receiver.enable = true;

auto status = eCAL::Initialize(config, "eCALMon TUI", eCAL::Init::Default | eCAL::Init::Monitoring);
if (status == -1) std::cerr << "Failed to init" << std::endl;
if (status == false) std::cerr << "Failed to init" << std::endl;
eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "Running");

TUI::Start(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class ProtoMessageVisualizationViewModel : public MessageVisualizationViewModel
: subscriber{topic}
{
using namespace std::placeholders;
subscriber.AddReceiveCallback(std::bind(&ProtoMessageVisualizationViewModel::OnMessage, this, _2, _3));
subscriber.SetReceiveCallback(std::bind(&ProtoMessageVisualizationViewModel::OnMessage, this, _2, _3));
}

ProtectedMessage message() const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class StringMessageVisualizationViewModel : public MessageVisualizationViewModel
: subscriber{topic}
{
using namespace std::placeholders;
subscriber.AddReceiveCallback(std::bind(&StringMessageVisualizationViewModel::OnMessage, this, _2, _3));
subscriber.SetReceiveCallback(std::bind(&StringMessageVisualizationViewModel::OnMessage, this, _2, _3));
}

std::string message() const
Expand Down
7 changes: 2 additions & 5 deletions app/play/play_core/src/measurement_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,6 @@ void MeasurementContainer::CreatePublishers(const std::map<std::string, std::str
void MeasurementContainer::DeInitializePublishers()
{
// Clear the publisher map
for (auto& publisher_info : publisher_map_)
{
publisher_info.second.publisher_.Destroy();
}
publisher_map_.clear();

// Remove pointers to publishers from all frames
Expand Down Expand Up @@ -219,7 +215,8 @@ bool MeasurementContainer::PublishFrame(long long index)
{
timestamp_usecs = std::chrono::duration_cast<std::chrono::microseconds>(frame_table_[index].send_timestamp_.time_since_epoch()).count();
}
frame_table_[index].publisher_info_->publisher_.SetID(frame_table_[index].send_id_);
// this is not supported by the eCAL v6 API
//frame_table_[index].publisher_info_->publisher_.SetID(frame_table_[index].send_id_);
frame_table_[index].publisher_info_->publisher_.Send(send_buffer_, data_size, timestamp_usecs);
frame_table_[index].publisher_info_->message_counter_++;
return true;
Expand Down
4 changes: 2 additions & 2 deletions app/play/play_core/src/measurement_container.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <memory>

#include <ecal/ecal.h>
#include <ecal/v5/ecal_publisher.h>
#include <ecal/pubsub/publisher.h>
#include <ecalhdf5/eh5_meas.h>

#include "continuity_report.h"
Expand Down Expand Up @@ -91,7 +91,7 @@ class MeasurementContainer
private:
struct PublisherInfo
{
eCAL::v5::CPublisher publisher_;
eCAL::CPublisher publisher_;
long long message_counter_;

PublisherInfo(const std::string& topic_name, const eCAL::SDataTypeInformation& info_)
Expand Down
3 changes: 3 additions & 0 deletions app/rec/rec_server_cli/src/ecal_rec_server_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ std::unique_ptr<eCAL::rec_cli::command::Record> record_command;
int main(int argc, char** argv)
{
#ifdef WIN32
(void)argc; // suppress unused warning
(void)argv; // suppress unused warning

EcalUtils::WinCpChanger win_cp_changer(CP_UTF8); // The WinCpChanger will set the Codepage back to the original, once destroyed
#endif // WIN32

Expand Down
6 changes: 3 additions & 3 deletions app/sys/sys_gui/src/widgets/mmawidget/mma_host_item.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ MmaHostItem::MmaHostItem(QTreeWidget* tree_widget, const QString& hostname)
// Create eCAL Subscriber
mma_subscriber = std::unique_ptr<eCAL::protobuf::CSubscriber<eCAL::pb::mma::State>>
(new eCAL::protobuf::CSubscriber<eCAL::pb::mma::State>("machine_state_" + hostname_.toStdString()));
mma_subscriber->AddReceiveCallback(std::bind(&MmaHostItem::mmaReceivedCallback, this, std::placeholders::_1, std::placeholders::_2));
mma_subscriber->SetReceiveCallback(std::bind(&MmaHostItem::mmaReceivedCallback, this, std::placeholders::_1, std::placeholders::_2));

// Register custom Type in order to directly pass the monitoring state
qRegisterMetaType<eCAL::pb::mma::State>("eCAL::pb::mma::State");
Expand Down Expand Up @@ -227,10 +227,10 @@ void MmaHostItem::disable()
setEnabled(false);
}

void MmaHostItem::mmaReceivedCallback(const char* /*topic_name*/, const eCAL::pb::mma::State& state)
void MmaHostItem::mmaReceivedCallback(const eCAL::STopicId& /*topic_id_*/, const eCAL::pb::mma::State& state_)
{
// only emit the signal in order to make use of the Qt event loop
emit mmaReceivedSignal(state);
emit mmaReceivedSignal(state_);
}

void MmaHostItem::machineStateChanged(eCAL::pb::mma::State state)
Expand Down
2 changes: 1 addition & 1 deletion app/sys/sys_gui/src/widgets/mmawidget/mma_host_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public slots:

QTimer* deactivation_timer;

void mmaReceivedCallback(const char* topic_name, const eCAL::pb::mma::State& state);
void mmaReceivedCallback(const eCAL::STopicId& topic_id_, const eCAL::pb::mma::State& state_);

QString normalizedDataAsString(unsigned long long bytes);

Expand Down
18 changes: 8 additions & 10 deletions contrib/ecaltime/simtime/src/ecal_time_simtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,20 @@ eCAL::CSimTime::CSimTime() :
bool eCAL::CSimTime::initialize()
{
std::unique_lock<std::mutex> lk(initialize_mutex);
if (!is_initialized) {
if (!is_initialized)
{
//eCAL::Initialize("ecal_sim_time_listener", eCAL::Init::Subscriber);
// this has to be done by the parent process
// needs to be fixed with an improved reference counting
// in eCAL::Initialize ..

if (sim_time_subscriber.Create("__sim_time__")) {
sim_time_subscriber.AddReceiveCallback(std::bind(&eCAL::CSimTime::onSimTimeMessage, this, std::placeholders::_2));
is_initialized = true;
}
else {
is_initialized = false;
}
return is_initialized;
sim_time_subscriber = std::make_unique<eCAL::protobuf::CSubscriber<eCAL::pb::SimTime>>("__sim_time__");
sim_time_subscriber->SetReceiveCallback(std::bind(&eCAL::CSimTime::onSimTimeMessage, this, std::placeholders::_2));
is_initialized = true;
return true;
}
else {
else
{
return false;
}
}
Expand Down
2 changes: 1 addition & 1 deletion contrib/ecaltime/simtime/src/ecal_time_simtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ namespace eCAL
bool first_message_received; /**< Whether we received at least one Message (used for the status message)*/
eCAL::pb::SimTime::eState play_state; /**< Current state (used for the status message)*/

eCAL::protobuf::CSubscriber<eCAL::pb::SimTime> sim_time_subscriber; /**< Subscriber for getting simulation timestamps */
std::unique_ptr<eCAL::protobuf::CSubscriber<eCAL::pb::SimTime>> sim_time_subscriber; /**< Subscriber for getting simulation timestamps */

std::mutex time_mutex; /**< Mutex for computing the current simulation time */
long long last_measurement_time; /**< Last received simulation time */
Expand Down
2 changes: 1 addition & 1 deletion contrib/mma/src/mma_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ int main(int argc, char** argv)
std::cout << app_version_header << std::endl << ecal_version_header << std::endl << std::endl;

// initialize eCAL API
if (eCAL::Initialize(MMA_APPLICATION_NAME, eCAL::Init::Publisher) < 0)
if (eCAL::Initialize(MMA_APPLICATION_NAME, eCAL::Init::Publisher))
{
std::cout << "eCAL initialization failed !";
return 1;
Expand Down
Loading

0 comments on commit fc0b30f

Please sign in to comment.