Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ public class ZeroCopyMetrics extends RatisMetrics {
private final LongCounter nonZeroCopyMessages = getRegistry().counter("num_non_zero_copy_messages");
private final LongCounter releasedMessages = getRegistry().counter("num_released_messages");

// Per-message-type zero-copy counters.
private final LongCounter zeroCopyAppendEntries = getRegistry().counter("num_zero_copy_append_entries");
private final LongCounter zeroCopyInstallSnapshot = getRegistry().counter("num_zero_copy_install_snapshot");
private final LongCounter zeroCopyClientRequest = getRegistry().counter("num_zero_copy_client_request");

// Aggregated savings and parse time (nanos) for zero-copy path.
private final LongCounter bytesSavedByZeroCopy = getRegistry().counter("bytes_saved_by_zero_copy");
private final LongCounter zeroCopyParseTimeNanos = getRegistry().counter("zero_copy_parse_time_nanos");

// Reason counters for zero-copy fallback.
private final LongCounter fallbackNotKnownLength = getRegistry().counter("zero_copy_fallback_not_known_length");
private final LongCounter fallbackNotDetachable = getRegistry().counter("zero_copy_fallback_not_detachable");
private final LongCounter fallbackNotByteBuffer = getRegistry().counter("zero_copy_fallback_not_byte_buffer");

public ZeroCopyMetrics() {
super(createRegistry());
}
Expand All @@ -54,6 +68,21 @@ public void onZeroCopyMessage(AbstractMessage ignored) {
zeroCopyMessages.inc();
}

public void onZeroCopyAppendEntries(AbstractMessage ignored) {
onZeroCopyMessage(ignored);
zeroCopyAppendEntries.inc();
}

public void onZeroCopyInstallSnapshot(AbstractMessage ignored) {
onZeroCopyMessage(ignored);
zeroCopyInstallSnapshot.inc();
}

public void onZeroCopyClientRequest(AbstractMessage ignored) {
onZeroCopyMessage(ignored);
zeroCopyClientRequest.inc();
}

public void onNonZeroCopyMessage(AbstractMessage ignored) {
nonZeroCopyMessages.inc();
}
Expand All @@ -62,6 +91,34 @@ public void onReleasedMessage(AbstractMessage ignored) {
releasedMessages.inc();
}

public ZeroCopyMessageMarshallerMetrics newMarshallerMetrics() {
return new ZeroCopyMessageMarshallerMetrics();
}

// Adapter used by ZeroCopyMessageMarshaller to report parse stats and fallback reasons.
public class ZeroCopyMessageMarshallerMetrics implements org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller.Metrics {
@Override
public void onZeroCopyParse(long bytesSaved, long parseTimeNanos) {
bytesSavedByZeroCopy.inc(bytesSaved);
zeroCopyParseTimeNanos.inc(parseTimeNanos);
}

@Override
public void onFallbackNotKnownLength() {
fallbackNotKnownLength.inc();
}

@Override
public void onFallbackNotDetachable() {
fallbackNotDetachable.inc();
}

@Override
public void onFallbackNotByteBuffer() {
fallbackNotByteBuffer.inc();
}
}

@VisibleForTesting
public long zeroCopyMessages() {
return zeroCopyMessages.getCount();
Expand All @@ -76,4 +133,4 @@ public long nonZeroCopyMessages() {
public long releasedMessages() {
return releasedMessages.getCount();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ void closeAllExisting(RaftGroupId groupId) {
this.executor = executor;
this.zeroCopyEnabled = zeroCopyEnabled;
this.zeroCopyRequestMarshaller = new ZeroCopyMessageMarshaller<>(RaftClientRequestProto.getDefaultInstance(),
zeroCopyMetrics::onZeroCopyMessage, zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
zeroCopyMetrics::onZeroCopyClientRequest, zeroCopyMetrics::onNonZeroCopyMessage,
zeroCopyMetrics::onReleasedMessage, zeroCopyMetrics.newMarshallerMetrics());
zeroCopyMetrics.addUnreleased("client_protocol", zeroCopyRequestMarshaller::getUnclosedCount);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import static org.apache.ratis.grpc.GrpcUtil.addMethodWithCustomMarshaller;
import static org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.getAppendEntriesMethod;
import static org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.getInstallSnapshotMethod;

class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class);
Expand All @@ -59,10 +60,12 @@ private enum BatchLogKey implements BatchLogger.Key {
static class PendingServerRequest<REQUEST> {
private final AtomicReference<ReferenceCountedObject<REQUEST>> requestRef;
private final CompletableFuture<Void> future = new CompletableFuture<>();
private final String requestString;

PendingServerRequest(ReferenceCountedObject<REQUEST> requestRef) {
PendingServerRequest(ReferenceCountedObject<REQUEST> requestRef, String requestString) {
requestRef.retain();
this.requestRef = new AtomicReference<>(requestRef);
this.requestString = requestString;
}

REQUEST getRequest() {
Expand All @@ -71,6 +74,10 @@ REQUEST getRequest() {
.orElse(null);
}

String getRequestString() {
return requestString;
}

CompletableFuture<Void> getFuture() {
return future;
}
Expand Down Expand Up @@ -104,8 +111,7 @@ String getName() {

private String getPreviousRequestString() {
return Optional.ofNullable(previousOnNext.get())
.map(PendingServerRequest::getRequest)
.map(this::requestToString)
.map(PendingServerRequest::getRequestString)
.orElse(null);
}

Expand Down Expand Up @@ -177,7 +183,9 @@ public void onNext(REQUEST request) {
return;
}

final PendingServerRequest<REQUEST> current = new PendingServerRequest<>(requestRef);
final PendingServerRequest<REQUEST> current
= new PendingServerRequest<>(requestRef, requestToString(requestRef.get()));
current.getFuture().whenComplete((r, e) -> current.release());
final long callId = getCallId(current.getRequest());
final boolean isHeartbeat = isHeartbeat(current.getRequest());
final Optional<PendingServerRequest<REQUEST>> previous = Optional.ofNullable(previousOnNext.getAndSet(current));
Expand Down Expand Up @@ -243,15 +251,23 @@ private void releaseLast() {
private final RaftServer server;
private final boolean zeroCopyEnabled;
private final ZeroCopyMessageMarshaller<AppendEntriesRequestProto> zeroCopyRequestMarshaller;
private final ZeroCopyMessageMarshaller<InstallSnapshotRequestProto> zeroCopyInstallSnapshotMarshaller;

GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server, boolean zeroCopyEnabled,
ZeroCopyMetrics zeroCopyMetrics) {
this.idSupplier = idSupplier;
this.server = server;
this.zeroCopyEnabled = zeroCopyEnabled;
this.zeroCopyRequestMarshaller = new ZeroCopyMessageMarshaller<>(AppendEntriesRequestProto.getDefaultInstance(),
zeroCopyMetrics::onZeroCopyMessage, zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
zeroCopyMetrics::onZeroCopyAppendEntries, zeroCopyMetrics::onNonZeroCopyMessage,
zeroCopyMetrics::onReleasedMessage, zeroCopyMetrics.newMarshallerMetrics());
this.zeroCopyInstallSnapshotMarshaller = new ZeroCopyMessageMarshaller<>(
InstallSnapshotRequestProto.getDefaultInstance(),
zeroCopyMetrics::onZeroCopyInstallSnapshot, zeroCopyMetrics::onNonZeroCopyMessage,
zeroCopyMetrics::onReleasedMessage, zeroCopyMetrics.newMarshallerMetrics());
zeroCopyMetrics.addUnreleased("server_protocol", zeroCopyRequestMarshaller::getUnclosedCount);
zeroCopyMetrics.addUnreleased("server_protocol_install_snapshot",
zeroCopyInstallSnapshotMarshaller::getUnclosedCount);
}

RaftPeerId getId() {
Expand All @@ -268,9 +284,16 @@ ServerServiceDefinition bindServiceWithZeroCopy() {

// Add appendEntries with zero copy marshaller.
addMethodWithCustomMarshaller(orig, builder, getAppendEntriesMethod(), zeroCopyRequestMarshaller);
// Add installSnapshot with zero copy marshaller for zero-copy counters/metrics.
addMethodWithCustomMarshaller(orig, builder, getInstallSnapshotMethod(), zeroCopyInstallSnapshotMarshaller);
// Add remaining methods as is.
final String appendEntriesMethod = getAppendEntriesMethod().getFullMethodName();
final String installSnapshotMethod = getInstallSnapshotMethod().getFullMethodName();
orig.getMethods().stream().filter(
x -> !x.getMethodDescriptor().getFullMethodName().equals(getAppendEntriesMethod().getFullMethodName())
x -> {
final String methodName = x.getMethodDescriptor().getFullMethodName();
return !methodName.equals(appendEntriesMethod) && !methodName.equals(installSnapshotMethod);
}
).forEach(
builder::addMethod
);
Expand Down Expand Up @@ -365,6 +388,11 @@ CompletableFuture<InstallSnapshotReplyProto> process(InstallSnapshotRequestProto
return CompletableFuture.completedFuture(server.installSnapshot(request));
}

@Override
void release(InstallSnapshotRequestProto request) {
zeroCopyInstallSnapshotMarshaller.release(request);
}

@Override
long getCallId(InstallSnapshotRequestProto request) {
return request.getServerRequest().getCallId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,23 @@
public class ZeroCopyMessageMarshaller<T extends MessageLite> implements PrototypeMarshaller<T> {
static final Logger LOG = LoggerFactory.getLogger(ZeroCopyMessageMarshaller.class);

public interface Metrics {
default void onZeroCopyParse(long bytesSaved, long parseTimeNanos) {
}

default void onFallbackNotKnownLength() {
}

default void onFallbackNotDetachable() {
}

default void onFallbackNotByteBuffer() {
}
}

private static final Metrics NOOP_METRICS = new Metrics() {
};

private final String name;
private final Map<T, InputStream> unclosedStreams = Collections.synchronizedMap(new IdentityHashMap<>());
private final Parser<T> parser;
Expand All @@ -63,13 +80,19 @@ public class ZeroCopyMessageMarshaller<T extends MessageLite> implements Prototy
private final Consumer<T> zeroCopyCount;
private final Consumer<T> nonZeroCopyCount;
private final Consumer<T> releasedCount;
private final Metrics metrics;

public ZeroCopyMessageMarshaller(T defaultInstance) {
this(defaultInstance, m -> {}, m -> {}, m -> {});
this(defaultInstance, m -> {}, m -> {}, m -> {}, NOOP_METRICS);
}

public ZeroCopyMessageMarshaller(T defaultInstance, Consumer<T> zeroCopyCount, Consumer<T> nonZeroCopyCount,
Consumer<T> releasedCount) {
this(defaultInstance, zeroCopyCount, nonZeroCopyCount, releasedCount, NOOP_METRICS);
}

public ZeroCopyMessageMarshaller(T defaultInstance, Consumer<T> zeroCopyCount, Consumer<T> nonZeroCopyCount,
Consumer<T> releasedCount, Metrics metrics) {
this.name = JavaUtils.getClassSimpleName(defaultInstance.getClass()) + "-Marshaller";
@SuppressWarnings("unchecked")
final Parser<T> p = (Parser<T>) defaultInstance.getParserForType();
Expand All @@ -79,6 +102,7 @@ public ZeroCopyMessageMarshaller(T defaultInstance, Consumer<T> zeroCopyCount, C
this.zeroCopyCount = zeroCopyCount;
this.nonZeroCopyCount = nonZeroCopyCount;
this.releasedCount = releasedCount;
this.metrics = metrics == null ? NOOP_METRICS : metrics;
}

@Override
Expand Down Expand Up @@ -158,28 +182,36 @@ private List<ByteString> getByteStrings(InputStream detached, int exactSize) thr
*/
private T parseZeroCopy(InputStream stream) throws IOException {
if (!(stream instanceof KnownLength)) {
metrics.onFallbackNotKnownLength();
LOG.debug("stream is not KnownLength: {}", stream.getClass());
return null;
}
if (!(stream instanceof Detachable)) {
metrics.onFallbackNotDetachable();
LOG.debug("stream is not Detachable: {}", stream.getClass());
return null;
}
if (!(stream instanceof HasByteBuffer)) {
metrics.onFallbackNotByteBuffer();
LOG.debug("stream is not HasByteBuffer: {}", stream.getClass());
return null;
}
if (!((HasByteBuffer) stream).byteBufferSupported()) {
metrics.onFallbackNotByteBuffer();
LOG.debug("stream is HasByteBuffer but not byteBufferSupported: {}", stream.getClass());
return null;
}

final int exactSize = stream.available();
InputStream detached = ((Detachable) stream).detach();
// Measure only the zero-copy parse path (detach + parse).
final long startNanos = System.nanoTime();
try {
final List<ByteString> byteStrings = getByteStrings(detached, exactSize);
final T message = parseFrom(byteStrings, exactSize);

metrics.onZeroCopyParse(exactSize, System.nanoTime() - startNanos);

final InputStream previous = unclosedStreams.put(message, detached);
Preconditions.assertNull(previous, "previous");

Expand Down
Loading