Skip to content

Commit

Permalink
Merge pull request #480 from ZeroCM/query-drops-api
Browse files Browse the repository at this point in the history
Added zcm_query_drops() api to get the drop counter if a transport su…
  • Loading branch information
jbendes authored Mar 27, 2024
2 parents 21af32d + e054bcc commit a120ba6
Show file tree
Hide file tree
Showing 24 changed files with 263 additions and 23 deletions.
17 changes: 10 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ COPY zcm/ ./zcm
ENV PATH ${PATH}:/root/.local/bin:$ZCM_HOME/deps/julia/bin
ENV NVM_DIR /root/.nvm

RUN bash -c 'export JAVA_HOME=$(readlink -f /usr/bin/javac | sed "s:/bin/javac::") && \
[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" && \
. $ZCM_HOME/deps/cxxtest/.env && \
./waf distclean configure --use-all --use-dev && \
./waf build && \
./waf install && \
./waf build_examples'
RUN <<EOF
#!/bin/bash
export JAVA_HOME=$(readlink -f /usr/bin/javac | sed "s:/bin/javac::")
[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh"
. $ZCM_HOME/deps/cxxtest/.env
./waf distclean configure --use-all --use-dev
./waf build
./waf install
./waf build_examples
EOF

CMD bash -c 'export JAVA_HOME=$(readlink -f /usr/bin/javac | sed "s:/bin/javac::") && \
[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" && \
Expand Down
134 changes: 134 additions & 0 deletions bench/ipcshm/test_dropping.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#define _GNU_SOURCE
#include <zcm/zcm.h>
#include <zcm/url.h>
#include <zcm/transport_registrar.h>
#include <pthread.h>
#include <sched.h>
#include "util.h"
#include "zcmtypes/blob_t.h"

#define VERBOSE 1

#define URL_IPCSHM "ipcshm://test_direct?mtu=1000&depth=8"

typedef struct state state_t;
struct state
{
bool running;
zcm_trans_t *trans;
};

typedef struct data data_t;
struct data
{
i64 send_time;
u64 seq;
};
static data_t DATA[1];

static void pin_cpu_core(int core_id)
{
int num_cores = sysconf(_SC_NPROCESSORS_ONLN);
if (core_id < 0 || core_id >= num_cores) {
FAIL("Invalid core id: num_cores=%d", num_cores);
}

cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core_id, &cpuset);

pthread_t current_thread = pthread_self();
pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset);
}

static void *publish_thread_direct(void *usr)
{
pin_cpu_core(2);

static u64 seq = 0;

state_t *st = (state_t*)usr;
zcm_trans_t *trans = st->trans;

while (st->running) {
usleep((i64)100e3); // 10 hz

DATA->send_time = wallclock();
DATA->seq = seq++;

zcm_msg_t msg[1];
msg->utime = 0;
msg->channel = "BLOB";
msg->len = sizeof(data_t);
msg->buf = (u8*)DATA;

int ret = zcm_trans_sendmsg(trans, *msg);
if (ret != ZCM_EOK) {
FAIL("Failed to publish!");
}
}

return NULL;
}

static void *handle_thread_direct(void *usr)
{
pin_cpu_core(3);

state_t *st = (state_t*)usr;
zcm_trans_t *trans = st->trans;
size_t recv_count = 0;

zcm_trans_recvmsg_enable(trans, "BLOB", true);

zcm_msg_t msg[1];
while (1) {
uint64_t drops = 0;
int ret = zcm_trans_query_drops(trans, &drops);
assert(ret == ZCM_EOK);
printf("status | recv: %lu drop: %lu\n", recv_count, drops);

// consume all
while (1) {
int ret = zcm_trans_recvmsg(trans, msg, 10);
if (ret != ZCM_EOK) break;
recv_count++;
}
usleep(1e6); // sleep to stall and drop
}

st->running = false;
return NULL;
}

