Skip to content

Commit

Permalink
feat: JS clients can provide custom grpc transports (deephaven#6476)
Browse files Browse the repository at this point in the history
Provides a contract for client applications to use a custom http/2
implementation. Roughly abstracted from our TypeScript grpc-web library,
with a few rough edges taken off, and no external dependencies.

Two integration tests are included, one which requires https (presently
only possible with manual testing, see deephaven#6421), and one which pretends to
contact the server but really responds to every request with a "success"
response and no payload.

No documentation required at this time, generated typescript includes
details on the new APIs.

Fixes deephaven#6404
  • Loading branch information
niloc132 committed Dec 10, 2024
1 parent d10d596 commit 4a9827a
Showing 13 changed files with 511 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
//
package io.deephaven.web.client.api;

import elemental2.core.Function;
import io.deephaven.web.client.api.grpc.GrpcTransportFactory;
import jsinterop.annotations.JsIgnore;
import jsinterop.annotations.JsNullable;
import jsinterop.annotations.JsType;
@@ -29,20 +29,23 @@ public class ConnectOptions {

/**
* Set this to true to force the use of websockets when connecting to the deephaven instance, false to force the use
* of {@code fetch}.
* of {@code fetch}. Ignored if {@link #transportFactory} is set.
* <p>
* Defaults to null, indicating that the server URL should be checked to see if we connect with fetch or websockets.
*/
@JsNullable
public Boolean useWebsockets;

// TODO (deephaven-core#6214) provide our own grpc-web library that can replace fetch
// /**
// * Optional fetch implementation to use instead of the global {@code fetch()} call, allowing callers to provide a
// * polyfill rather than add a new global.
// */
// @JsNullable
// public Function fetch;
/**
* The transport factory to use for creating gRPC streams. If specified, the JS API will ignore
* {@link #useWebsockets} and its own internal logic for determining the appropriate transport to use.
* <p>
* Defaults to null, indicating that the JS API should determine the appropriate transport to use. If
* {@code useWebsockets} is set to true, the JS API will use websockets, otherwise if the server url begins with
* https, it will use fetch, otherwise it will use websockets.
*/
@JsNullable
public GrpcTransportFactory transportFactory;

public ConnectOptions() {

@@ -65,5 +68,8 @@ public ConnectOptions(Object connectOptions) {
// if (map.has("fetch")) {
// fetch = map.getAsAny("fetch").uncheckedCast();
// }
if (map.has("transportFactory")) {
transportFactory = map.getAsAny("transportFactory").uncheckedCast();
}
}
}
Original file line number Diff line number Diff line change
@@ -7,11 +7,9 @@
import elemental2.core.JsArray;
import elemental2.core.JsSet;
import elemental2.promise.Promise;
import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc;
import io.deephaven.javascript.proto.dhinternal.grpcweb.client.RpcOptions;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationResponse;
import io.deephaven.web.client.api.event.HasEventHandling;
import io.deephaven.web.client.api.grpc.MultiplexedWebsocketTransport;
import io.deephaven.web.client.ide.IdeSession;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.*;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.ticket_pb.Ticket;
@@ -246,12 +244,8 @@ public void disconnected() {

public abstract void notifyServerShutdown(TerminationNotificationResponse success);

public boolean useWebsockets() {
Boolean useWebsockets = getOptions().useWebsockets;
if (useWebsockets == null) {
useWebsockets = getServerUrl().startsWith("http:");
}
return useWebsockets;
public boolean supportsClientStreaming() {
return getOptions().transportFactory.getSupportsClientStreaming();
}

public <T> T createClient(BiFunction<String, Object, T> constructor) {
@@ -261,12 +255,7 @@ public <T> T createClient(BiFunction<String, Object, T> constructor) {
public RpcOptions makeRpcOptions() {
RpcOptions options = RpcOptions.create();
options.setDebug(getOptions().debug);
if (useWebsockets()) {
// Replace with our custom websocket impl, with fallback to the built-in one
options.setTransport(o -> new MultiplexedWebsocketTransport(o, () -> {
Grpc.setDefaultTransport.onInvoke(Grpc.WebsocketTransport.onInvoke());
}));
}
options.setTransport(getOptions().transportFactory.adapt());
return options;
}
}
Original file line number Diff line number Diff line change
@@ -986,7 +986,7 @@ public BrowserHeaders metadata() {
}

public <ReqT, RespT> BiDiStream.Factory<ReqT, RespT> streamFactory() {
return new BiDiStream.Factory<>(info.useWebsockets(), this::metadata, config::newTicketInt);
return new BiDiStream.Factory<>(info.supportsClientStreaming(), this::metadata, config::newTicketInt);
}

public Promise<JsTable> newTable(String[] columnNames, String[] types, Object[][] data, String userTimeZone,
Original file line number Diff line number Diff line change
@@ -36,12 +36,12 @@ public interface NextStreamMessageFactory<Req> {
void nextStreamMessage(Req nextPayload, BrowserHeaders headers, JsBiConsumer<Object, Object> callback);
}
public static class Factory<ReqT, RespT> {
private final boolean useWebsockets;
private final boolean supportsClientStreaming;
private final Supplier<BrowserHeaders> headers;
private final IntSupplier nextIntTicket;

public Factory(boolean useWebsockets, Supplier<BrowserHeaders> headers, IntSupplier nextIntTicket) {
this.useWebsockets = useWebsockets;
public Factory(boolean supportsClientStreaming, Supplier<BrowserHeaders> headers, IntSupplier nextIntTicket) {
this.supportsClientStreaming = supportsClientStreaming;
this.headers = headers;
this.nextIntTicket = nextIntTicket;
}
@@ -51,8 +51,8 @@ public BiDiStream<ReqT, RespT> create(
OpenStreamFactory<ReqT> openEmulatedStream,
NextStreamMessageFactory<ReqT> nextEmulatedStream,
ReqT emptyReq) {
if (useWebsockets) {
return websocket(bidirectionalStream.openBiDiStream(headers.get()));
if (supportsClientStreaming) {
return bidi(bidirectionalStream.openBiDiStream(headers.get()));
} else {
return new EmulatedBiDiStream<>(
openEmulatedStream,
@@ -73,7 +73,7 @@ public static <Req, Resp> BiDiStream<Req, Resp> of(
IntSupplier nextIntTicket,
boolean useWebsocket) {
if (useWebsocket) {
return websocket(bidirectionalStream.openBiDiStream(headers.get()));
return bidi(bidirectionalStream.openBiDiStream(headers.get()));
} else {
return new EmulatedBiDiStream<>(
openEmulatedStream,
@@ -84,7 +84,7 @@ public static <Req, Resp> BiDiStream<Req, Resp> of(
}
}

public static <Req, Resp> BiDiStream<Req, Resp> websocket(Object bidirectionalStream) {
public static <Req, Resp> BiDiStream<Req, Resp> bidi(Object bidirectionalStream) {
return new WebsocketBiDiStream<>(Js.cast(bidirectionalStream));
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.web.client.api.grpc;

import com.vertispan.tsdefs.annotations.TsInterface;
import elemental2.core.Uint8Array;
import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders;
import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Transport;
import jsinterop.annotations.JsIgnore;
import jsinterop.annotations.JsType;
import jsinterop.base.JsPropertyMap;

/**
* gRPC transport implementation.
*
*/
@JsType(namespace = "dh.grpc")
@TsInterface
public interface GrpcTransport {
/**
* Starts the stream, sending metadata to the server.
*
* @param metadata the headers to send the server when opening the connection
*/
void start(JsPropertyMap<HeaderValueUnion> metadata);

/**
* Sends a message to the server.
*
* @param msgBytes bytes to send to the server
*/
void sendMessage(Uint8Array msgBytes);

/**
* "Half close" the stream, signaling to the server that no more messages will be sent, but that the client is still
* open to receiving messages.
*/
void finishSend();

/**
* End the stream, both notifying the server that no more messages will be sent nor received, and preventing the
* client from receiving any more events.
*/
void cancel();

/**
* Helper to transform ts implementations to our own api.
*/
@JsIgnore
static GrpcTransport from(Transport tsTransport) {
return new GrpcTransport() {
@Override
public void start(JsPropertyMap<HeaderValueUnion> metadata) {
tsTransport.start(new BrowserHeaders(metadata));
}

@Override
public void sendMessage(Uint8Array msgBytes) {
tsTransport.sendMessage(msgBytes);
}

@Override
public void finishSend() {
tsTransport.finishSend();
}

@Override
public void cancel() {
tsTransport.cancel();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.web.client.api.grpc;

import com.vertispan.tsdefs.annotations.TsInterface;
import elemental2.core.Uint8Array;
import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders;
import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport;
import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportFactory;
import jsinterop.annotations.JsOverlay;
import jsinterop.annotations.JsProperty;
import jsinterop.annotations.JsType;
import jsinterop.base.Js;

/**
* Factory for creating gRPC transports.
*/
@TsInterface
@JsType(namespace = "dh.grpc", isNative = true)
public interface GrpcTransportFactory {
/**
* Create a new transport instance.
*
* @param options options for creating the transport
* @return a transport instance to use for gRPC communication
*/
GrpcTransport create(GrpcTransportOptions options);

/**
* Return true to signal that created transports may have {@link GrpcTransport#sendMessage(Uint8Array)} called on it
* more than once before {@link GrpcTransport#finishSend()} should be called.
*
* @return true to signal that the implementation can stream multiple messages, false otherwise indicating that
* Open/Next gRPC calls should be used
*/
@JsProperty
boolean getSupportsClientStreaming();

/**
* Adapt this factory to the transport factory used by the gRPC-web library.
*/
@JsOverlay
default TransportFactory adapt() {
return options -> {
GrpcTransport impl = create(GrpcTransportOptions.from(options));
return new Transport() {
@Override
public void cancel() {
impl.cancel();
}

@Override
public void finishSend() {
impl.finishSend();
}

@Override
public void sendMessage(Uint8Array msgBytes) {
impl.sendMessage(msgBytes);
}

@Override
public void start(BrowserHeaders metadata) {
impl.start(Js.cast(metadata.headersMap));
}
};
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.web.client.api.grpc;

import com.vertispan.tsdefs.annotations.TsInterface;
import elemental2.core.JsError;
import elemental2.core.Uint8Array;
import elemental2.dom.URL;
import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders;
import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions;
import jsinterop.annotations.JsFunction;
import jsinterop.annotations.JsIgnore;
import jsinterop.annotations.JsNullable;
import jsinterop.annotations.JsOptional;
import jsinterop.annotations.JsType;
import jsinterop.base.JsPropertyMap;

/**
* Options for creating a gRPC stream transport instance.
*/
@TsInterface
@JsType(namespace = "dh.grpc")
public class GrpcTransportOptions {
@JsFunction
@FunctionalInterface
public interface OnHeadersCallback {
void onHeaders(JsPropertyMap<HeaderValueUnion> headers, int status);
}

@JsFunction
@FunctionalInterface
public interface OnChunkCallback {
void onChunk(Uint8Array chunk);
}

@JsFunction
@FunctionalInterface
public interface OnEndCallback {
void onEnd(@JsOptional @JsNullable JsError error);
}

/**
* The gRPC method URL.
*/
public URL url;

/**
* True to enable debug logging for this stream.
*/
public boolean debug;

/**
* Callback for when headers and status are received. The headers are a map of header names to values, and the
* status is the HTTP status code. If the connection could not be made, the status should be 0.
*/
public OnHeadersCallback onHeaders;

/**
* Callback for when a chunk of data is received.
*/
public OnChunkCallback onChunk;

/**
* Callback for when the stream ends, with an error instance if it can be provided. Note that the present
* implementation does not consume errors, even if provided.
*/
public OnEndCallback onEnd;

/**
* Internal copy of options, to be used for fallback.
*/
@JsIgnore
public TransportOptions originalOptions;

/**
* Convert a {@link TransportOptions} instance to a {@link GrpcTransportOptions} instance.
*/
@JsIgnore
public static GrpcTransportOptions from(TransportOptions options) {
GrpcTransportOptions impl = new GrpcTransportOptions();
impl.url = new URL(options.getUrl());
impl.debug = options.isDebug();
impl.onHeaders = (headers, status) -> options.getOnHeaders().onInvoke(new BrowserHeaders(headers), status);
impl.onChunk = p0 -> {
// "false" because the underlying implementation doesn't rely on this anyway.
options.getOnChunk().onInvoke(p0, false);
};
impl.onEnd = options.getOnEnd()::onInvoke;
return impl;
}
}
Loading

0 comments on commit 4a9827a

Please sign in to comment.