Skip to content

Commit

Permalink
Simplify for transports to send to multiple destinations
Browse files Browse the repository at this point in the history
This change gives a convenient method to override to allow a transport to convert data (to e.g. a direct buffer) only once before sending to multiple destinations
  • Loading branch information
cfredri4 authored and belaban committed Nov 18, 2024
1 parent 0c00066 commit 71703ca
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,7 @@ protected void sendToAll(byte[] buf, int offset, int length) throws Exception {
}
}

List<PhysicalAddress> dests=new ArrayList<>(mbrs.size());
for(Address mbr: mbrs) {
if(local_send_successful && local_transport != null && local_transport.isLocalMember(mbr))
continue; // skip if local transport sent the message successfully
Expand All @@ -1392,22 +1393,29 @@ protected void sendToAll(byte[] buf, int offset, int length) throws Exception {
missing.add(mbr);
continue;
}
if(!Objects.equals(local_physical_addr, target))
dests.add(target);
}
if(!dests.isEmpty())
sendUnicasts(dests, buf, offset, length);
if(missing != null)
fetchPhysicalAddrs(missing);
}

protected void sendUnicasts(List<PhysicalAddress> dests, byte[] data, int offset, int length) throws Exception {
for(PhysicalAddress dest: dests) {
try {
if(!Objects.equals(local_physical_addr, target))
sendUnicast(target, buf, offset, length);
sendUnicast(dest, data, offset, length);
}
catch(SocketException | SocketTimeoutException sock_ex) {
log.debug(Util.getMessage("FailureSendingToPhysAddr"), local_addr, mbr, sock_ex);
log.debug(Util.getMessage("FailureSendingToPhysAddr"), local_addr, dest, sock_ex);
}
catch(Throwable t) {
log.error(Util.getMessage("FailureSendingToPhysAddr"), local_addr, mbr, t);
log.error(Util.getMessage("FailureSendingToPhysAddr"), local_addr, dest, t);
}
}
if(missing != null)
fetchPhysicalAddrs(missing);
}


protected void fetchPhysicalAddrs(List<Address> missing) {
long current_time=0;
boolean do_send=false;
Expand All @@ -1431,7 +1439,6 @@ protected Responses fetchResponsesFromDiscoveryProtocol(List<Address> missing) {
return (Responses)up_prot.up(new Event(Event.FIND_MBRS, missing));
}


protected long timestamp() {return time_service != null? time_service.timestamp() : System.nanoTime();}

/**
Expand Down

0 comments on commit 71703ca

Please sign in to comment.