Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: JS API support for shared tickets #5854

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public JsPartitionedTable(WorkerConnection connection, JsWidget widget) {
this.widget = widget;
}

@JsIgnore
@Override
public WorkerConnection getConnection() {
return connection;
}

@JsIgnore
public Promise<JsPartitionedTable> refetch() {
closeSubscriptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ public Promise<JsTreeTable> rollup(@TsTypeRef(JsRollupConfig.class) Object confi
config = new JsRollupConfig(Js.cast(configObject));
}

Ticket rollupTicket = workerConnection.getConfig().newTicket();
Ticket rollupTicket = workerConnection.getTickets().newExportTicket();

Promise<Object> rollupPromise = Callbacks.grpcUnaryPromise(c -> {
RollupRequest request = config.buildRequest(getColumns());
Expand Down Expand Up @@ -1080,7 +1080,7 @@ public Promise<JsTreeTable> treeTable(@TsTypeRef(JsTreeTableConfig.class) Object
config = new JsTreeTableConfig(Js.cast(configObject));
}

Ticket treeTicket = workerConnection.getConfig().newTicket();
Ticket treeTicket = workerConnection.getTickets().newExportTicket();

Promise<Object> treePromise = Callbacks.grpcUnaryPromise(c -> {
TreeRequest requestMessage = new TreeRequest();
Expand Down Expand Up @@ -1297,7 +1297,7 @@ public Promise<JsPartitionedTable> partitionBy(Object keys, @JsOptional Boolean

// Start the partitionBy on the server - we want to get the error from here, but we'll race the fetch against
// this to avoid an extra round-trip
Ticket partitionedTableTicket = workerConnection.getConfig().newTicket();
Ticket partitionedTableTicket = workerConnection.getTickets().newExportTicket();
Promise<PartitionByResponse> partitionByPromise = Callbacks.<PartitionByResponse, Object>grpcUnaryPromise(c -> {
PartitionByRequest partitionBy = new PartitionByRequest();
partitionBy.setTableId(state().getHandle().makeTicket());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public JsTotalsTable(JsTable wrappedTable, String directive, JsArray<String> gro
this.groupBy = Js.uncheckedCast(groupBy.slice());
}

@Override
public WorkerConnection getConnection() {
return wrappedTable.getConnection();
}

public void refreshViewport() {
if (firstRow != null && lastRow != null) {
setViewport(firstRow, lastRow, Js.uncheckedCast(columns), updateIntervalMs, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ public JsRunnable onLogMessage(JsConsumer<LogItem> callback) {
public CancellablePromise<IdeSession> startSession(String type) {
JsLog.debug("Starting", type, "console session");
LazyPromise<Ticket> promise = new LazyPromise<>();
final ClientConfiguration config = connection.get().getConfig();
final Ticket ticket = new Ticket();
ticket.setTicket(config.newTicketRaw());
final Tickets config = connection.get().getTickets();
final Ticket ticket = config.newExportTicket();

final JsRunnable closer = () -> {
boolean run = !cancelled.has(ticket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import jsinterop.annotations.JsOverlay;
import jsinterop.annotations.JsPackage;
import jsinterop.annotations.JsType;
import jsinterop.base.Js;

/**
* Indicates that this object is a local representation of an object that exists on the server. Similar to HasLifecycle,
Expand All @@ -24,13 +25,21 @@ public interface ServerObject {
@JsIgnore
TypedTicket typedTicket();

@JsIgnore
WorkerConnection getConnection();

/**
* Note that we don't explicitly use this as a union type, but sort of as a way to pretend that ServerObject is a
* sealed type with this generated set of subtypes.
*/
@JsType(name = "?", namespace = JsPackage.GLOBAL, isNative = true)
@TsUnion
interface Union {
@JsOverlay
static Union of(Object object) {
return Js.uncheckedCast(object);
}

@TsUnionMember
@JsOverlay
default JsTable asTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,12 @@
import io.deephaven.javascript.proto.dhinternal.arrow.flight.protocol.flight_pb.FlightDescriptor;
import io.deephaven.javascript.proto.dhinternal.io.deephaven_core.proto.ticket_pb.Ticket;
import io.deephaven.javascript.proto.dhinternal.io.deephaven_core.proto.table_pb.TableReference;
import io.deephaven.web.client.api.console.JsVariableDefinition;

/**
* Replacement for TableHandle, wraps up Ticket plus current export state. We only consider the lower bytes for hashing
* (since until we've got millions of tickets it won't matter).
* Replacement for TableHandle, wraps up export tickets plus current export state. We only consider the lower bytes for
* hashing (since until we've got millions of tickets it won't matter).
*/
public class TableTicket {
public static Ticket createTicket(JsVariableDefinition varDef) {
Ticket ticket = new Ticket();
ticket.setTicket(varDef.getId());
return ticket;
}

public static TableReference createTableRef(JsVariableDefinition varDef) {
TableReference tableRef = new TableReference();
tableRef.setTicket(createTicket(varDef));
return tableRef;
}

/**
* UNKNOWN: 0, PENDING: 1, PUBLISHING: 2, QUEUED: 3, RUNNING: 4, EXPORTED: 5, RELEASED: 6, CANCELLED: 7, FAILED: 8,
Expand Down
160 changes: 160 additions & 0 deletions web/client-api/src/main/java/io/deephaven/web/client/api/Tickets.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.web.client.api;

import elemental2.core.TypedArray;
import elemental2.core.Uint8Array;
import elemental2.dom.DomGlobal;
import io.deephaven.javascript.proto.dhinternal.io.deephaven_core.proto.table_pb.TableReference;
import io.deephaven.javascript.proto.dhinternal.io.deephaven_core.proto.ticket_pb.Ticket;
import io.deephaven.web.client.api.console.JsVariableDefinition;
import jsinterop.annotations.JsMethod;
import jsinterop.annotations.JsPackage;
import jsinterop.base.Js;

/**
* Single factory for known ticket types. By definition, this cannot be exhaustive, since flight tickets have no
* inherent structure - Deephaven Core only specifies that the first byte will indicate the type of ticket, and later
* bytes will be handled by handlers for that type. Deephaven Core requires only that export tickets be support, but
* also offers application tickets, scope tickets, and shared tickets. Downstream projects may define new ticket types,
* which won't necessarily be understood by this client.
*
* @see io.deephaven.server.session.ExportTicketResolver
* @see io.deephaven.server.appmode.ApplicationTicketResolver
* @see io.deephaven.server.console.ScopeTicketResolver
* @see io.deephaven.server.session.SharedTicketResolver
*/
public class Tickets {
// Prefix for all export tickets
private static final byte EXPORT_PREFIX = 'e';
// Prefix for all application tickets
private static final byte APPLICATION_PREFIX = 'a';
// Prefix for all scope tickets
private static final byte SCOPE_PREFIX = 's';
// Prefix for all shared tickets
private static final byte SHARED_PREFIX = 'h';

// Some ticket types use a slash as a delimeter between fields
private static final char TICKET_DELIMITER = '/';

/**
* The next number to use when making an export ticket. These values must always be positive, as zero is an invalid
* value, and negative values represent server-created tickets.
*/
private int nextExport = 1;

public Tickets() {}

/**
* Utility method to create a ticket from a known-valid base64 encoding of a ticket.
* <p>
* Use caution with non-export tickets, the definition may change between calls to the server - they should be
* exported before use.
*
* @param varDef the variable definition to create a ticket from
* @return a ticket with the variable's id as the ticket bytes
*/
public static Ticket createTicket(JsVariableDefinition varDef) {
Ticket ticket = new Ticket();
ticket.setTicket(varDef.getId());
return ticket;
}

/**
* Utility method to create a ticket wrapped in a TableReference from a known-valid base64 encoding of a ticket.
* <p>
* Use caution with non-export tickets, the definition may change between calls to the server - they should be
* exported before use.
*
* @param varDef the variable definition to create a ticket from
* @return a table reference with the variable's id as the ticket bytes
*/

public static TableReference createTableRef(JsVariableDefinition varDef) {
TableReference tableRef = new TableReference();
tableRef.setTicket(createTicket(varDef));
return tableRef;
}

public static void validateScopeOrApplicationTicketBase64(String base64Bytes) {
String bytes = DomGlobal.atob(base64Bytes);
if (bytes.length() > 2) {
String prefix = bytes.substring(0, 2);
if ((prefix.charAt(0) == SCOPE_PREFIX || prefix.charAt(0) == APPLICATION_PREFIX)
&& prefix.charAt(1) == TICKET_DELIMITER) {
return;
}
}
throw new IllegalArgumentException("Cannot create a VariableDefinition from a non-scope ticket");
}

/**
* Provides the next export id for the current session as a ticket.
*
* @return a new ticket with an export id that hasn't previously been used for this session
*/
public Ticket newExportTicket() {
Ticket ticket = new Ticket();
ticket.setTicket(newExportTicketRaw());
return ticket;
}

/**
* Provides the next export id for the current session.
*
* @return the next export id
*/
public int newTicketInt() {
if (nextExport == Integer.MAX_VALUE) {
throw new IllegalStateException("Ran out of tickets!");
}

return nextExport++;
}

private Uint8Array newExportTicketRaw() {
final int exportId = newTicketInt();
final double[] dest = new double[5];
dest[0] = EXPORT_PREFIX;
dest[1] = (byte) exportId;
dest[2] = (byte) (exportId >>> 8);
dest[3] = (byte) (exportId >>> 16);
dest[4] = (byte) (exportId >>> 24);

final Uint8Array bytes = new Uint8Array(5);
bytes.set(dest);
return bytes;
}

/**
* Provides the next export id for the current session as a table ticket.
*
* @return a new table ticket with an export id that hasn't previously been used for this session
*/
public TableTicket newTableTicket() {
return new TableTicket(newExportTicketRaw());
}

/**
* Creates a shared ticket from the provided array of bytes.
* <p>
* Use caution with non-export tickets, the definition may change between calls to the server - they should be
* exported before use.
*
* @param array array of bytes to populate the ticket with
* @return a new shared ticket
*/
public Ticket sharedTicket(TypedArray.SetArrayUnionType array) {
int length = Js.asArrayLike(array).getLength();
Uint8Array bytesWithPrefix = new Uint8Array(length + 2);
// Add the shared ticket prefix at the start of the provided value
bytesWithPrefix.setAt(0, (double) SHARED_PREFIX);
bytesWithPrefix.setAt(1, (double) TICKET_DELIMITER);
bytesWithPrefix.set(array, 2);

Ticket ticket = new Ticket();
ticket.setTicket(bytesWithPrefix);
return ticket;
}
}
Loading