Skip to content

Commit

Permalink
- Removed programmatic UPerf Tests
Browse files Browse the repository at this point in the history
- Added AverageSummary
- UPerf now uses AverageSummary to get results from invokers / members
- RpcStats: replaced AtomicInteger with LongAdder
- RequestCorrelator: stats are disabled by default (use enable-rpcstats or rpcs-enable-details to enable)
- Average/AverageMinMax now use a simple array of doubles with a given capacity and write new samples to it (https://issues.redhat.com/browse/JGRP-2857)
- LazyThreadFactory does not call Thread.getState() anymore
  • Loading branch information
belaban committed Dec 18, 2024
1 parent 1a2611d commit 6f4b363
Show file tree
Hide file tree
Showing 32 changed files with 453 additions and 1,666 deletions.
90 changes: 65 additions & 25 deletions src/org/jgroups/blocks/RequestCorrelator.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ public class RequestCorrelator {

protected final ForkJoinPool common_pool=ForkJoinPool.commonPool();

protected volatile boolean rpcstats; // enables/disables the 3 RPC metrics below

protected final RpcStats rpc_stats=new RpcStats(false);

protected final AverageMinMax avg_req_delivery=new AverageMinMax().unit(TimeUnit.MICROSECONDS);
protected final AverageMinMax avg_req_delivery=new AverageMinMax(1024).unit(TimeUnit.NANOSECONDS);

protected final AverageMinMax avg_rsp_delivery=new AverageMinMax().unit(TimeUnit.MICROSECONDS);
protected final AverageMinMax avg_rsp_delivery=new AverageMinMax(1024).unit(TimeUnit.NANOSECONDS);

protected static final Log log=LogFactory.getLog(RequestCorrelator.class);

Expand Down Expand Up @@ -103,6 +105,9 @@ public void setRequestHandler(RequestHandler handler) {
public RequestCorrelator asyncRspHandling(boolean f) {async_rsp_handling=f; return this;}
public boolean wrapExceptions() {return wrap_exceptions;}
public RequestCorrelator wrapExceptions(boolean flag) {wrap_exceptions=flag; return this;}
public boolean rpcStats() {return rpcstats;}
public RequestCorrelator rpcStats(boolean b) {
rpcstats=b; return this;}


/**
Expand All @@ -126,10 +131,12 @@ public <T> void sendMulticastRequest(Collection<Address> dest_mbrs, Message msg,
if(req != null) // sync
addEntry(req, hdr, false);
else { // async
if(opts.anycasting())
rpc_stats.addAnycast(false, 0, dest_mbrs);
else
rpc_stats.add(RpcStats.Type.MULTICAST, null, false, 0);
if(rpcstats) {
if(opts.anycasting())
rpc_stats.addAnycast(false, 0, dest_mbrs);
else
rpc_stats.add(RpcStats.Type.MULTICAST, null, false, 0);
}
}
if(opts.anycasting())
sendAnycastRequest(msg, dest_mbrs);
Expand All @@ -147,8 +154,10 @@ public <T> void sendUnicastRequest(Message msg, Request<T> req, RequestOptions o

if(req != null) // sync RPC
addEntry(req, hdr, true);
else // async RPC
rpc_stats.add(RpcStats.Type.UNICAST, dest, false, 0);
else {// async RPC
if(rpcstats)
rpc_stats.add(RpcStats.Type.UNICAST, dest, false, 0);
}
down_prot.down(msg);
}

Expand Down Expand Up @@ -338,13 +347,17 @@ protected RequestCorrelator removeEntry(long req_id) {
Request<?> req=requests.remove(req_id);
if(req != null) {
long time_ns=req.start_time > 0? System.nanoTime() - req.start_time : 0;
if(req instanceof UnicastRequest)
rpc_stats.add(RpcStats.Type.UNICAST, ((UnicastRequest<?>)req).target, true, time_ns);
if(req instanceof UnicastRequest) {
if(rpcstats)
rpc_stats.add(RpcStats.Type.UNICAST, ((UnicastRequest<?>)req).target, true, time_ns);
}
else if(req instanceof GroupRequest) {
if(req.options != null && req.options.anycasting())
rpc_stats.addAnycast(true, time_ns, ((GroupRequest<?>)req).rsps.keySet());
else
rpc_stats.add(RpcStats.Type.MULTICAST, null, true, time_ns);
if(rpcstats) {
if(req.options != null && req.options.anycasting())
rpc_stats.addAnycast(true, time_ns, ((GroupRequest<?>)req).rsps.keySet());
else
rpc_stats.add(RpcStats.Type.MULTICAST, null, true, time_ns);
}
}
else
log.error("request type %s not known", req != null? req.getClass().getSimpleName() : req);
Expand All @@ -358,18 +371,22 @@ else if(req instanceof GroupRequest) {
protected void dispatch(final Message msg, final Header hdr) {
switch(hdr.type) {
case Header.REQ:
long start=System.nanoTime();
long start=rpcstats? System.nanoTime() : 0;
handleRequest(msg, hdr);
long time=(long)((System.nanoTime() - start) / 1000.0);
avg_req_delivery.add(time);
if(start > 0) {
long time=System.nanoTime() - start;
avg_req_delivery.add(time);
}
break;

case Header.RSP:
case Header.EXC_RSP:
start=System.nanoTime();
start=rpcstats? System.nanoTime() : 0;
handleResponse(msg, hdr);
time=(long)((System.nanoTime() - start) / 1000.0);
avg_rsp_delivery.add(time);
if(start > 0) {
long time=System.nanoTime() - start;
avg_rsp_delivery.add(time);
}
break;

default:
Expand All @@ -380,8 +397,8 @@ protected void dispatch(final Message msg, final Header hdr) {

/** Handle a request msg for this correlator */
protected void handleRequest(Message req, Header hdr) {
Object retval;
boolean threw_exception=false;
Object retval;
boolean threw_exception=false;

if(log.isTraceEnabled())
log.trace("calling (%s) with request %d",
Expand Down Expand Up @@ -610,6 +627,10 @@ public Map<String, String> handleProbe(String... keys) {
retval.put(key, String.format("size=%d, next-id=%d", requests.size(), REQUEST_ID.get()));
break;
case "rpcs":
if(!rpcstats) {
retval.put(key, String.format("%s not enabled; use enable-rpcstats to enable it", key));
break;
}
retval.put("sync unicast RPCs", String.valueOf(rpc_stats.unicasts(true)));
retval.put("sync multicast RPCs", String.valueOf(rpc_stats.multicasts(true)));
retval.put("async unicast RPCs", String.valueOf(rpc_stats.unicasts(false)));
Expand All @@ -623,6 +644,7 @@ public Map<String, String> handleProbe(String... keys) {
break;
case "rpcs-enable-details":
rpc_stats.extendedStats(true);
rpcstats=true;
break;
case "rpcs-disable-details":
rpc_stats.extendedStats(false);
Expand All @@ -637,20 +659,37 @@ public Map<String, String> handleProbe(String... keys) {
retval.put(key + " (min/avg/max)", rpc_stats.printRTTStatsByDest());
break;
case "avg-req-delivery":
retval.put(key, avg_req_delivery.toString());
if(!rpcstats)
retval.put(key, String.format("%s not enabled; use enable-rpcstats to enable it", key));
else
retval.put(key, avg_req_delivery.toString());
break;
case "avg-req-delivery-reset":
avg_req_delivery.clear();
break;
case "avg-rsp-delivery":
retval.put(key, avg_rsp_delivery.toString());
if(!rpcstats)
retval.put(key, String.format("%s not enabled; use enable-rpcstats to enable it", key));
else
retval.put(key, avg_rsp_delivery.toString());
break;
case "avg-rsp-delivery-reset":
avg_rsp_delivery.clear();
break;
case "fjp":
retval.put(key, common_pool.toString());
break;
case "enable-rpcstats":
rpcstats=true;
break;
case "disable-rpcstats":
rpcstats=false;
break;
case "rpcstats":
retval.put(key, String.format("rpcstats=%b (enable-rpcstats to enable), " +
"extended stats=%b (rpcs-enable-details to enable)",
rpcstats, rpc_stats.extendedStats()));
break;
}
if("avg-req-delivery".startsWith(key))
retval.putIfAbsent("avg-req-delivery", avg_req_delivery.toString());
Expand All @@ -672,7 +711,8 @@ public String[] supportedKeys() {
return new String[]{"requests", "reqtable-info", "rpcs", "rpcs-reset", "rpcs-enable-details",
"rpcs-disable-details", "rpcs-details", "rtt", "rtt-reset",
"avg-req-delivery", "avg-req-delivery-reset", "async-rsp-handling",
"avg-rsp-delivery", "avg-rsp-delivery-reset", "fjp"};
"avg-rsp-delivery", "avg-rsp-delivery-reset", "fjp", "enable-rpcstats", "disable-rpcstats",
"stats"};
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/BaseBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class BaseBundler implements Bundler {
protected int capacity=16384;

@ManagedAttribute(description="Time (us) to send the bundled messages")
protected final AverageMinMax avg_send_time=new AverageMinMax().unit(NANOSECONDS);
protected final AverageMinMax avg_send_time=new AverageMinMax(1024).unit(NANOSECONDS);

@ManagedOperation(description="Prints the capacity of the buffers")
public String printBuffers() {
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/protocols/PERF.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public class PERF extends Protocol {
protected int avg_size=1024;

@ManagedAttribute(description="Average latency in ns",type=AttributeType.TIME,unit=TimeUnit.NANOSECONDS)
public double latencyInNs() {return avg.getAverage();}
public double latencyInNs() {return avg.average();}

@ManagedAttribute(description="Average latency in ms",type=AttributeType.TIME)
public double latencyInMs() {return avg.getAverage() / 1000000.0;}
public double latencyInMs() {return avg.average() / 1000000.0;}

public void init() throws Exception {
super.init();
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/ReliableMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public abstract class ReliableMulticast extends Protocol implements DiagnosticsH

/** The average number of messages in a received {@link MessageBatch} */
@ManagedAttribute(description="The average number of messages in a batch removed from the table and delivered to the application")
protected final AverageMinMax avg_batch_size=new AverageMinMax();
protected final AverageMinMax avg_batch_size=new AverageMinMax(1024);

@ManagedAttribute(description="Is the retransmit task running")
public boolean isXmitTaskRunning() {return xmit_task != null && !xmit_task.isDone();}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/ReliableUnicast.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public abstract class ReliableUnicast extends Protocol implements AgeOutCache.Ha
protected final LongAdder xmit_rsps_sent=new LongAdder();

@ManagedAttribute(description="Average batch size of messages delivered to the application")
protected final AverageMinMax avg_delivery_batch_size=new AverageMinMax();
protected final AverageMinMax avg_delivery_batch_size=new AverageMinMax(1024);

@ManagedAttribute(description="True if sending a message can block at the transport level")
protected boolean sends_can_block=true;
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
protected final LongAdder xmit_rsps_sent=new LongAdder();

@ManagedAttribute(description="Average batch size of messages delivered to the application")
protected final AverageMinMax avg_delivery_batch_size=new AverageMinMax();
protected final AverageMinMax avg_delivery_batch_size=new AverageMinMax(1024);

@ManagedAttribute(description="True if sending a message can block at the transport level")
protected boolean sends_can_block=true;
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/pbcast/NAKACK2.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler

/** The average number of messages in a received {@link MessageBatch} */
@ManagedAttribute(description="The average number of messages in a batch removed from the table and delivered to the application")
protected final AverageMinMax avg_batch_size=new AverageMinMax();
protected final AverageMinMax avg_batch_size=new AverageMinMax(1024);

@ManagedAttribute(description="Is the retransmit task running")
public boolean isXmitTaskRunning() {return xmit_task != null && !xmit_task.isDone();}
Expand Down
Loading

0 comments on commit 6f4b363

Please sign in to comment.