diff --git a/Dockerfile b/Dockerfile index 3f695f97..00252496 100644 --- a/Dockerfile +++ b/Dockerfile @@ -33,13 +33,16 @@ COPY zcm/ ./zcm ENV PATH ${PATH}:/root/.local/bin:$ZCM_HOME/deps/julia/bin ENV NVM_DIR /root/.nvm -RUN bash -c 'export JAVA_HOME=$(readlink -f /usr/bin/javac | sed "s:/bin/javac::") && \ - [ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" && \ - . $ZCM_HOME/deps/cxxtest/.env && \ - ./waf distclean configure --use-all --use-dev && \ - ./waf build && \ - ./waf install && \ - ./waf build_examples' +RUN < +#include +#include +#include +#include +#include "util.h" +#include "zcmtypes/blob_t.h" + +#define VERBOSE 1 + +#define URL_IPCSHM "ipcshm://test_direct?mtu=1000&depth=8" + +typedef struct state state_t; +struct state +{ + bool running; + zcm_trans_t *trans; +}; + +typedef struct data data_t; +struct data +{ + i64 send_time; + u64 seq; +}; +static data_t DATA[1]; + +static void pin_cpu_core(int core_id) +{ + int num_cores = sysconf(_SC_NPROCESSORS_ONLN); + if (core_id < 0 || core_id >= num_cores) { + FAIL("Invalid core id: num_cores=%d", num_cores); + } + + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(core_id, &cpuset); + + pthread_t current_thread = pthread_self(); + pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset); +} + +static void *publish_thread_direct(void *usr) +{ + pin_cpu_core(2); + + static u64 seq = 0; + + state_t *st = (state_t*)usr; + zcm_trans_t *trans = st->trans; + + while (st->running) { + usleep((i64)100e3); // 10 hz + + DATA->send_time = wallclock(); + DATA->seq = seq++; + + zcm_msg_t msg[1]; + msg->utime = 0; + msg->channel = "BLOB"; + msg->len = sizeof(data_t); + msg->buf = (u8*)DATA; + + int ret = zcm_trans_sendmsg(trans, *msg); + if (ret != ZCM_EOK) { + FAIL("Failed to publish!"); + } + } + + return NULL; +} + +static void *handle_thread_direct(void *usr) +{ + pin_cpu_core(3); + + state_t *st = (state_t*)usr; + zcm_trans_t *trans = st->trans; + size_t recv_count = 0; + + zcm_trans_recvmsg_enable(trans, "BLOB", true); + + zcm_msg_t msg[1]; + while (1) { + uint64_t drops = 0; + int ret = zcm_trans_query_drops(trans, &drops); + assert(ret == ZCM_EOK); + printf("status | recv: %lu drop: %lu\n", recv_count, drops); + + // consume all + while (1) { + int ret = zcm_trans_recvmsg(trans, msg, 10); + if (ret != ZCM_EOK) break; + recv_count++; + } + usleep(1e6); // sleep to stall and drop + } + + st->running = false; + return NULL; +} + +int main(int argc, char *argv[]) +{ + zcm_url_t* u = zcm_url_create(URL_IPCSHM); + if (!u) FAIL("Failed to create url"); + + const char* protocol = zcm_url_protocol(u); + + zcm_trans_create_func* creator = zcm_transport_find(protocol); + if (!creator) FAIL("Failed to find transport type by url"); + + char *errmsg; + zcm_trans_t* trans = creator(u, &errmsg); + if (!trans) FAIL("Failed to create transport: %s", errmsg); + + state_t st[1] = {}; + st->running = true; + st->trans = trans; + + pthread_t pub_thread, hdl_thread; + pthread_create(&pub_thread, NULL, publish_thread_direct, st); + pthread_create(&hdl_thread, NULL, handle_thread_direct, st); + + usleep(10e6); + st->running = true; + + pthread_join(pub_thread, NULL); + pthread_join(hdl_thread, NULL); + + zcm_trans_destroy(trans); + return 1; +} diff --git a/bench/ipcshm/wscript b/bench/ipcshm/wscript index f5b55759..c479ab44 100644 --- a/bench/ipcshm/wscript +++ b/bench/ipcshm/wscript @@ -42,3 +42,7 @@ def build(ctx): ctx.program(target='test_big', use = 'default zcm benchzcmtypes', source='test_big.c') + + ctx.program(target='test_dropping', + use = 'default zcm benchzcmtypes', + source='test_dropping.c') diff --git a/zcm/blocking.cpp b/zcm/blocking.cpp index e90d7b9f..5ee05738 100644 --- a/zcm/blocking.cpp +++ b/zcm/blocking.cpp @@ -120,7 +120,7 @@ struct zcm_blocking int flush(bool block); int setQueueSize(uint32_t numMsgs, bool block); - + int queryDrops(uint64_t *out_drops); int writeTopology(string name); private: @@ -612,6 +612,11 @@ int zcm_blocking_t::setQueueSize(uint32_t numMsgs, bool block) return ZCM_EOK; } +int zcm_blocking_t::queryDrops(uint64_t *out_drops) +{ + return zcm_trans_query_drops(zt, out_drops); +} + void zcm_blocking_t::sendThreadFunc() { // Name the send thread @@ -930,6 +935,11 @@ void zcm_blocking_set_queue_size(zcm_blocking_t* zcm, uint32_t sz) zcm->setQueueSize(sz, true); } +int zcm_blocking_query_drops(zcm_blocking_t *zcm, uint64_t *out_drops) +{ + return zcm->queryDrops(out_drops); +} + int zcm_blocking_write_topology(zcm_blocking_t* zcm, const char* name) { #ifdef TRACK_TRAFFIC_TOPOLOGY diff --git a/zcm/blocking.h b/zcm/blocking.h index ce6bd34d..b5f75a88 100644 --- a/zcm/blocking.h +++ b/zcm/blocking.h @@ -31,6 +31,7 @@ void zcm_blocking_resume(zcm_blocking_t* zcm); int zcm_blocking_handle(zcm_blocking_t* zcm); int zcm_blocking_handle_nonblock(zcm_blocking_t* zcm); void zcm_blocking_set_queue_size(zcm_blocking_t* zcm, uint32_t numMsgs); +int zcm_blocking_query_drops(zcm_blocking_t *zcm, uint64_t *out_drops); int zcm_blocking_write_topology(zcm_blocking_t* zcm, const char* name); diff --git a/zcm/js/node/index.js b/zcm/js/node/index.js index 382f6e8f..876216eb 100644 --- a/zcm/js/node/index.js +++ b/zcm/js/node/index.js @@ -55,6 +55,8 @@ var ZCM_EAGAIN = libzcm.zcm_retcode_name_to_enum("ZCM_EAGAIN"); var ZCM_ECONNECT = libzcm.zcm_retcode_name_to_enum("ZCM_ECONNECT"); var ZCM_EINTR = libzcm.zcm_retcode_name_to_enum("ZCM_EINTR"); var ZCM_EUNKNOWN = libzcm.zcm_retcode_name_to_enum("ZCM_EUNKNOWN"); +var ZCM_EMEMORY = libzcm.zcm_retcode_name_to_enum("ZCM_EMEMORY"); +var ZCM_EUNIMPL = libzcm.zcm_retcode_name_to_enum("ZCM_EUNIMPL"); var ZCM_NUM_RETURN_CODES = libzcm.zcm_retcode_name_to_enum("ZCM_NUM_RETURN_CODES"); exports.ZCM_EOK = ZCM_EOK; @@ -63,6 +65,8 @@ exports.ZCM_EAGAIN = ZCM_EAGAIN; exports.ZCM_ECONNECT = ZCM_ECONNECT; exports.ZCM_EINTR = ZCM_EINTR; exports.ZCM_EUNKNOWN = ZCM_EUNKNOWN; +exports.ZCM_EMEMORY = ZCM_EMEMORY; +exports.ZCM_EUNIMPL = ZCM_EUNIMPL; exports.ZCM_NUM_RETURN_CODES = ZCM_NUM_RETURN_CODES; /** diff --git a/zcm/nonblocking.c b/zcm/nonblocking.c index 2c665160..d5b7adb7 100644 --- a/zcm/nonblocking.c +++ b/zcm/nonblocking.c @@ -157,6 +157,11 @@ int zcm_nonblocking_unsubscribe(zcm_nonblocking_t* zcm, zcm_sub_t* sub) return rc; } +int zcm_nonblocking_query_drops(zcm_nonblocking_t *zcm, uint64_t *out_drops) +{ + return zcm_trans_query_drops(zcm->zt, out_drops); +} + static void dispatch_message(zcm_nonblocking_t* zcm, zcm_msg_t* msg) { zcm_recv_buf_t rbuf; diff --git a/zcm/nonblocking.h b/zcm/nonblocking.h index 91c6a57e..b1913bb8 100644 --- a/zcm/nonblocking.h +++ b/zcm/nonblocking.h @@ -22,6 +22,8 @@ zcm_sub_t* zcm_nonblocking_subscribe(zcm_nonblocking_t* zcm, const char* channel int zcm_nonblocking_unsubscribe(zcm_nonblocking_t* zcm, zcm_sub_t* sub); +int zcm_nonblocking_query_drops(zcm_nonblocking_t *zcm, uint64_t *out_drops); + /* Returns 1 if a message was dispatched, and 0 otherwise */ int zcm_nonblocking_handle_nonblock(zcm_nonblocking_t* zcm); diff --git a/zcm/python/zerocm.pyx b/zcm/python/zerocm.pyx index 2c9dbd30..38e37478 100644 --- a/zcm/python/zerocm.pyx +++ b/zcm/python/zerocm.pyx @@ -16,6 +16,8 @@ cdef extern from "zcm/zcm.h": ZCM_ECONNECT, ZCM_EINTR, ZCM_EUNKNOWN, + ZCM_EMEMORY, + ZCM_EUNIMPL, ZCM_NUM_RETURN_CODES ctypedef struct zcm_t: pass diff --git a/zcm/transport.h b/zcm/transport.h index 53f7804c..d7e47846 100644 --- a/zcm/transport.h +++ b/zcm/transport.h @@ -88,6 +88,13 @@ * and users should only expect accuracy within a few milliseconds. Users * should *not* attempt to use this timing mechanism for real-time events. * + * int query_drops(zcm_trans_t* zt, uint64_t *out_drops); + * -------------------------------------------------------------------- + * This method provides the caller access to an internal transport drop counter + * Implementing this is not required. If unimplemented, it should return ZCM_EUNIMPL. + * If implemented, the out-parameter *out_drops should be populated and the + * call should return ZCM_EOK. + * * int update(zcm_trans_t* zt); * -------------------------------------------------------------------- * This method is unused (in this mode) and should not be called by the user. @@ -151,6 +158,13 @@ * NOTE: This method does NOT have to work concurrently with recvmsg_enable() * NOTE: The 'timeout' field is ignored * + * int query_drops(zcm_trans_t* zt, uint64_t *out_drops); + * -------------------------------------------------------------------- + * This method provides the caller access to an internal transport drop counter + * Implementing this is not required. If unimplemented, it should return ZCM_EUNIMPL. + * If implemented, the out-parameter *out_drops should be populated and the + * call should return ZCM_EOK. + * * int update(zcm_trans_t* zt) * -------------------------------------------------------------------- * This method is called from the zcm_handle_nonblock() function. @@ -211,6 +225,7 @@ struct zcm_trans_methods_t int (*sendmsg)(zcm_trans_t* zt, zcm_msg_t msg); int (*recvmsg_enable)(zcm_trans_t* zt, const char* channel, bool enable); int (*recvmsg)(zcm_trans_t* zt, zcm_msg_t* msg, unsigned timeout); + int (*query_drops)(zcm_trans_t *zt, uint64_t *out_drops); int (*update)(zcm_trans_t* zt); void (*destroy)(zcm_trans_t* zt); }; @@ -228,6 +243,13 @@ static INLINE int zcm_trans_recvmsg_enable(zcm_trans_t* zt, const char* channel, static INLINE int zcm_trans_recvmsg(zcm_trans_t* zt, zcm_msg_t* msg, unsigned timeout) { return zt->vtbl->recvmsg(zt, msg, timeout); } +static INLINE int zcm_trans_query_drops(zcm_trans_t* zt, uint64_t *out_drops) +{ + /* Possibly unimplemented, return ZCM_EUNIMPL */ + if (!zt->vtbl->query_drops) return ZCM_EUNIMPL; + return zt->vtbl->query_drops(zt, out_drops); +} + static INLINE int zcm_trans_update(zcm_trans_t* zt) { return zt->vtbl->update(zt); } diff --git a/zcm/transport/generic_serial_transport.c b/zcm/transport/generic_serial_transport.c index 249078a0..cb07262a 100644 --- a/zcm/transport/generic_serial_transport.c +++ b/zcm/transport/generic_serial_transport.c @@ -263,6 +263,7 @@ static zcm_trans_methods_t methods = { &_serial_sendmsg, &_serial_recvmsg_enable, &_serial_recvmsg, + NULL, // drops &_serial_update, &zcm_trans_generic_serial_destroy, }; diff --git a/zcm/transport/lockfree/blah.c b/zcm/transport/lockfree/blah.c new file mode 100644 index 00000000..0f0594da --- /dev/null +++ b/zcm/transport/lockfree/blah.c @@ -0,0 +1,6 @@ +#include "lf_machine_assumptions.h" +int main() +{ + static_assert(1, ""); + static_assert(3 == 3, ""); +} diff --git a/zcm/transport/lockfree/lf_bcast.c b/zcm/transport/lockfree/lf_bcast.c index 6350eb69..74121659 100644 --- a/zcm/transport/lockfree/lf_bcast.c +++ b/zcm/transport/lockfree/lf_bcast.c @@ -226,6 +226,12 @@ void lf_bcast_sub_init(lf_bcast_sub_t *_sub, lf_bcast_t *b) sub->idx = b->tail_idx; } +uint64_t lf_bcast_sub_drops(lf_bcast_sub_t *_sub) +{ + sub_impl_t *sub = (sub_impl_t*)_sub; + return sub->drops; +} + const void *lf_bcast_sub_consume_begin(lf_bcast_sub_t *_sub, int64_t timeout) { sub_impl_t *sub = (sub_impl_t*)_sub; diff --git a/zcm/transport/lockfree/lf_bcast.h b/zcm/transport/lockfree/lf_bcast.h index ece56fa2..f3004d4a 100644 --- a/zcm/transport/lockfree/lf_bcast.h +++ b/zcm/transport/lockfree/lf_bcast.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include #ifdef __cplusplus extern "C" { @@ -49,7 +50,7 @@ void lf_bcast_sub_init(lf_bcast_sub_t *sub, lf_bcast_t *b); /* Return the number of drops the sub has experienced. If the consumer is too slow, elements it's interested will be reclaimed and rewritten. When the consumer tries to read them, it will discover they are missing and count them as a drop. */ -size_t lf_bcast_sub_drops(lf_bcast_t *sub); +uint64_t lf_bcast_sub_drops(lf_bcast_sub_t *sub); /* Begin consuming the next buffer in the queue. If no buffer is available, wait up to 'timeout' nanos. If available, return a pointer to the buffer. diff --git a/zcm/transport/transport.cpp.template b/zcm/transport/transport.cpp.template index 51d69946..fa64c067 100644 --- a/zcm/transport/transport.cpp.template +++ b/zcm/transport/transport.cpp.template @@ -3,6 +3,7 @@ #include "zcm/transport_registrar.h" #include "zcm/transport_register.hpp" **/ +#include #include #include @@ -45,13 +46,19 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t assert(0); } + int query_drops(uint64_t *out_drops) + { + // WRITE ME + assert(0); + } + void update() { // WRITE ME assert(0); } - void destory() + void destroy() { // WRITE ME assert(0); @@ -77,6 +84,9 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t static int _recvmsg(zcm_trans_t *zt, zcm_msg_t *msg, int timeout) { return cast(zt)->recvmsg(msg, timeout); } + static int _query_drops(zcm_trans_t *zt, uint64_t *out_drops) + { return cast(zt)->query_drops(out_drops); } + static int _update(zcm_trans_t *zt) { return cast(zt)->update(); } @@ -92,6 +102,7 @@ zcm_trans_methods_t ZCM_TRANS_CLASSNAME::methods = { &ZCM_TRANS_CLASSNAME::_sendmsg, &ZCM_TRANS_CLASSNAME::_recvmsg_enable, &ZCM_TRANS_CLASSNAME::_recvmsg, + &ZCM_TRANS_CLASSNAME::_query_drops, &ZCM_TRANS_CLASSNAME::_update, &ZCM_TRANS_CLASSNAME::_destroy, }; diff --git a/zcm/transport/transport_can.cpp b/zcm/transport/transport_can.cpp index bcc2ab39..37e0d713 100644 --- a/zcm/transport/transport_can.cpp +++ b/zcm/transport/transport_can.cpp @@ -277,7 +277,8 @@ zcm_trans_methods_t ZCM_TRANS_CLASSNAME::methods = { &ZCM_TRANS_CLASSNAME::_sendmsg, &ZCM_TRANS_CLASSNAME::_recvmsg_enable, &ZCM_TRANS_CLASSNAME::_recvmsg, - NULL, + NULL, // drops + NULL, // update &ZCM_TRANS_CLASSNAME::_destroy, }; diff --git a/zcm/transport/transport_file.cpp b/zcm/transport/transport_file.cpp index 597de421..8cadd40e 100644 --- a/zcm/transport/transport_file.cpp +++ b/zcm/transport/transport_file.cpp @@ -193,7 +193,8 @@ zcm_trans_methods_t ZCM_TRANS_CLASSNAME::methods = { &ZCM_TRANS_CLASSNAME::_sendmsg, &ZCM_TRANS_CLASSNAME::_recvmsg_enable, &ZCM_TRANS_CLASSNAME::_recvmsg, - NULL, + NULL, // drops + NULL, // update &ZCM_TRANS_CLASSNAME::_destroy, }; diff --git a/zcm/transport/transport_inproc.cpp b/zcm/transport/transport_inproc.cpp index ae6ead33..bda94e51 100644 --- a/zcm/transport/transport_inproc.cpp +++ b/zcm/transport/transport_inproc.cpp @@ -154,6 +154,7 @@ zcm_trans_methods_t ZCM_TRANS_CLASSNAME::methods = { &ZCM_TRANS_CLASSNAME::_sendmsg, &ZCM_TRANS_CLASSNAME::_recvmsg_enable, &ZCM_TRANS_CLASSNAME::_recvmsg, + NULL, // drops &ZCM_TRANS_CLASSNAME::_update, &ZCM_TRANS_CLASSNAME::_destroy, }; diff --git a/zcm/transport/transport_ipcshm.cpp b/zcm/transport/transport_ipcshm.cpp index b25fcb89..26f67895 100644 --- a/zcm/transport/transport_ipcshm.cpp +++ b/zcm/transport/transport_ipcshm.cpp @@ -302,6 +302,14 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t return ZCM_EOK; } + int query_drops(uint64_t *out_drops) + { + if (!out_drops) return ZCM_EINVALID; + uint64_t drops = lf_bcast_sub_drops(sub); + *out_drops = drops; + return ZCM_EOK; + } + /********************** STATICS **********************/ static zcm_trans_methods_t methods; static ZCM_TRANS_CLASSNAME *cast(zcm_trans_t *zt) @@ -322,6 +330,9 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t static int _recvmsg(zcm_trans_t *zt, zcm_msg_t *msg, unsigned timeout) { return cast(zt)->recvmsg(msg, timeout); } + static int _query_drops(zcm_trans_t *zt, uint64_t *out_drops) + { return cast(zt)->query_drops(out_drops); } + static void _destroy(zcm_trans_t *zt) { delete cast(zt); } @@ -330,12 +341,13 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t }; zcm_trans_methods_t ZCM_TRANS_CLASSNAME::methods = { - &ZCM_TRANS_CLASSNAME::_get_mtu, - &ZCM_TRANS_CLASSNAME::_sendmsg, - &ZCM_TRANS_CLASSNAME::_recvmsg_enable, - &ZCM_TRANS_CLASSNAME::_recvmsg, - NULL, // update - &ZCM_TRANS_CLASSNAME::_destroy, + &ZCM_TRANS_CLASSNAME::_get_mtu, + &ZCM_TRANS_CLASSNAME::_sendmsg, + &ZCM_TRANS_CLASSNAME::_recvmsg_enable, + &ZCM_TRANS_CLASSNAME::_recvmsg, + &ZCM_TRANS_CLASSNAME::_query_drops, + NULL, // update + &ZCM_TRANS_CLASSNAME::_destroy, }; static zcm_trans_t *create(zcm_url_t *url, char **opt_errmsg) diff --git a/zcm/transport/transport_serial.cpp b/zcm/transport/transport_serial.cpp index 3cd88140..b0cb8e06 100644 --- a/zcm/transport/transport_serial.cpp +++ b/zcm/transport/transport_serial.cpp @@ -469,6 +469,7 @@ zcm_trans_methods_t ZCM_TRANS_CLASSNAME::methods = { &ZCM_TRANS_CLASSNAME::_sendmsg, &ZCM_TRANS_CLASSNAME::_recvmsgEnable, &ZCM_TRANS_CLASSNAME::_recvmsg, + NULL, // drops NULL, // update &ZCM_TRANS_CLASSNAME::_destroy, }; diff --git a/zcm/transport/transport_zmq_local.cpp b/zcm/transport/transport_zmq_local.cpp index b0b17b77..71e812ce 100644 --- a/zcm/transport/transport_zmq_local.cpp +++ b/zcm/transport/transport_zmq_local.cpp @@ -520,6 +520,7 @@ zcm_trans_methods_t ZCM_TRANS_CLASSNAME::methods = { &ZCM_TRANS_CLASSNAME::_sendmsg, &ZCM_TRANS_CLASSNAME::_recvmsgEnable, &ZCM_TRANS_CLASSNAME::_recvmsg, + NULL, // drops NULL, // update &ZCM_TRANS_CLASSNAME::_destroy, }; diff --git a/zcm/transport/udp/udp.cpp b/zcm/transport/udp/udp.cpp index 4b04ec1e..3725f49a 100644 --- a/zcm/transport/udp/udp.cpp +++ b/zcm/transport/udp/udp.cpp @@ -486,6 +486,7 @@ zcm_trans_methods_t ZCM_TRANS_CLASSNAME::methods = { &ZCM_TRANS_CLASSNAME::_sendmsg, &ZCM_TRANS_CLASSNAME::_recvmsgEnable, &ZCM_TRANS_CLASSNAME::_recvmsg, + NULL, // drops NULL, // update &ZCM_TRANS_CLASSNAME::_destroy, }; diff --git a/zcm/zcm.c b/zcm/zcm.c index 0d90b97f..e535288b 100644 --- a/zcm/zcm.c +++ b/zcm/zcm.c @@ -338,9 +338,15 @@ int zcm_handle_nonblock(zcm_t* zcm) return ret; } - - - +int zcm_query_drops(zcm_t *zcm, uint64_t *out_drops) +{ + switch (zcm->type) { + case ZCM_BLOCKING: return zcm_blocking_query_drops(zcm->impl, out_drops); + case ZCM_NONBLOCKING: return zcm_nonblocking_query_drops(zcm->impl, out_drops); + } + ZCM_ASSERT(0 && "unreachable"); + return ZCM_EUNKNOWN; +} /****************************************************************************/ /* NOT FOR GENERAL USE. USED FOR LANGUAGE-SPECIFIC BINDINGS WITH VERY */ diff --git a/zcm/zcm.h b/zcm/zcm.h index 16f395b0..216e0127 100644 --- a/zcm/zcm.h +++ b/zcm/zcm.h @@ -39,7 +39,8 @@ enum zcm_type { X(ZCM_EINTR , -4, "Operation was unexpectedly interrupted") \ X(ZCM_EUNKNOWN, -5, "Unknown error" ) \ X(ZCM_EMEMORY, -6, "Out of memory" ) \ - X(ZCM_NUM_RETURN_CODES, 7, "Invalid return code" ) + X(ZCM_EUNIMPL, -7, "Function is not implemented" ) \ + X(ZCM_NUM_RETURN_CODES, 8, "Invalid return code" ) /* Return codes */ enum zcm_return_codes { @@ -157,7 +158,10 @@ int zcm_write_topology(zcm_t* zcm, const char* name); error code otherwise */ int zcm_handle_nonblock(zcm_t* zcm); - +/* Query the drop counter on the underlying transport + NOTE: This may be unimplemented, in which case it will return ZCM_EIMPL and + the out-param will be disregarded. */ +int zcm_query_drops(zcm_t *zcm, uint64_t *out_drops); /****************************************************************************/ /* NOT FOR GENERAL USE. USED FOR LANGUAGE-SPECIFIC BINDINGS WITH VERY */