Skip to content

Commit ebb51d9

Browse files
Richard Drinkwaterligallag-amd
authored andcommitted
ON-16111: use monitor thread in zfsink to allow testing higher rates
1 parent ff54d00 commit ebb51d9

File tree

1 file changed

+69
-13
lines changed

1 file changed

+69
-13
lines changed

src/tests/zf_apps/zfsink.c

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,23 @@
2121
#include <stdbool.h>
2222
#include <stdarg.h>
2323
#include <inttypes.h>
24+
#include <pthread.h>
25+
#include <sys/time.h>
26+
27+
28+
struct resources {
29+
/* statistics */
30+
volatile uint64_t n_rx_pkts;
31+
volatile uint64_t n_rx_bytes;
32+
};
2433

2534

2635
static bool cfg_quiet = false;
2736
static bool cfg_rx_timestamping = false;
37+
static struct resources res;
38+
39+
/* Mutex to protect printing from different threads */
40+
static pthread_mutex_t printf_mutex;
2841

2942

3043
static void usage_msg(FILE* f)
@@ -51,12 +64,12 @@ static void usage_err(void)
5164

5265
static void vlog(const char* fmt, ...)
5366
{
54-
if( ! cfg_quiet ) {
55-
va_list args;
56-
va_start(args, fmt);
57-
vprintf(fmt, args);
58-
va_end(args);
59-
}
67+
va_list args;
68+
va_start(args, fmt);
69+
pthread_mutex_lock(&printf_mutex);
70+
vprintf(fmt, args);
71+
pthread_mutex_unlock(&printf_mutex);
72+
va_end(args);
6073
}
6174

6275

@@ -77,9 +90,6 @@ static void try_recv(struct zfur* ur)
7790

7891
/* Do something useful with the datagram here! */
7992

80-
81-
vlog("Received datagram of length %zu\n", rd.iov[0].iov_len);
82-
8393
/* In the case rx timestamping capabilities are enabled, we can retrieve
8494
* the time at which the packet was received.
8595
* */
@@ -89,12 +99,15 @@ static void try_recv(struct zfur* ur)
8999
int rc = zfur_pkt_get_timestamp(ur, &rd.msg, &ts, 0, &flags);
90100

91101
if( rc == 0 )
92-
vlog("At time: %lld.%.9ld\n", ts.tv_sec, ts.tv_nsec);
102+
vlog("Hardware timestamp: %lld.%.9ld\n", ts.tv_sec, ts.tv_nsec);
93103
else
94104
vlog("Error retrieving timestamp! Return code: %d\n", rc);
95105
}
96106

97107
zfur_zc_recv_done(ur, &rd.msg);
108+
109+
res.n_rx_pkts += 1;
110+
res.n_rx_bytes += rd.iov[0].iov_len;
98111
} while( rd.msg.dgrams_left );
99112
}
100113

@@ -104,7 +117,6 @@ static void poll_muxer(struct zf_muxer_set* muxer, int timeout)
104117
struct epoll_event evs[8];
105118
const int max_evs = sizeof(evs) / sizeof(evs[0]);
106119

107-
vlog("Polling muxer\n");
108120
int n_ev = zf_muxer_wait(muxer, evs, max_evs, timeout);
109121

110122
for( int i = 0; i < n_ev; ++i )
@@ -115,7 +127,6 @@ static void poll_muxer(struct zf_muxer_set* muxer, int timeout)
115127
static void ev_loop_reactor(struct zf_stack* stack, struct zfur* ur)
116128
{
117129
while( 1 ) {
118-
vlog("Polling reactor\n");
119130
while( zf_reactor_perform(stack) == 0 )
120131
;
121132
try_recv(ur);
@@ -145,7 +156,6 @@ static void ev_loop_waitable_fd(struct zf_stack* stack,
145156
struct epoll_event evs[8];
146157
const int max_evs = sizeof(evs) / sizeof(evs[0]);
147158

148-
vlog("Calling epoll_wait\n");
149159
int n_ev = epoll_wait(epollfd, evs, max_evs, -1);
150160

151161
for( int i = 0; i < n_ev; ++i )
@@ -233,8 +243,47 @@ void print_attrs(struct zf_attr* attr)
233243
}
234244

235245

246+
static void monitor()
247+
{
248+
uint64_t now_bytes, prev_bytes;
249+
struct timeval start, end;
250+
uint64_t prev_pkts, now_pkts;
251+
int ms, pkt_rate, mbps;
252+
253+
vlog("#%9s %16s %16s", "pkt-rate", "bandwidth(Mbps)", "total-pkts\n");
254+
255+
prev_pkts = res.n_rx_pkts;
256+
prev_bytes = res.n_rx_bytes;
257+
gettimeofday(&start, NULL);
258+
259+
while( 1 ) {
260+
sleep(1);
261+
now_pkts = res.n_rx_pkts;
262+
now_bytes = res.n_rx_bytes;
263+
gettimeofday(&end, NULL);
264+
ms = (end.tv_sec - start.tv_sec) * 1000;
265+
ms += (end.tv_usec - start.tv_usec) / 1000;
266+
pkt_rate = (int) ((now_pkts - prev_pkts) * 1000 / ms);
267+
mbps = (int) ((now_bytes - prev_bytes) * 8 / 1000 / ms);
268+
vlog("%10d %16d %16"PRIu64"\n", pkt_rate, mbps, now_pkts);
269+
fflush(stdout);
270+
prev_pkts = now_pkts;
271+
prev_bytes = now_bytes;
272+
start = end;
273+
}
274+
}
275+
276+
277+
static void* monitor_fn(void* arg)
278+
{
279+
monitor();
280+
return NULL;
281+
}
282+
283+
236284
int main(int argc, char* argv[])
237285
{
286+
pthread_t thread_id;
238287
int cfg_muxer = 0;
239288
int cfg_waitable_fd = 0;
240289
bool cfg_print_attrs = false;
@@ -320,6 +369,13 @@ int main(int argc, char* argv[])
320369
ZF_TRY(zf_muxer_add(muxer, zfur_to_waitable(ur), &event));
321370
}
322371

372+
pthread_mutex_init(&printf_mutex, NULL);
373+
res.n_rx_bytes = 0;
374+
res.n_rx_pkts = 0;
375+
376+
if( ! cfg_quiet )
377+
ZF_TRY(pthread_create(&thread_id, NULL, monitor_fn, NULL) == 0);
378+
323379
if( cfg_waitable_fd )
324380
ev_loop_waitable_fd(stack, muxer);
325381
else if( cfg_muxer )

0 commit comments

Comments
 (0)