From fa8539e921de4551785856d984ad6be756155ac9 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Wed, 20 Nov 2024 15:54:09 -0600 Subject: [PATCH 01/13] Support custom transport factories from js api consumers --- .../deephaven/web/client/api/ConnectOptions.java | 16 ++++++++++++++++ .../web/client/api/QueryConnectable.java | 6 ++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java index 8bf5c5d10bc..5c0449e2bd8 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java @@ -3,7 +3,9 @@ // package io.deephaven.web.client.api; +import com.vertispan.tsdefs.annotations.TsTypeRef; import elemental2.core.Function; +import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.TransportFactory; import jsinterop.annotations.JsIgnore; import jsinterop.annotations.JsNullable; import jsinterop.annotations.JsType; @@ -44,6 +46,17 @@ public class ConnectOptions { // @JsNullable // public Function fetch; + /** + * The transport factory to use for creating gRPC streams. If specified, the JS API will ignore + * {@link #useWebsockets} and its own internal logic for determining the appropriate transport to use. + *

+ * Defaults to null, indicating that the JS API should determine the appropriate transport to use. If + * {@code useWebsockets} is set to true, the JS API will use websockets, otherwise if the server url begins with + * https, it will use fetch, otherwise it will use websockets. + */ + @JsNullable + public @TsTypeRef(Function.class) TransportFactory transportFactory; + public ConnectOptions() { } @@ -65,5 +78,8 @@ public ConnectOptions(Object connectOptions) { // if (map.has("fetch")) { // fetch = map.getAsAny("fetch").uncheckedCast(); // } + if (map.has("transportFactory")) { + transportFactory = map.getAsAny("transportFactory").uncheckedCast(); + } } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java index 1394069e0b7..6dd752909c1 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java @@ -261,12 +261,14 @@ public T createClient(BiFunction constructor) { public RpcOptions makeRpcOptions() { RpcOptions options = RpcOptions.create(); options.setDebug(getOptions().debug); - if (useWebsockets()) { + if (getOptions().transportFactory != null) { + options.setTransport(getOptions().transportFactory); + } else if (useWebsockets()) { // Replace with our custom websocket impl, with fallback to the built-in one options.setTransport(o -> new MultiplexedWebsocketTransport(o, () -> { Grpc.setDefaultTransport.onInvoke(Grpc.WebsocketTransport.onInvoke()); })); - } + } // else default to fetch implementation return options; } } From e3e8732e228c1f0f91328682d80402ba399e6967 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 21 Nov 2024 15:42:28 -0600 Subject: [PATCH 02/13] Fix import for correct interface --- .../main/java/io/deephaven/web/client/api/ConnectOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java index 5c0449e2bd8..d560cd8ffa9 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java @@ -5,7 +5,7 @@ import com.vertispan.tsdefs.annotations.TsTypeRef; import elemental2.core.Function; -import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.TransportFactory; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportFactory; import jsinterop.annotations.JsIgnore; import jsinterop.annotations.JsNullable; import jsinterop.annotations.JsType; From 970b5c8c43d67a4cb1054195458e8ede29a0fbd1 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 22 Nov 2024 15:57:57 -0600 Subject: [PATCH 03/13] Draft of proposed transport api --- .../web/client/api/ConnectOptions.java | 6 +- .../web/client/api/QueryConnectable.java | 2 +- .../web/client/api/grpc/GrpcTransport.java | 42 ++++++++++ .../client/api/grpc/GrpcTransportFactory.java | 52 +++++++++++++ .../client/api/grpc/GrpcTransportOptions.java | 78 +++++++++++++++++++ .../transport/TransportOptions.java | 2 +- 6 files changed, 176 insertions(+), 6 deletions(-) create mode 100644 web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java create mode 100644 web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java create mode 100644 web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java index d560cd8ffa9..9801424f73c 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java @@ -3,9 +3,7 @@ // package io.deephaven.web.client.api; -import com.vertispan.tsdefs.annotations.TsTypeRef; -import elemental2.core.Function; -import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportFactory; +import io.deephaven.web.client.api.grpc.GrpcTransportFactory; import jsinterop.annotations.JsIgnore; import jsinterop.annotations.JsNullable; import jsinterop.annotations.JsType; @@ -55,7 +53,7 @@ public class ConnectOptions { * https, it will use fetch, otherwise it will use websockets. */ @JsNullable - public @TsTypeRef(Function.class) TransportFactory transportFactory; + public GrpcTransportFactory transportFactory; public ConnectOptions() { diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java index 6dd752909c1..7f3937250fe 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java @@ -262,7 +262,7 @@ public RpcOptions makeRpcOptions() { RpcOptions options = RpcOptions.create(); options.setDebug(getOptions().debug); if (getOptions().transportFactory != null) { - options.setTransport(getOptions().transportFactory); + options.setTransport(getOptions().transportFactory.adapt()); } else if (useWebsockets()) { // Replace with our custom websocket impl, with fallback to the built-in one options.setTransport(o -> new MultiplexedWebsocketTransport(o, () -> { diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java new file mode 100644 index 00000000000..bfb7e26fde9 --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java @@ -0,0 +1,42 @@ +package io.deephaven.web.client.api.grpc; + +import com.vertispan.tsdefs.annotations.TsInterface; +import elemental2.core.JsArray; +import elemental2.core.Uint8Array; +import jsinterop.annotations.JsType; +import jsinterop.base.JsPropertyMap; + +/** + * gRPC transport implementation. + */ +@JsType(namespace = "dh.grpc") +@TsInterface +public interface GrpcTransport { + /** + * Return true to signal that the client may have {@link #sendMessage(Uint8Array)} called on it more than once before {@link #finishSend()} should be called. + * @return true to signal that the implementation can stream multiple messages, false otherwise indicating that Open/Next gRPC calls should be used + */ + boolean supportsClientStreaming(); + /** + * Starts the stream, sending metadata to the server. + * @param metadata the headers to send the server when opening the connection + */ + void start(JsPropertyMap> metadata); + + /** + * Sends a message to the server. + * @param msgBytes bytes to send to the server + */ + void sendMessage(Uint8Array msgBytes); + + /** + * "Half close" the stream, signaling to the server that no more messages will be sent, but that the client is still open to receiving messages. + */ + void finishSend(); + + /** + * End the stream, both notifying the server that no more messages will be sent nor received, and preventing + * the client from receiving any more events. + */ + void cancel(); +} diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java new file mode 100644 index 00000000000..58a06414af3 --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java @@ -0,0 +1,52 @@ +package io.deephaven.web.client.api.grpc; + +import elemental2.core.Uint8Array; +import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportFactory; +import jsinterop.annotations.JsFunction; +import jsinterop.annotations.JsOverlay; + +/** + * Factory for creating gRPC transports. + */ +@JsFunction +public interface GrpcTransportFactory { + /** + * Create a new transport instance. + * @param options options for creating the transport + * @return a transport to use for gRPC communication + */ + GrpcTransport create(GrpcTransportOptions options); + + /** + * Adapt this factory to the transport factory used by the gRPC-web library. + */ + @JsOverlay + default TransportFactory adapt() { + return options -> { + GrpcTransport impl = create(GrpcTransportOptions.from(options)); + return new Transport() { + @Override + public void cancel() { + impl.cancel(); + } + + @Override + public void finishSend() { + impl.finishSend(); + } + + @Override + public void sendMessage(Uint8Array msgBytes) { + impl.sendMessage(msgBytes); + } + + @Override + public void start(BrowserHeaders metadata) { + impl.start(metadata.headersMap); + } + }; + }; + } +} diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java new file mode 100644 index 00000000000..a7177cdc523 --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java @@ -0,0 +1,78 @@ +package io.deephaven.web.client.api.grpc; + +import com.vertispan.tsdefs.annotations.TsInterface; +import elemental2.core.JsArray; +import elemental2.core.JsError; +import elemental2.core.Uint8Array; +import elemental2.dom.URL; +import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions; +import jsinterop.annotations.JsFunction; +import jsinterop.annotations.JsIgnore; +import jsinterop.annotations.JsNullable; +import jsinterop.annotations.JsType; +import jsinterop.base.JsPropertyMap; + +/** + * Options for creating a gRPC stream transport instance. + */ +@TsInterface +@JsType(namespace = "dh.grpc") +public class GrpcTransportOptions { + @JsFunction + @FunctionalInterface + public interface OnHeadersCallback { + void onHeaders(JsPropertyMap> headers, int status); + } + + @JsFunction + @FunctionalInterface + public interface OnChunkCallback { + void onChunk(Uint8Array chunk, boolean finished); + } + + @JsFunction + @FunctionalInterface + public interface OnEndCallback { + void onEnd(@JsNullable JsError error); + } + + /** + * The gRPC method URL. + */ + public URL url; + + /** + * True to enable debug logging for this stream. + */ + public boolean debug; + + /** + * Callback for when headers and status are received. The headers are a map of header names to values, and the status is the HTTP status code. If the connection could not be made, the status should be 0. + */ + public OnHeadersCallback onHeaders; + + /** + * Callback for when a chunk of data is received. + */ + public OnChunkCallback onChunk; + + /** + * Callback for when the stream ends, with an error instance if it can be provided. Note that the present implementation does not consume errors, even if provided. + */ + public OnEndCallback onEnd; + + /** + * Convert a {@link TransportOptions} instance to a {@link GrpcTransportOptions} instance. + */ + @JsIgnore + public static GrpcTransportOptions from(TransportOptions options) { + GrpcTransportOptions impl = new GrpcTransportOptions(); + impl.url = new URL(options.getUrl()); + impl.debug = options.isDebug(); + impl.onHeaders = (headers, status) -> options.getOnHeaders().onInvoke(new BrowserHeaders(headers), status); + impl.onChunk = options.getOnChunk()::onInvoke; + impl.onEnd = options.getOnEnd()::onInvoke; + return impl; + } +} diff --git a/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java b/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java index 7bb39eda843..d9162713094 100644 --- a/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java +++ b/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java @@ -33,7 +33,7 @@ public interface OnEndFn { @JsFunction public interface OnHeadersFn { - void onInvoke(BrowserHeaders p0, double p1); + void onInvoke(BrowserHeaders p0, int p1); } @JsOverlay From 4195d8e4f16967caf9d8b283455b3a64c2056ba7 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Wed, 27 Nov 2024 11:25:46 -0600 Subject: [PATCH 04/13] Use new api consistently, rebuilt fallback --- .../web/client/api/ConnectOptions.java | 10 +--- .../web/client/api/QueryConnectable.java | 19 ++----- .../web/client/api/WorkerConnection.java | 2 +- .../web/client/api/grpc/GrpcTransport.java | 50 ++++++++++++++++--- .../client/api/grpc/GrpcTransportFactory.java | 25 ++++++++-- .../client/api/grpc/GrpcTransportOptions.java | 15 +++++- .../grpc/MultiplexedWebsocketTransport.java | 50 +++++++++++-------- .../web/client/ide/IdeConnection.java | 26 ++++++++++ 8 files changed, 136 insertions(+), 61 deletions(-) diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java index 9801424f73c..16a3711974d 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java @@ -29,21 +29,13 @@ public class ConnectOptions { /** * Set this to true to force the use of websockets when connecting to the deephaven instance, false to force the use - * of {@code fetch}. + * of {@code fetch}. Ignored if {@link #transportFactory} is set. *

* Defaults to null, indicating that the server URL should be checked to see if we connect with fetch or websockets. */ @JsNullable public Boolean useWebsockets; - // TODO (deephaven-core#6214) provide our own grpc-web library that can replace fetch - // /** - // * Optional fetch implementation to use instead of the global {@code fetch()} call, allowing callers to provide a - // * polyfill rather than add a new global. - // */ - // @JsNullable - // public Function fetch; - /** * The transport factory to use for creating gRPC streams. If specified, the JS API will ignore * {@link #useWebsockets} and its own internal logic for determining the appropriate transport to use. diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java index 7f3937250fe..dc9a03417c7 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java @@ -7,11 +7,9 @@ import elemental2.core.JsArray; import elemental2.core.JsSet; import elemental2.promise.Promise; -import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc; import io.deephaven.javascript.proto.dhinternal.grpcweb.client.RpcOptions; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationResponse; import io.deephaven.web.client.api.event.HasEventHandling; -import io.deephaven.web.client.api.grpc.MultiplexedWebsocketTransport; import io.deephaven.web.client.ide.IdeSession; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.*; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.ticket_pb.Ticket; @@ -246,12 +244,8 @@ public void disconnected() { public abstract void notifyServerShutdown(TerminationNotificationResponse success); - public boolean useWebsockets() { - Boolean useWebsockets = getOptions().useWebsockets; - if (useWebsockets == null) { - useWebsockets = getServerUrl().startsWith("http:"); - } - return useWebsockets; + public boolean supportsClientStreaming() { + return getOptions().transportFactory.supportsClientStreaming(); } public T createClient(BiFunction constructor) { @@ -261,14 +255,7 @@ public T createClient(BiFunction constructor) { public RpcOptions makeRpcOptions() { RpcOptions options = RpcOptions.create(); options.setDebug(getOptions().debug); - if (getOptions().transportFactory != null) { - options.setTransport(getOptions().transportFactory.adapt()); - } else if (useWebsockets()) { - // Replace with our custom websocket impl, with fallback to the built-in one - options.setTransport(o -> new MultiplexedWebsocketTransport(o, () -> { - Grpc.setDefaultTransport.onInvoke(Grpc.WebsocketTransport.onInvoke()); - })); - } // else default to fetch implementation + options.setTransport(getOptions().transportFactory.adapt()); return options; } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java index f8212bb05ac..4ea02498a51 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java @@ -986,7 +986,7 @@ public BrowserHeaders metadata() { } public BiDiStream.Factory streamFactory() { - return new BiDiStream.Factory<>(info.useWebsockets(), this::metadata, config::newTicketInt); + return new BiDiStream.Factory<>(info.supportsClientStreaming(), this::metadata, config::newTicketInt); } public Promise newTable(String[] columnNames, String[] types, Object[][] data, String userTimeZone, diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java index bfb7e26fde9..b49281145c8 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java @@ -1,42 +1,76 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// package io.deephaven.web.client.api.grpc; import com.vertispan.tsdefs.annotations.TsInterface; import elemental2.core.JsArray; import elemental2.core.Uint8Array; +import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Transport; +import jsinterop.annotations.JsIgnore; +import jsinterop.annotations.JsOverlay; import jsinterop.annotations.JsType; import jsinterop.base.JsPropertyMap; /** * gRPC transport implementation. + * */ @JsType(namespace = "dh.grpc") @TsInterface public interface GrpcTransport { - /** - * Return true to signal that the client may have {@link #sendMessage(Uint8Array)} called on it more than once before {@link #finishSend()} should be called. - * @return true to signal that the implementation can stream multiple messages, false otherwise indicating that Open/Next gRPC calls should be used - */ - boolean supportsClientStreaming(); /** * Starts the stream, sending metadata to the server. + * * @param metadata the headers to send the server when opening the connection */ void start(JsPropertyMap> metadata); /** * Sends a message to the server. + * * @param msgBytes bytes to send to the server */ void sendMessage(Uint8Array msgBytes); /** - * "Half close" the stream, signaling to the server that no more messages will be sent, but that the client is still open to receiving messages. + * "Half close" the stream, signaling to the server that no more messages will be sent, but that the client is still + * open to receiving messages. */ void finishSend(); /** - * End the stream, both notifying the server that no more messages will be sent nor received, and preventing - * the client from receiving any more events. + * End the stream, both notifying the server that no more messages will be sent nor received, and preventing the + * client from receiving any more events. */ void cancel(); + + /** + * Helper to transform ts implementations to our own api. + */ + @JsIgnore + static GrpcTransport from(Transport tsTransport) { + return new GrpcTransport() { + @Override + public void start(JsPropertyMap> metadata) { + tsTransport.start(new BrowserHeaders(metadata)); + } + + @Override + public void sendMessage(Uint8Array msgBytes) { + tsTransport.sendMessage(msgBytes); + } + + @Override + public void finishSend() { + tsTransport.finishSend(); + } + + @Override + public void cancel() { + tsTransport.cancel(); + } + }; + } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java index 58a06414af3..e1b6a631136 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java @@ -1,28 +1,43 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// package io.deephaven.web.client.api.grpc; +import com.vertispan.tsdefs.annotations.TsInterface; import elemental2.core.Uint8Array; import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport; import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportFactory; -import jsinterop.annotations.JsFunction; -import jsinterop.annotations.JsOverlay; +import jsinterop.annotations.JsIgnore; +import jsinterop.annotations.JsType; /** * Factory for creating gRPC transports. */ -@JsFunction +@TsInterface +@JsType(namespace = "dh.grpc") public interface GrpcTransportFactory { /** * Create a new transport instance. + * * @param options options for creating the transport - * @return a transport to use for gRPC communication + * @return a transport instance to use for gRPC communication */ GrpcTransport create(GrpcTransportOptions options); + /** + * Return true to signal that created transports may have {@link GrpcTransport#sendMessage(Uint8Array)} called on it + * more than once before {@link GrpcTransport#finishSend()} should be called. + * + * @return true to signal that the implementation can stream multiple messages, false otherwise indicating that + * Open/Next gRPC calls should be used + */ + boolean supportsClientStreaming(); + /** * Adapt this factory to the transport factory used by the gRPC-web library. */ - @JsOverlay + @JsIgnore default TransportFactory adapt() { return options -> { GrpcTransport impl = create(GrpcTransportOptions.from(options)); diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java index a7177cdc523..cc0c40f0d89 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java @@ -1,3 +1,6 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// package io.deephaven.web.client.api.grpc; import com.vertispan.tsdefs.annotations.TsInterface; @@ -48,7 +51,8 @@ public interface OnEndCallback { public boolean debug; /** - * Callback for when headers and status are received. The headers are a map of header names to values, and the status is the HTTP status code. If the connection could not be made, the status should be 0. + * Callback for when headers and status are received. The headers are a map of header names to values, and the + * status is the HTTP status code. If the connection could not be made, the status should be 0. */ public OnHeadersCallback onHeaders; @@ -58,10 +62,17 @@ public interface OnEndCallback { public OnChunkCallback onChunk; /** - * Callback for when the stream ends, with an error instance if it can be provided. Note that the present implementation does not consume errors, even if provided. + * Callback for when the stream ends, with an error instance if it can be provided. Note that the present + * implementation does not consume errors, even if provided. */ public OnEndCallback onEnd; + /** + * Internal copy of options, to be used for fallback. + */ + @JsIgnore + public TransportOptions originalOptions; + /** * Convert a {@link TransportOptions} instance to a {@link GrpcTransportOptions} instance. */ diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java index dfb0fd7cea4..d97a202e167 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java @@ -6,6 +6,7 @@ import elemental2.core.ArrayBuffer; import elemental2.core.DataView; import elemental2.core.Int8Array; +import elemental2.core.JsArray; import elemental2.core.JsError; import elemental2.core.Uint8Array; import elemental2.dom.CloseEvent; @@ -17,10 +18,10 @@ import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc; import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport; -import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions; import io.deephaven.web.client.api.JsLazy; import io.deephaven.web.shared.fu.JsRunnable; import jsinterop.base.Js; +import jsinterop.base.JsPropertyMap; import java.util.ArrayList; import java.util.HashMap; @@ -32,11 +33,23 @@ * equal, this transport should be preferred to the default grpc-websockets transport, and in turn the fetch based * transport is usually superior to this. */ -public class MultiplexedWebsocketTransport implements Transport { +public class MultiplexedWebsocketTransport implements GrpcTransport { public static final String MULTIPLEX_PROTOCOL = "grpc-websockets-multiplex"; public static final String SOCKET_PER_STREAM_PROTOCOL = "grpc-websockets"; + public static class Factory implements GrpcTransportFactory { + @Override + public GrpcTransport create(GrpcTransportOptions options) { + return new MultiplexedWebsocketTransport(options); + } + + @Override + public boolean supportsClientStreaming() { + return true; + } + } + private static Uint8Array encodeASCII(String str) { Uint8Array encoded = new Uint8Array(str.length()); for (int i = 0; i < str.length(); i++) { @@ -55,9 +68,9 @@ private interface QueuedEntry { public static class HeaderFrame implements QueuedEntry { private final String path; - private final BrowserHeaders metadata; + private final JsPropertyMap> metadata; - public HeaderFrame(String path, BrowserHeaders metadata) { + public HeaderFrame(String path, JsPropertyMap> metadata) { this.path = path; this.metadata = metadata; } @@ -66,9 +79,9 @@ public HeaderFrame(String path, BrowserHeaders metadata) { public void send(WebSocket webSocket, int streamId) { final Uint8Array headerBytes; final StringBuilder str = new StringBuilder(); - metadata.append("grpc-websockets-path", path); - metadata.forEach((key, value) -> { - str.append(key).append(": ").append(value.join(", ")).append("\r\n"); + metadata.set("grpc-websockets-path", JsArray.of(path)); + metadata.forEach((key) -> { + str.append(key).append(": ").append(metadata.get(key).join(", ")).append("\r\n"); }); headerBytes = encodeASCII(str.toString()); Int8Array payload = new Int8Array(headerBytes.byteLength + 4); @@ -79,7 +92,7 @@ public void send(WebSocket webSocket, int streamId) { @Override public void sendFallback(Transport transport) { - transport.start(metadata); + transport.start(new BrowserHeaders(metadata)); } } @@ -201,16 +214,16 @@ private void release() { private ActiveTransport transport; private final int streamId = nextStreamId++; private final List sendQueue = new ArrayList<>(); - private final TransportOptions options; + private final GrpcTransportOptions options; private final String path; private final JsLazy alternativeTransport; private JsRunnable cleanup = JsRunnable.doNothing(); - public MultiplexedWebsocketTransport(TransportOptions options, JsRunnable avoidMultiplexCallback) { + public MultiplexedWebsocketTransport(GrpcTransportOptions options) { this.options = options; - String url = options.getUrl(); + String url = options.url.toString(); URL urlWrapper = new URL(url); // preserve the path to send as metadata, but still talk to the server with that path path = urlWrapper.pathname.substring(1); @@ -220,16 +233,13 @@ public MultiplexedWebsocketTransport(TransportOptions options, JsRunnable avoidM transport = ActiveTransport.get(url); // prepare a fallback - alternativeTransport = new JsLazy<>(() -> { - avoidMultiplexCallback.run(); - return Grpc.WebsocketTransport.onInvoke().onInvoke(options); - }); + alternativeTransport = new JsLazy<>(() -> Grpc.WebsocketTransport.onInvoke().onInvoke(options.originalOptions)); } @Override - public void start(BrowserHeaders metadata) { + public void start(JsPropertyMap> metadata) { if (alternativeTransport.isAvailable()) { - alternativeTransport.get().start(metadata); + alternativeTransport.get().start(new BrowserHeaders(metadata)); return; } this.transport.retain(); @@ -325,7 +335,7 @@ private void onClose(Event event) { return; } // each grpc transport will handle this as an error - options.getOnEnd().onInvoke(new JsError("Unexpectedly closed " + Js.uncheckedCast(event).reason)); + options.onEnd.onEnd(new JsError("Unexpectedly closed " + Js.uncheckedCast(event).reason)); removeHandlers(); } @@ -345,9 +355,9 @@ private void onMessage(Event event) { closed = false; } if (streamId == this.streamId) { - options.getOnChunk().onInvoke(new Uint8Array(messageEvent.data, 4), false); + options.onChunk.onChunk(new Uint8Array(messageEvent.data, 4), false); if (closed) { - options.getOnEnd().onInvoke(null); + options.onEnd.onEnd(null); removeHandlers(); } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java index bea6aacbba5..86d1361ec4f 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java @@ -7,7 +7,10 @@ import elemental2.core.JsArray; import elemental2.promise.Promise; import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc; import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Code; +import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Transport; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationResponse; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.terminationnotificationresponse.StackTrace; import io.deephaven.web.client.api.ConnectOptions; @@ -16,6 +19,10 @@ import io.deephaven.web.client.api.barrage.stream.ResponseStreamWrapper; import io.deephaven.web.client.api.console.JsVariableChanges; import io.deephaven.web.client.api.console.JsVariableDescriptor; +import io.deephaven.web.client.api.grpc.GrpcTransport; +import io.deephaven.web.client.api.grpc.GrpcTransportFactory; +import io.deephaven.web.client.api.grpc.GrpcTransportOptions; +import io.deephaven.web.client.api.grpc.MultiplexedWebsocketTransport; import io.deephaven.web.shared.data.ConnectToken; import io.deephaven.web.shared.fu.JsConsumer; import io.deephaven.web.shared.fu.JsRunnable; @@ -57,6 +64,25 @@ public IdeConnection(String serverUrl, Object connectOptions) { } else { options = new ConnectOptions(); } + if (options.transportFactory == null) { + // assign a default transport factory + if (options.useWebsockets == Boolean.TRUE || !serverUrl.startsWith("https:")) { + options.transportFactory = new MultiplexedWebsocketTransport.Factory(); + } else { + options.transportFactory = new GrpcTransportFactory() { + @Override + public GrpcTransport create(GrpcTransportOptions options) { + return GrpcTransport + .from((Transport) Grpc.FetchReadableStreamTransport.onInvoke(new Object()).onInvoke((TransportOptions) options)); + } + + @Override + public boolean supportsClientStreaming() { + return false; + } + }; + } + } } @Override From dc85cd37f02e31e163c516f00ffccbd9976e09ab Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Wed, 27 Nov 2024 11:33:28 -0600 Subject: [PATCH 05/13] Make the accessor method be a property instead --- .../java/io/deephaven/web/client/api/QueryConnectable.java | 2 +- .../deephaven/web/client/api/grpc/GrpcTransportFactory.java | 4 +++- .../web/client/api/grpc/MultiplexedWebsocketTransport.java | 2 +- .../main/java/io/deephaven/web/client/ide/IdeConnection.java | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java index dc9a03417c7..2a73b357a5e 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java @@ -245,7 +245,7 @@ public void disconnected() { public abstract void notifyServerShutdown(TerminationNotificationResponse success); public boolean supportsClientStreaming() { - return getOptions().transportFactory.supportsClientStreaming(); + return getOptions().transportFactory.getSupportsClientStreaming(); } public T createClient(BiFunction constructor) { diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java index e1b6a631136..a3479e2ff54 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java @@ -9,6 +9,7 @@ import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport; import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportFactory; import jsinterop.annotations.JsIgnore; +import jsinterop.annotations.JsProperty; import jsinterop.annotations.JsType; /** @@ -32,7 +33,8 @@ public interface GrpcTransportFactory { * @return true to signal that the implementation can stream multiple messages, false otherwise indicating that * Open/Next gRPC calls should be used */ - boolean supportsClientStreaming(); + @JsProperty + boolean getSupportsClientStreaming(); /** * Adapt this factory to the transport factory used by the gRPC-web library. diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java index d97a202e167..4d57f04d9d9 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java @@ -45,7 +45,7 @@ public GrpcTransport create(GrpcTransportOptions options) { } @Override - public boolean supportsClientStreaming() { + public boolean getSupportsClientStreaming() { return true; } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java index 86d1361ec4f..13d3bcc7d4a 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java @@ -77,7 +77,7 @@ public GrpcTransport create(GrpcTransportOptions options) { } @Override - public boolean supportsClientStreaming() { + public boolean getSupportsClientStreaming() { return false; } }; From e77b1dfe85e025fe1157c37e71c1005ae941af76 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 6 Dec 2024 11:56:35 -0600 Subject: [PATCH 06/13] Refine types after iterating with Brian --- .../web/client/api/grpc/GrpcTransport.java | 6 +-- .../client/api/grpc/GrpcTransportFactory.java | 3 +- .../client/api/grpc/GrpcTransportOptions.java | 6 +-- .../web/client/api/grpc/HeaderValueUnion.java | 39 +++++++++++++++++++ .../grpc/MultiplexedWebsocketTransport.java | 16 +++++--- 5 files changed, 56 insertions(+), 14 deletions(-) create mode 100644 web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java index b49281145c8..d51bfcb33d3 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java @@ -4,12 +4,10 @@ package io.deephaven.web.client.api.grpc; import com.vertispan.tsdefs.annotations.TsInterface; -import elemental2.core.JsArray; import elemental2.core.Uint8Array; import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Transport; import jsinterop.annotations.JsIgnore; -import jsinterop.annotations.JsOverlay; import jsinterop.annotations.JsType; import jsinterop.base.JsPropertyMap; @@ -25,7 +23,7 @@ public interface GrpcTransport { * * @param metadata the headers to send the server when opening the connection */ - void start(JsPropertyMap> metadata); + void start(JsPropertyMap metadata); /** * Sends a message to the server. @@ -53,7 +51,7 @@ public interface GrpcTransport { static GrpcTransport from(Transport tsTransport) { return new GrpcTransport() { @Override - public void start(JsPropertyMap> metadata) { + public void start(JsPropertyMap metadata) { tsTransport.start(new BrowserHeaders(metadata)); } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java index a3479e2ff54..66ddcfb949a 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java @@ -11,6 +11,7 @@ import jsinterop.annotations.JsIgnore; import jsinterop.annotations.JsProperty; import jsinterop.annotations.JsType; +import jsinterop.base.Js; /** * Factory for creating gRPC transports. @@ -61,7 +62,7 @@ public void sendMessage(Uint8Array msgBytes) { @Override public void start(BrowserHeaders metadata) { - impl.start(metadata.headersMap); + impl.start(Js.cast(metadata.headersMap)); } }; }; diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java index cc0c40f0d89..2e1a8139ae1 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java @@ -4,7 +4,6 @@ package io.deephaven.web.client.api.grpc; import com.vertispan.tsdefs.annotations.TsInterface; -import elemental2.core.JsArray; import elemental2.core.JsError; import elemental2.core.Uint8Array; import elemental2.dom.URL; @@ -13,6 +12,7 @@ import jsinterop.annotations.JsFunction; import jsinterop.annotations.JsIgnore; import jsinterop.annotations.JsNullable; +import jsinterop.annotations.JsOptional; import jsinterop.annotations.JsType; import jsinterop.base.JsPropertyMap; @@ -25,7 +25,7 @@ public class GrpcTransportOptions { @JsFunction @FunctionalInterface public interface OnHeadersCallback { - void onHeaders(JsPropertyMap> headers, int status); + void onHeaders(JsPropertyMap headers, int status); } @JsFunction @@ -37,7 +37,7 @@ public interface OnChunkCallback { @JsFunction @FunctionalInterface public interface OnEndCallback { - void onEnd(@JsNullable JsError error); + void onEnd(@JsOptional @JsNullable JsError error); } /** diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java new file mode 100644 index 00000000000..ebc9306dd05 --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java @@ -0,0 +1,39 @@ +package io.deephaven.web.client.api.grpc; + +import com.vertispan.tsdefs.annotations.TsUnion; +import com.vertispan.tsdefs.annotations.TsUnionMember; +import elemental2.core.JsArray; +import javaemul.internal.annotations.DoNotAutobox; +import jsinterop.annotations.JsOverlay; +import jsinterop.annotations.JsPackage; +import jsinterop.annotations.JsType; +import jsinterop.base.Js; + +/** + * Union of string and array of string, as node/browser APIs tend to accept either for http headers. + */ +@TsUnion +@JsType(name = "?", namespace = JsPackage.GLOBAL, isNative = true) +public interface HeaderValueUnion { + @JsOverlay + static HeaderValueUnion of(@DoNotAutobox Object value) { + return Js.cast(value); + } + + @JsOverlay + default boolean isArray() { + return JsArray.isArray(this); + } + + @TsUnionMember + @JsOverlay + default String asString() { + return Js.cast(this); + } + + @TsUnionMember + @JsOverlay + default JsArray asArray() { + return Js.cast(this); + } +} diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java index 4d57f04d9d9..786d5875cc3 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java @@ -6,7 +6,6 @@ import elemental2.core.ArrayBuffer; import elemental2.core.DataView; import elemental2.core.Int8Array; -import elemental2.core.JsArray; import elemental2.core.JsError; import elemental2.core.Uint8Array; import elemental2.dom.CloseEvent; @@ -68,9 +67,9 @@ private interface QueuedEntry { public static class HeaderFrame implements QueuedEntry { private final String path; - private final JsPropertyMap> metadata; + private final JsPropertyMap metadata; - public HeaderFrame(String path, JsPropertyMap> metadata) { + public HeaderFrame(String path, JsPropertyMap metadata) { this.path = path; this.metadata = metadata; } @@ -79,9 +78,14 @@ public HeaderFrame(String path, JsPropertyMap> metadata) { public void send(WebSocket webSocket, int streamId) { final Uint8Array headerBytes; final StringBuilder str = new StringBuilder(); - metadata.set("grpc-websockets-path", JsArray.of(path)); + metadata.set("grpc-websockets-path", HeaderValueUnion.of(path)); metadata.forEach((key) -> { - str.append(key).append(": ").append(metadata.get(key).join(", ")).append("\r\n"); + HeaderValueUnion value = metadata.get(key); + if (value.isArray()) { + str.append(key).append(": ").append(value.asArray().join(", ")).append("\r\n"); + } else { + str.append(key).append(": ").append(value.asString()).append("\r\n"); + } }); headerBytes = encodeASCII(str.toString()); Int8Array payload = new Int8Array(headerBytes.byteLength + 4); @@ -237,7 +241,7 @@ public MultiplexedWebsocketTransport(GrpcTransportOptions options) { } @Override - public void start(JsPropertyMap> metadata) { + public void start(JsPropertyMap metadata) { if (alternativeTransport.isAvailable()) { alternativeTransport.get().start(new BrowserHeaders(metadata)); return; From c32ccdbd3613c61689b3e2a62aad8a8468c5ef6f Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 6 Dec 2024 12:12:00 -0600 Subject: [PATCH 07/13] Remove "finished"/"flush" param, as it is unused --- .../web/client/api/grpc/GrpcTransportOptions.java | 7 +++++-- .../web/client/api/grpc/MultiplexedWebsocketTransport.java | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java index 2e1a8139ae1..8853471a98e 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java @@ -31,7 +31,7 @@ public interface OnHeadersCallback { @JsFunction @FunctionalInterface public interface OnChunkCallback { - void onChunk(Uint8Array chunk, boolean finished); + void onChunk(Uint8Array chunk); } @JsFunction @@ -82,7 +82,10 @@ public static GrpcTransportOptions from(TransportOptions options) { impl.url = new URL(options.getUrl()); impl.debug = options.isDebug(); impl.onHeaders = (headers, status) -> options.getOnHeaders().onInvoke(new BrowserHeaders(headers), status); - impl.onChunk = options.getOnChunk()::onInvoke; + impl.onChunk = p0 -> { + // "false" because the underlying implementation doesn't rely on this anyway. + options.getOnChunk().onInvoke(p0, false); + }; impl.onEnd = options.getOnEnd()::onInvoke; return impl; } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java index 786d5875cc3..a40cf532e1b 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java @@ -359,7 +359,7 @@ private void onMessage(Event event) { closed = false; } if (streamId == this.streamId) { - options.onChunk.onChunk(new Uint8Array(messageEvent.data, 4), false); + options.onChunk.onChunk(new Uint8Array(messageEvent.data, 4)); if (closed) { options.onEnd.onEnd(null); removeHandlers(); From 90e4a72532a1713215bbf72b769d934ac85431ae Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 9 Dec 2024 13:01:53 -0600 Subject: [PATCH 08/13] Fix js contract issues, with simple http2/https test --- .../client/api/grpc/GrpcTransportFactory.java | 6 +- .../client/api/grpc/GrpcTransportGwtTest.java | 127 ++++++++++++++++++ 2 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java index 66ddcfb949a..978e1a36426 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java @@ -8,7 +8,7 @@ import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport; import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportFactory; -import jsinterop.annotations.JsIgnore; +import jsinterop.annotations.JsOverlay; import jsinterop.annotations.JsProperty; import jsinterop.annotations.JsType; import jsinterop.base.Js; @@ -17,7 +17,7 @@ * Factory for creating gRPC transports. */ @TsInterface -@JsType(namespace = "dh.grpc") +@JsType(namespace = "dh.grpc", isNative = true) public interface GrpcTransportFactory { /** * Create a new transport instance. @@ -40,7 +40,7 @@ public interface GrpcTransportFactory { /** * Adapt this factory to the transport factory used by the gRPC-web library. */ - @JsIgnore + @JsOverlay default TransportFactory adapt() { return options -> { GrpcTransport impl = create(GrpcTransportOptions.from(options)); diff --git a/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java new file mode 100644 index 00000000000..bba539fdfac --- /dev/null +++ b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java @@ -0,0 +1,127 @@ +package io.deephaven.web.client.api.grpc; + +import elemental2.promise.Promise; +import io.deephaven.web.client.api.AbstractAsyncGwtTestCase; +import io.deephaven.web.client.api.ConnectOptions; +import io.deephaven.web.client.api.CoreClient; +import jsinterop.base.JsPropertyMap; + +/** + * Simple test to verify we can produce custom transports in JS. Only works with https, which means it can only + * be run manually at this time or it will triviall succeed. + */ +public class GrpcTransportGwtTest extends AbstractAsyncGwtTestCase { + @Override + public String getModuleName() { + return "io.deephaven.web.DeephavenIntegrationTest"; + } + + /** + * Simple fetch impl, with no cancelation handling. + */ + public native GrpcTransportFactory makeFetchTransportFactory() /*-{ + return { + create: function(options) { + function pump(reader, res) { + reader.read().then(function(result) { + debugger; + if (result.done) { + options.onEnd(); + } else { + options.onChunk(result.value); + pump(reader, res); + } + })['catch'](function(e) { + debugger; + options.onEnd(e); + }); + } + return { + start: function(metadata) { + this.metadata = metadata; + }, + sendMessage: function(msgBytes) { + var fetchInit = { + headers: new Headers(this.metadata), + method: "POST", + body: msgBytes, + }; + $wnd.fetch(options.url.href, fetchInit).then(function(response) { + var m = {}; + response.headers.forEach(function(value, key) { + m[key] = value; + }); + options.onHeaders(m, response.status); + if (response.body) { + pump(response.body.getReader(), response); + } + return response; + })['catch'](function(e) { + debugger; + options.onEnd(e); + }); + }, + finishSend: function() { + // no-op + }, + cancel: function() { + // no-op + } + }; + }, + supportsClientStreaming: false + }; + }-*/; + + public void testFetchGrpcTransport() { + if (!localServer.startsWith("https:")) { + // We're using h2, so we need to be on https for our current implementation + return; + } + setupDhInternal().then(ignore -> { + ConnectOptions connectOptions = new ConnectOptions(); + connectOptions.transportFactory = makeFetchTransportFactory(); + CoreClient coreClient = new CoreClient(localServer, connectOptions); + return coreClient.login(JsPropertyMap.of("type", CoreClient.LOGIN_TYPE_ANONYMOUS)) + .then(ignore2 -> Promise.resolve(coreClient)); + }).then(this::finish).catch_(this::report); + } + + /** + * Dummy transport that just sends a single message and receives a single message. Doesn't actually talk to + * the server, headers are empty, and the message is always 5 byte proto payload "no data". + */ + private native GrpcTransportFactory makeDummyTransportFactory() /*-{ + return { + create: function(options) { + return { + start: function(metadata) { + // no-op + $wnd.setTimeout(function() {options.onHeaders({}, 200);}, 0); + }, + sendMessage: function(msgBytes) { + // no-op + $wnd.setTimeout(function() {options.onChunk(new Uint8Array(5));}, 0); + }, + finishSend: function() { + // no-op + }, + cancel: function() { + // no-op + } + }; + }, + supportsClientStreaming: true + }; + }-*/; + +// public void testDummyGrpcTransport() { +// setupDhInternal().then(ignore -> { +// ConnectOptions connectOptions = new ConnectOptions(); +// connectOptions.transportFactory = makeDummyTransportFactory(); +// CoreClient coreClient = new CoreClient(localServer, connectOptions); +// return coreClient.login(JsPropertyMap.of("type", CoreClient.LOGIN_TYPE_ANONYMOUS)) +// .then(ignore2 -> Promise.resolve(coreClient)); +// }).then(this::finish).catch_(this::report); +// } +} From a35b0e3a672dfb7eecec007eb008e85b6a0e764a Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 9 Dec 2024 21:27:59 -0600 Subject: [PATCH 09/13] Improve dummy test, still suppressed --- .../client/api/grpc/GrpcTransportGwtTest.java | 39 ++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java index bba539fdfac..0889435dd35 100644 --- a/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java +++ b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java @@ -79,6 +79,7 @@ public void testFetchGrpcTransport() { return; } setupDhInternal().then(ignore -> { + delayTestFinish(7101); ConnectOptions connectOptions = new ConnectOptions(); connectOptions.transportFactory = makeFetchTransportFactory(); CoreClient coreClient = new CoreClient(localServer, connectOptions); @@ -89,19 +90,29 @@ public void testFetchGrpcTransport() { /** * Dummy transport that just sends a single message and receives a single message. Doesn't actually talk to - * the server, headers are empty, and the message is always 5 byte proto payload "no data". + * the server, headers are empty, and the message is always 5 byte proto payload "no data", followed by + * trailers signifying success. */ private native GrpcTransportFactory makeDummyTransportFactory() /*-{ return { create: function(options) { return { start: function(metadata) { - // no-op + // empty headers $wnd.setTimeout(function() {options.onHeaders({}, 200);}, 0); }, sendMessage: function(msgBytes) { - // no-op - $wnd.setTimeout(function() {options.onChunk(new Uint8Array(5));}, 0); + // empty payload + var empty = new $wnd.Uint8Array(5); + var successTrailers = new $wnd.Uint8Array(5); + successTrailers[0] = 128; + new TextEncoding('utf-8').encodeInto('grpc-status:1', successTrailers.subarray(1)); + + $wnd.setTimeout(function() { + options.onChunk(empty); + debugger; + options.onChunk(successTrailers); + }, 0); }, finishSend: function() { // no-op @@ -115,13 +126,15 @@ private native GrpcTransportFactory makeDummyTransportFactory() /*-{ }; }-*/; -// public void testDummyGrpcTransport() { -// setupDhInternal().then(ignore -> { -// ConnectOptions connectOptions = new ConnectOptions(); -// connectOptions.transportFactory = makeDummyTransportFactory(); -// CoreClient coreClient = new CoreClient(localServer, connectOptions); -// return coreClient.login(JsPropertyMap.of("type", CoreClient.LOGIN_TYPE_ANONYMOUS)) -// .then(ignore2 -> Promise.resolve(coreClient)); -// }).then(this::finish).catch_(this::report); -// } + public void ignore_testDummyGrpcTransport() { + setupDhInternal().then(ignore -> { + delayTestFinish(7102); + ConnectOptions connectOptions = new ConnectOptions(); + connectOptions.transportFactory = makeDummyTransportFactory(); + connectOptions.debug = true; + CoreClient coreClient = new CoreClient(localServer, connectOptions); + return coreClient.login(JsPropertyMap.of("type", CoreClient.LOGIN_TYPE_ANONYMOUS)) + .then(ignore2 -> Promise.resolve(coreClient)); + }).then(this::finish).catch_(this::report); + } } From 976539716e2b4e140bf3f983083757b48709125f Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 10 Dec 2024 11:32:50 -0600 Subject: [PATCH 10/13] Second working test, with no server component --- .../client/api/grpc/GrpcTransportGwtTest.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java index 0889435dd35..00fc4e8a52d 100644 --- a/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java +++ b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java @@ -8,7 +8,7 @@ /** * Simple test to verify we can produce custom transports in JS. Only works with https, which means it can only - * be run manually at this time or it will triviall succeed. + * be run manually at this time, or it will trivially succeed. */ public class GrpcTransportGwtTest extends AbstractAsyncGwtTestCase { @Override @@ -24,7 +24,6 @@ public native GrpcTransportFactory makeFetchTransportFactory() /*-{ create: function(options) { function pump(reader, res) { reader.read().then(function(result) { - debugger; if (result.done) { options.onEnd(); } else { @@ -32,7 +31,6 @@ function pump(reader, res) { pump(reader, res); } })['catch'](function(e) { - debugger; options.onEnd(e); }); } @@ -57,7 +55,6 @@ function pump(reader, res) { } return response; })['catch'](function(e) { - debugger; options.onEnd(e); }); }, @@ -104,14 +101,17 @@ private native GrpcTransportFactory makeDummyTransportFactory() /*-{ sendMessage: function(msgBytes) { // empty payload var empty = new $wnd.Uint8Array(5); - var successTrailers = new $wnd.Uint8Array(5); + // successful trailer payload + var trailersString = 'grpc-status:0'; + var successTrailers = new $wnd.Uint8Array(5 + trailersString.length); successTrailers[0] = 128; - new TextEncoding('utf-8').encodeInto('grpc-status:1', successTrailers.subarray(1)); - + successTrailers[4] = trailersString.length; + new $wnd.TextEncoder('utf-8').encodeInto(trailersString, successTrailers.subarray(5)); $wnd.setTimeout(function() { + // delay a bit, then send the empty messages and end the stream options.onChunk(empty); - debugger; options.onChunk(successTrailers); + options.onEnd(); }, 0); }, finishSend: function() { @@ -126,7 +126,7 @@ private native GrpcTransportFactory makeDummyTransportFactory() /*-{ }; }-*/; - public void ignore_testDummyGrpcTransport() { + public void testDummyGrpcTransport() { setupDhInternal().then(ignore -> { delayTestFinish(7102); ConnectOptions connectOptions = new ConnectOptions(); From 5ada1ef94fc30fa6bdbcbe5c1ec15f885fd326e8 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 10 Dec 2024 12:30:18 -0600 Subject: [PATCH 11/13] Spotless --- .../web/client/api/grpc/HeaderValueUnion.java | 3 +++ .../deephaven/web/client/ide/IdeConnection.java | 3 ++- .../web/client/api/grpc/GrpcTransportGwtTest.java | 15 +++++++++------ 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java index ebc9306dd05..9d64a7c0402 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java @@ -1,3 +1,6 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// package io.deephaven.web.client.api.grpc; import com.vertispan.tsdefs.annotations.TsUnion; diff --git a/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java index 13d3bcc7d4a..1ea9412d769 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java @@ -73,7 +73,8 @@ public IdeConnection(String serverUrl, Object connectOptions) { @Override public GrpcTransport create(GrpcTransportOptions options) { return GrpcTransport - .from((Transport) Grpc.FetchReadableStreamTransport.onInvoke(new Object()).onInvoke((TransportOptions) options)); + .from((Transport) Grpc.FetchReadableStreamTransport.onInvoke(new Object()) + .onInvoke((TransportOptions) options)); } @Override diff --git a/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java index 00fc4e8a52d..a5b3a952792 100644 --- a/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java +++ b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java @@ -1,3 +1,6 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// package io.deephaven.web.client.api.grpc; import elemental2.promise.Promise; @@ -7,8 +10,8 @@ import jsinterop.base.JsPropertyMap; /** - * Simple test to verify we can produce custom transports in JS. Only works with https, which means it can only - * be run manually at this time, or it will trivially succeed. + * Simple test to verify we can produce custom transports in JS. Only works with https, which means it can only be run + * manually at this time, or it will trivially succeed. */ public class GrpcTransportGwtTest extends AbstractAsyncGwtTestCase { @Override @@ -79,16 +82,16 @@ public void testFetchGrpcTransport() { delayTestFinish(7101); ConnectOptions connectOptions = new ConnectOptions(); connectOptions.transportFactory = makeFetchTransportFactory(); - CoreClient coreClient = new CoreClient(localServer, connectOptions); + CoreClient coreClient = new CoreClient(localServer, connectOptions); return coreClient.login(JsPropertyMap.of("type", CoreClient.LOGIN_TYPE_ANONYMOUS)) .then(ignore2 -> Promise.resolve(coreClient)); }).then(this::finish).catch_(this::report); } /** - * Dummy transport that just sends a single message and receives a single message. Doesn't actually talk to - * the server, headers are empty, and the message is always 5 byte proto payload "no data", followed by - * trailers signifying success. + * Dummy transport that just sends a single message and receives a single message. Doesn't actually talk to the + * server, headers are empty, and the message is always 5 byte proto payload "no data", followed by trailers + * signifying success. */ private native GrpcTransportFactory makeDummyTransportFactory() /*-{ return { From 574e5d3b822e4a43c6c598c531b85b7635451a23 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 10 Dec 2024 14:09:47 -0600 Subject: [PATCH 12/13] rename 'websocket' flag to 'supports client streaming' --- .../web/client/api/barrage/stream/BiDiStream.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java index 5febd0a9fa4..4fc65739780 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java @@ -36,12 +36,12 @@ public interface NextStreamMessageFactory { void nextStreamMessage(Req nextPayload, BrowserHeaders headers, JsBiConsumer callback); } public static class Factory { - private final boolean useWebsockets; + private final boolean supportsClientStreaming; private final Supplier headers; private final IntSupplier nextIntTicket; - public Factory(boolean useWebsockets, Supplier headers, IntSupplier nextIntTicket) { - this.useWebsockets = useWebsockets; + public Factory(boolean supportsClientStreaming, Supplier headers, IntSupplier nextIntTicket) { + this.supportsClientStreaming = supportsClientStreaming; this.headers = headers; this.nextIntTicket = nextIntTicket; } @@ -51,8 +51,8 @@ public BiDiStream create( OpenStreamFactory openEmulatedStream, NextStreamMessageFactory nextEmulatedStream, ReqT emptyReq) { - if (useWebsockets) { - return websocket(bidirectionalStream.openBiDiStream(headers.get())); + if (supportsClientStreaming) { + return bidi(bidirectionalStream.openBiDiStream(headers.get())); } else { return new EmulatedBiDiStream<>( openEmulatedStream, @@ -73,7 +73,7 @@ public static BiDiStream of( IntSupplier nextIntTicket, boolean useWebsocket) { if (useWebsocket) { - return websocket(bidirectionalStream.openBiDiStream(headers.get())); + return bidi(bidirectionalStream.openBiDiStream(headers.get())); } else { return new EmulatedBiDiStream<>( openEmulatedStream, @@ -84,7 +84,7 @@ public static BiDiStream of( } } - public static BiDiStream websocket(Object bidirectionalStream) { + public static BiDiStream bidi(Object bidirectionalStream) { return new WebsocketBiDiStream<>(Js.cast(bidirectionalStream)); } From 8544a69a2bdccf2bb822c2010dd16257780495c1 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 10 Dec 2024 15:04:35 -0600 Subject: [PATCH 13/13] Fix test naming convention --- .../test/java/io/deephaven/web/ClientIntegrationTestSuite.java | 2 ++ .../{GrpcTransportGwtTest.java => GrpcTransportTestGwt.java} | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) rename web/client-api/src/test/java/io/deephaven/web/client/api/grpc/{GrpcTransportGwtTest.java => GrpcTransportTestGwt.java} (98%) diff --git a/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java b/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java index 3c1887e8a5c..3c3aa8791ca 100644 --- a/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java +++ b/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java @@ -5,6 +5,7 @@ import com.google.gwt.junit.tools.GWTTestSuite; import io.deephaven.web.client.api.*; +import io.deephaven.web.client.api.grpc.GrpcTransportTestGwt; import io.deephaven.web.client.api.storage.JsStorageServiceTestGwt; import io.deephaven.web.client.api.subscription.ConcurrentTableTestGwt; import io.deephaven.web.client.api.subscription.ViewportTestGwt; @@ -30,6 +31,7 @@ public static Test suite() { suite.addTestSuite(JsStorageServiceTestGwt.class); suite.addTestSuite(InputTableTestGwt.class); suite.addTestSuite(ColumnStatisticsTestGwt.class); + suite.addTestSuite(GrpcTransportTestGwt.class); // This should be a unit test, but it requires a browser environment to run on GWT 2.9 // GWT 2.9 doesn't have proper bindings for Promises in HtmlUnit, so we need to use the IntegrationTest suite diff --git a/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportTestGwt.java similarity index 98% rename from web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java rename to web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportTestGwt.java index a5b3a952792..020c65b771a 100644 --- a/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportGwtTest.java +++ b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportTestGwt.java @@ -13,7 +13,7 @@ * Simple test to verify we can produce custom transports in JS. Only works with https, which means it can only be run * manually at this time, or it will trivially succeed. */ -public class GrpcTransportGwtTest extends AbstractAsyncGwtTestCase { +public class GrpcTransportTestGwt extends AbstractAsyncGwtTestCase { @Override public String getModuleName() { return "io.deephaven.web.DeephavenIntegrationTest";