int main(int argc, char *argv[])
{
zcm_url_t* u = zcm_url_create(URL_IPCSHM);
if (!u) FAIL("Failed to create url");

const char* protocol = zcm_url_protocol(u);

zcm_trans_create_func* creator = zcm_transport_find(protocol);
if (!creator) FAIL("Failed to find transport type by url");

char *errmsg;
zcm_trans_t* trans = creator(u, &errmsg);
if (!trans) FAIL("Failed to create transport: %s", errmsg);

state_t st[1] = {};
st->running = true;
st->trans = trans;

pthread_t pub_thread, hdl_thread;
pthread_create(&pub_thread, NULL, publish_thread_direct, st);
pthread_create(&hdl_thread, NULL, handle_thread_direct, st);

usleep(10e6);
st->running = true;

pthread_join(pub_thread, NULL);
pthread_join(hdl_thread, NULL);

zcm_trans_destroy(trans);
return 1;
}
4 changes: 4 additions & 0 deletions bench/ipcshm/wscript
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ def build(ctx):
ctx.program(target='test_big',
use = 'default zcm benchzcmtypes',
source='test_big.c')

ctx.program(target='test_dropping',
use = 'default zcm benchzcmtypes',
source='test_dropping.c')
12 changes: 11 additions & 1 deletion zcm/blocking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ struct zcm_blocking
int flush(bool block);

int setQueueSize(uint32_t numMsgs, bool block);

int queryDrops(uint64_t *out_drops);
int writeTopology(string name);

private:
Expand Down Expand Up @@ -612,6 +612,11 @@ int zcm_blocking_t::setQueueSize(uint32_t numMsgs, bool block)
return ZCM_EOK;
}

int zcm_blocking_t::queryDrops(uint64_t *out_drops)
{
return zcm_trans_query_drops(zt, out_drops);
}

void zcm_blocking_t::sendThreadFunc()
{
// Name the send thread
Expand Down Expand Up @@ -930,6 +935,11 @@ void zcm_blocking_set_queue_size(zcm_blocking_t* zcm, uint32_t sz)
zcm->setQueueSize(sz, true);
}

int zcm_blocking_query_drops(zcm_blocking_t *zcm, uint64_t *out_drops)
{
return zcm->queryDrops(out_drops);
}

int zcm_blocking_write_topology(zcm_blocking_t* zcm, const char* name)
{
#ifdef TRACK_TRAFFIC_TOPOLOGY
Expand Down
1 change: 1 addition & 0 deletions zcm/blocking.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ void zcm_blocking_resume(zcm_blocking_t* zcm);
int zcm_blocking_handle(zcm_blocking_t* zcm);
int zcm_blocking_handle_nonblock(zcm_blocking_t* zcm);
void zcm_blocking_set_queue_size(zcm_blocking_t* zcm, uint32_t numMsgs);
int zcm_blocking_query_drops(zcm_blocking_t *zcm, uint64_t *out_drops);

int zcm_blocking_write_topology(zcm_blocking_t* zcm, const char* name);

Expand Down
4 changes: 4 additions & 0 deletions zcm/js/node/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ var ZCM_EAGAIN = libzcm.zcm_retcode_name_to_enum("ZCM_EAGAIN");
var ZCM_ECONNECT = libzcm.zcm_retcode_name_to_enum("ZCM_ECONNECT");
var ZCM_EINTR = libzcm.zcm_retcode_name_to_enum("ZCM_EINTR");
var ZCM_EUNKNOWN = libzcm.zcm_retcode_name_to_enum("ZCM_EUNKNOWN");
var ZCM_EMEMORY = libzcm.zcm_retcode_name_to_enum("ZCM_EMEMORY");
var ZCM_EUNIMPL = libzcm.zcm_retcode_name_to_enum("ZCM_EUNIMPL");
var ZCM_NUM_RETURN_CODES = libzcm.zcm_retcode_name_to_enum("ZCM_NUM_RETURN_CODES");

exports.ZCM_EOK = ZCM_EOK;
Expand All @@ -63,6 +65,8 @@ exports.ZCM_EAGAIN = ZCM_EAGAIN;
exports.ZCM_ECONNECT = ZCM_ECONNECT;
exports.ZCM_EINTR = ZCM_EINTR;
exports.ZCM_EUNKNOWN = ZCM_EUNKNOWN;
exports.ZCM_EMEMORY = ZCM_EMEMORY;
exports.ZCM_EUNIMPL = ZCM_EUNIMPL;
exports.ZCM_NUM_RETURN_CODES = ZCM_NUM_RETURN_CODES;

/**
Expand Down
5 changes: 5 additions & 0 deletions zcm/nonblocking.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ int zcm_nonblocking_unsubscribe(zcm_nonblocking_t* zcm, zcm_sub_t* sub)
return rc;
}

int zcm_nonblocking_query_drops(zcm_nonblocking_t *zcm, uint64_t *out_drops)
{
return zcm_trans_query_drops(zcm->zt, out_drops);
}

static void dispatch_message(zcm_nonblocking_t* zcm, zcm_msg_t* msg)
{
zcm_recv_buf_t rbuf;
Expand Down
2 changes: 2 additions & 0 deletions zcm/nonblocking.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ zcm_sub_t* zcm_nonblocking_subscribe(zcm_nonblocking_t* zcm, const char* channel

int zcm_nonblocking_unsubscribe(zcm_nonblocking_t* zcm, zcm_sub_t* sub);

int zcm_nonblocking_query_drops(zcm_nonblocking_t *zcm, uint64_t *out_drops);

/* Returns 1 if a message was dispatched, and 0 otherwise */
int zcm_nonblocking_handle_nonblock(zcm_nonblocking_t* zcm);

Expand Down
2 changes: 2 additions & 0 deletions zcm/python/zerocm.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ cdef extern from "zcm/zcm.h":
ZCM_ECONNECT,
ZCM_EINTR,
ZCM_EUNKNOWN,
ZCM_EMEMORY,
ZCM_EUNIMPL,
ZCM_NUM_RETURN_CODES
ctypedef struct zcm_t:
pass
Expand Down
22 changes: 22 additions & 0 deletions zcm/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@
* and users should only expect accuracy within a few milliseconds. Users
* should *not* attempt to use this timing mechanism for real-time events.
*
* int query_drops(zcm_trans_t* zt, uint64_t *out_drops);
* --------------------------------------------------------------------
* This method provides the caller access to an internal transport drop counter
* Implementing this is not required. If unimplemented, it should return ZCM_EUNIMPL.
* If implemented, the out-parameter *out_drops should be populated and the
* call should return ZCM_EOK.
*
* int update(zcm_trans_t* zt);
* --------------------------------------------------------------------
* This method is unused (in this mode) and should not be called by the user.
Expand Down Expand Up @@ -151,6 +158,13 @@
* NOTE: This method does NOT have to work concurrently with recvmsg_enable()
* NOTE: The 'timeout' field is ignored
*
* int query_drops(zcm_trans_t* zt, uint64_t *out_drops);
* --------------------------------------------------------------------
* This method provides the caller access to an internal transport drop counter
* Implementing this is not required. If unimplemented, it should return ZCM_EUNIMPL.
* If implemented, the out-parameter *out_drops should be populated and the
* call should return ZCM_EOK.
*
* int update(zcm_trans_t* zt)
* --------------------------------------------------------------------
* This method is called from the zcm_handle_nonblock() function.
Expand Down Expand Up @@ -211,6 +225,7 @@ struct zcm_trans_methods_t
int (*sendmsg)(zcm_trans_t* zt, zcm_msg_t msg);
int (*recvmsg_enable)(zcm_trans_t* zt, const char* channel, bool enable);
int (*recvmsg)(zcm_trans_t* zt, zcm_msg_t* msg, unsigned timeout);
int (*query_drops)(zcm_trans_t *zt, uint64_t *out_drops);
int (*update)(zcm_trans_t* zt);
void (*destroy)(zcm_trans_t* zt);
};
Expand All @@ -228,6 +243,13 @@ static INLINE int zcm_trans_recvmsg_enable(zcm_trans_t* zt, const char* channel,
static INLINE int zcm_trans_recvmsg(zcm_trans_t* zt, zcm_msg_t* msg, unsigned timeout)
{ return zt->vtbl->recvmsg(zt, msg, timeout); }

static INLINE int zcm_trans_query_drops(zcm_trans_t* zt, uint64_t *out_drops)
{
/* Possibly unimplemented, return ZCM_EUNIMPL */
if (!zt->vtbl->query_drops) return ZCM_EUNIMPL;
return zt->vtbl->query_drops(zt, out_drops);
}

static INLINE int zcm_trans_update(zcm_trans_t* zt)
{ return zt->vtbl->update(zt); }

Expand Down
1 change: 1 addition & 0 deletions zcm/transport/generic_serial_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ static zcm_trans_methods_t methods = {
&_serial_sendmsg,
&_serial_recvmsg_enable,
&_serial_recvmsg,
NULL, // drops
&_serial_update,
&zcm_trans_generic_serial_destroy,
};
Expand Down
6 changes: 6 additions & 0 deletions zcm/transport/lockfree/blah.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#include "lf_machine_assumptions.h"
int main()
{
static_assert(1, "");
static_assert(3 == 3, "");
}
6 changes: 6 additions & 0 deletions zcm/transport/lockfree/lf_bcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ void lf_bcast_sub_init(lf_bcast_sub_t *_sub, lf_bcast_t *b)
sub->idx = b->tail_idx;
}

uint64_t lf_bcast_sub_drops(lf_bcast_sub_t *_sub)
{
sub_impl_t *sub = (sub_impl_t*)_sub;
return sub->drops;
}

const void *lf_bcast_sub_consume_begin(lf_bcast_sub_t *_sub, int64_t timeout)
{
sub_impl_t *sub = (sub_impl_t*)_sub;
Expand Down
3 changes: 2 additions & 1 deletion zcm/transport/lockfree/lf_bcast.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <stdbool.h>
#include <stdlib.h>
#include <stdint.h>

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -49,7 +50,7 @@ void lf_bcast_sub_init(lf_bcast_sub_t *sub, lf_bcast_t *b);
/* Return the number of drops the sub has experienced. If the consumer is too slow, elements it's
interested will be reclaimed and rewritten. When the consumer tries to read them, it will discover
they are missing and count them as a drop. */
size_t lf_bcast_sub_drops(lf_bcast_t *sub);
uint64_t lf_bcast_sub_drops(lf_bcast_sub_t *sub);

/* Begin consuming the next buffer in the queue. If no buffer is available, wait up to 'timeout' nanos.
If available, return a pointer to the buffer.
Expand Down
13 changes: 12 additions & 1 deletion zcm/transport/transport.cpp.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "zcm/transport_registrar.h"
#include "zcm/transport_register.hpp"
**/
#include <cstdint>
#include <cstdio>
#include <cassert>

Expand Down Expand Up @@ -45,13 +46,19 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
assert(0);
}

int query_drops(uint64_t *out_drops)
{
// WRITE ME
assert(0);
}

void update()
{
// WRITE ME
assert(0);
}

void destory()
void destroy()
{
// WRITE ME
assert(0);
Expand All @@ -77,6 +84,9 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
static int _recvmsg(zcm_trans_t *zt, zcm_msg_t *msg, int timeout)
{ return cast(zt)->recvmsg(msg, timeout); }

static int _query_drops(zcm_trans_t *zt, uint64_t *out_drops)
{ return cast(zt)->query_drops(out_drops); }

static int _update(zcm_trans_t *zt)
{ return cast(zt)->update(); }

Expand All @@ -92,6 +102,7 @@ zcm_trans_methods_t ZCM_TRANS_CLASSNAME::methods = {
&ZCM_TRANS_CLASSNAME::_sendmsg,
&ZCM_TRANS_CLASSNAME::_recvmsg_enable,
&ZCM_TRANS_CLASSNAME::_recvmsg,
&ZCM_TRANS_CLASSNAME::_query_drops,
&ZCM_TRANS_CLASSNAME::_update,
&ZCM_TRANS_CLASSNAME::_destroy,
};
Expand Down
Loading

0 comments on commit a120ba6

Please sign in to comment.