diff --git a/R/rdeephaven/DESCRIPTION b/R/rdeephaven/DESCRIPTION index 99188c3bdaa..335a7ab92df 100644 --- a/R/rdeephaven/DESCRIPTION +++ b/R/rdeephaven/DESCRIPTION @@ -1,7 +1,7 @@ Package: rdeephaven Type: Package Title: R Client for Deephaven Core -Version: 0.37.0 +Version: 0.37.1 Date: 2023-05-12 Author: Deephaven Data Labs Maintainer: Alex Peters diff --git a/cpp-client/deephaven/CMakeLists.txt b/cpp-client/deephaven/CMakeLists.txt index 32e7b5fabe5..8b26d231945 100644 --- a/cpp-client/deephaven/CMakeLists.txt +++ b/cpp-client/deephaven/CMakeLists.txt @@ -8,7 +8,7 @@ endif() project(deephaven) -set(deephaven_VERSION 0.37.0) +set(deephaven_VERSION 0.37.1) set(CMAKE_CXX_STANDARD 17) # for CMAKE_INSTALL_{dir} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/preview/ArrayPreview.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/preview/ArrayPreview.java index d1c742a7ddc..82514bfd29f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/preview/ArrayPreview.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/preview/ArrayPreview.java @@ -3,6 +3,7 @@ // package io.deephaven.engine.table.impl.preview; +import io.deephaven.util.type.TypeUtils; import io.deephaven.vector.Vector; import io.deephaven.vector.VectorFactory; import org.jetbrains.annotations.NotNull; @@ -34,7 +35,9 @@ public static ArrayPreview fromArray(final Object array) { if (componentType == boolean.class) { return new ArrayPreview(convertToString((boolean[]) array)); } - return new ArrayPreview(VectorFactory.forElementType(componentType) + // Boxed primitives need the Object wrapper. + final Class elementType = TypeUtils.isBoxedType(componentType) ? Object.class : componentType; + return new ArrayPreview(VectorFactory.forElementType(elementType) .vectorWrap(array) .toString(ARRAY_SIZE_CUTOFF)); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java index 925c7a3a923..a0607d671db 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java @@ -3,19 +3,14 @@ // package io.deephaven.iceberg.base; -import io.deephaven.base.Pair; -import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.iceberg.util.IcebergReadInstructions; import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; -import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -26,7 +21,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; -import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.jetbrains.annotations.NotNull; diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java 
b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java index f362139133f..288d27b064b 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java @@ -11,32 +11,31 @@ import io.deephaven.iceberg.util.IcebergTableAdapter; import io.deephaven.iceberg.internal.DataInstructionsProviderLoader; import io.deephaven.util.type.TypeUtils; -import org.apache.commons.lang3.mutable.MutableInt; import org.apache.iceberg.*; +import org.apache.iceberg.data.IdentityPartitionConverters; import org.jetbrains.annotations.NotNull; import java.net.URI; import java.util.*; -import java.util.stream.Collectors; /** * Iceberg {@link TableLocationKeyFinder location finder} for tables with partitions that will discover data files from * a {@link Snapshot} */ public final class IcebergKeyValuePartitionedLayout extends IcebergBaseLayout { - private static class ColumnData { + private static class IdentityPartitioningColData { final String name; final Class type; - final int index; + final int index; // position in the partition spec - public ColumnData(String name, Class type, int index) { + private IdentityPartitioningColData(String name, Class type, int index) { this.name = name; this.type = type; this.index = index; } } - private final List outputPartitioningColumns; + private final List identityPartitioningColumns; /** * @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table. @@ -52,26 +51,26 @@ public IcebergKeyValuePartitionedLayout( // We can assume due to upstream validation that there are no duplicate names (after renaming) that are included // in the output definition, so we can ignore duplicates. - final MutableInt icebergIndex = new MutableInt(0); - final Map availablePartitioningColumns = partitionSpec.fields().stream() - .map(PartitionField::name) - .map(name -> instructions.columnRenames().getOrDefault(name, name)) - .collect(Collectors.toMap( - name -> name, - name -> icebergIndex.getAndIncrement(), - (v1, v2) -> v1, - LinkedHashMap::new)); + final List partitionFields = partitionSpec.fields(); + final int numPartitionFields = partitionFields.size(); + identityPartitioningColumns = new ArrayList<>(numPartitionFields); + for (int fieldId = 0; fieldId < numPartitionFields; ++fieldId) { + final PartitionField partitionField = partitionFields.get(fieldId); + if (!partitionField.transform().isIdentity()) { + // TODO (DH-18160): Improve support for handling non-identity transforms + continue; + } + final String icebergColName = partitionField.name(); + final String dhColName = instructions.columnRenames().getOrDefault(icebergColName, icebergColName); + final ColumnDefinition columnDef = tableDef.getColumn(dhColName); + if (columnDef == null) { + // Table definition provided by the user doesn't have this column, so skip. 
+ continue; + } + identityPartitioningColumns.add(new IdentityPartitioningColData(dhColName, + TypeUtils.getBoxedType(columnDef.getDataType()), fieldId)); - outputPartitioningColumns = tableDef.getColumnStream() - .map((final ColumnDefinition columnDef) -> { - final Integer index = availablePartitioningColumns.get(columnDef.getName()); - if (index == null) { - return null; - } - return new ColumnData(columnDef.getName(), TypeUtils.getBoxedType(columnDef.getDataType()), index); - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + } } @Override @@ -87,13 +86,20 @@ IcebergTableLocationKey keyFromDataFile( final Map> partitions = new LinkedHashMap<>(); final PartitionData partitionData = (PartitionData) dataFile.partition(); - for (final ColumnData colData : outputPartitioningColumns) { + for (final IdentityPartitioningColData colData : identityPartitioningColumns) { final String colName = colData.name; - final Object colValue = partitionData.get(colData.index); - if (colValue != null && !colData.type.isAssignableFrom(colValue.getClass())) { - throw new TableDataException("Partitioning column " + colName - + " has type " + colValue.getClass().getName() - + " but expected " + colData.type.getName()); + final Object colValue; + final Object valueFromPartitionData = partitionData.get(colData.index); + if (valueFromPartitionData != null) { + colValue = IdentityPartitionConverters.convertConstant( + partitionData.getType(colData.index), valueFromPartitionData); + if (!colData.type.isAssignableFrom(colValue.getClass())) { + throw new TableDataException("Partitioning column " + colName + + " has type " + colValue.getClass().getName() + + " but expected " + colData.type.getName()); + } + } else { + colValue = null; } partitions.put(colName, (Comparable) colValue); } diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/Pyiceberg2Test.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/Pyiceberg2Test.java new file mode 100644 index 00000000000..afcb7af1779 --- /dev/null +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/Pyiceberg2Test.java @@ -0,0 +1,108 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg; + +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.testutil.TstUtils; +import io.deephaven.engine.util.TableTools; +import io.deephaven.iceberg.sqlite.DbResource; +import io.deephaven.iceberg.util.IcebergCatalogAdapter; +import io.deephaven.iceberg.util.IcebergTableAdapter; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.net.URISyntaxException; +import java.util.List; +import static io.deephaven.util.QueryConstants.NULL_DOUBLE; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * This test shows that we can integrate with data written by pyiceberg. + * See TESTING.md and generate-pyiceberg-2.py for more details. 
+ */ +@Tag("security-manager-allow") +class Pyiceberg2Test { + private static final Namespace NAMESPACE = Namespace.of("trading"); + private static final TableIdentifier TRADING_DATA = TableIdentifier.of(NAMESPACE, "data"); + + // This will need to be updated if the data is regenerated + private static final long SNAPSHOT_1_ID = 2806418501596315192L; + + private static final TableDefinition TABLE_DEFINITION = TableDefinition.of( + ColumnDefinition.fromGenericType("datetime", LocalDateTime.class), + ColumnDefinition.ofString("symbol").withPartitioning(), + ColumnDefinition.ofDouble("bid"), + ColumnDefinition.ofDouble("ask")); + + private IcebergCatalogAdapter catalogAdapter; + + @BeforeEach + void setUp() throws URISyntaxException { + catalogAdapter = DbResource.openCatalog("pyiceberg-2"); + } + + @Test + void catalogInfo() { + assertThat(catalogAdapter.listNamespaces()).containsExactly(NAMESPACE); + assertThat(catalogAdapter.listTables(NAMESPACE)).containsExactly(TRADING_DATA); + + final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA); + final List snapshots = tableAdapter.listSnapshots(); + assertThat(snapshots).hasSize(1); + { + final Snapshot snapshot = snapshots.get(0); + assertThat(snapshot.parentId()).isNull(); + assertThat(snapshot.schemaId()).isEqualTo(0); + assertThat(snapshot.sequenceNumber()).isEqualTo(1L); + assertThat(snapshot.snapshotId()).isEqualTo(SNAPSHOT_1_ID); + } + } + + @Test + void testDefinition() { + final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA); + final TableDefinition td = tableAdapter.definition(); + assertThat(td).isEqualTo(TABLE_DEFINITION); + + // Check the partition spec + final PartitionSpec partitionSpec = tableAdapter.icebergTable().spec(); + assertThat(partitionSpec.fields().size()).isEqualTo(2); + final PartitionField firstPartitionField = partitionSpec.fields().get(0); + assertThat(firstPartitionField.name()).isEqualTo("datetime_day"); + assertThat(firstPartitionField.transform().toString()).isEqualTo("day"); + + final PartitionField secondPartitionField = partitionSpec.fields().get(1); + assertThat(secondPartitionField.name()).isEqualTo("symbol"); + assertThat(secondPartitionField.transform().toString()).isEqualTo("identity"); + } + + @Test + void testData() { + final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA); + final Table fromIceberg = tableAdapter.table(); + assertThat(fromIceberg.size()).isEqualTo(5); + final Table expectedData = TableTools.newTable(TABLE_DEFINITION, + TableTools.col("datetime", + LocalDateTime.of(2024, 11, 27, 10, 0, 0), + LocalDateTime.of(2024, 11, 27, 10, 0, 0), + LocalDateTime.of(2024, 11, 26, 10, 1, 0), + LocalDateTime.of(2024, 11, 26, 10, 2, 0), + LocalDateTime.of(2024, 11, 28, 10, 3, 0)), + TableTools.stringCol("symbol", "AAPL", "MSFT", "GOOG", "AMZN", "MSFT"), + TableTools.doubleCol("bid", 150.25, 150.25, 2800.75, 3400.5, NULL_DOUBLE), + TableTools.doubleCol("ask", 151.0, 151.0, 2810.5, 3420.0, 250.0)); + TstUtils.assertTableEquals(expectedData.sort("datetime", "symbol"), + fromIceberg.sort("datetime", "symbol")); + } +} diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java index 068e77b31c3..11962bf71ec 100644 --- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java @@ -754,14 +754,7 
@@ void testPartitionedAppendWithAllPartitioningTypes() { "DoublePC = (double) 4.0", "LocalDatePC = LocalDate.parse(`2023-10-01`)") .moveColumns(7, "data"); - - // TODO (deephaven-core#6419) Dropping the local data column since it is not supported on the read side. - // Remove this when the issue is fixed. - final TableDefinition tableDefinitionWithoutLocalDate = fromIceberg.dropColumns("LocalDatePC").getDefinition(); - final Table fromIcebergWithoutLocalDate = tableAdapter.table(IcebergReadInstructions.builder() - .tableDefinition(tableDefinitionWithoutLocalDate) - .build()); - assertTableEquals(expected.dropColumns("LocalDatePC"), fromIcebergWithoutLocalDate); + assertTableEquals(expected, fromIceberg); } @Test diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=AMZN/00000-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=AMZN/00000-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet new file mode 100644 index 00000000000..c9dd7a2aca7 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=AMZN/00000-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:aef59cba214467bee2cb2d91118aadc6114718be02ab4f04d7742471f9436955 +size 1990 diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=GOOG/00000-1-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=GOOG/00000-1-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet new file mode 100644 index 00000000000..8d7be97be08 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-26/symbol=GOOG/00000-1-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9481c165c6356f03b3d4fe5df967a03422557d8d2bfc1c58d8dc052fe18fec06 +size 1990 diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=AAPL/00000-2-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=AAPL/00000-2-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet new file mode 100644 index 00000000000..4106ca81801 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=AAPL/00000-2-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e1568c608054bbd0de727d4909b29a3bdb777e2b6723a74d1641c6326d3b35cd +size 1990 diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=MSFT/00000-3-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet 
b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=MSFT/00000-3-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet new file mode 100644 index 00000000000..38515dcd35b --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-27/symbol=MSFT/00000-3-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:79f26f9cb4e2b00548491122eb8a0efb04a8ff37d0f5bce65350c38b7b17af19 +size 1990 diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-28/symbol=MSFT/00000-4-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-28/symbol=MSFT/00000-4-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet new file mode 100644 index 00000000000..d3b061d46b0 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/data/datetime_day=2024-11-28/symbol=MSFT/00000-4-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a6a8ee2286b0cdf702fef4745471375adf9b819883726d659129e0ba95e0c8dd +size 1856 diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00000-0956ea6c-b522-447f-a2f4-5c6e7b104783.metadata.json b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00000-0956ea6c-b522-447f-a2f4-5c6e7b104783.metadata.json new file mode 100644 index 00000000000..0bf00e06545 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00000-0956ea6c-b522-447f-a2f4-5c6e7b104783.metadata.json @@ -0,0 +1 @@ +{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868694938,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"last-sequence-number":0} \ No newline at end of file diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00001-4e3fe6dc-5e3e-4da1-9da3-666cbad70ace.metadata.json b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00001-4e3fe6dc-5e3e-4da1-9da3-666cbad70ace.metadata.json new file mode 100644 index 00000000000..fea345f31a8 --- /dev/null +++ 
b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/00001-4e3fe6dc-5e3e-4da1-9da3-666cbad70ace.metadata.json @@ -0,0 +1 @@ +{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868695120,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"current-snapshot-id":2806418501596315192,"snapshots":[{"snapshot-id":2806418501596315192,"sequence-number":1,"timestamp-ms":1733868695120,"manifest-list":"catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro","summary":{"operation":"append","added-files-size":"9816","added-data-files":"5","added-records":"5","changed-partition-count":"5","total-data-files":"5","total-delete-files":"0","total-records":"5","total-files-size":"9816","total-position-deletes":"0","total-equality-deletes":"0"},"schema-id":0}],"snapshot-log":[{"snapshot-id":2806418501596315192,"timestamp-ms":1733868695120}],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{"main":{"snapshot-id":2806418501596315192,"type":"branch"}},"format-version":2,"last-sequence-number":1} \ No newline at end of file diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/d9c06748-9892-404f-a744-7bbfd06d0eeb-m0.avro b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/d9c06748-9892-404f-a744-7bbfd06d0eeb-m0.avro new file mode 100644 index 00000000000..36d7d3ca191 Binary files /dev/null and b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/d9c06748-9892-404f-a744-7bbfd06d0eeb-m0.avro differ diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro new file mode 100644 index 00000000000..20d65fea166 Binary files /dev/null and b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro differ diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/dh-iceberg-test.db b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/dh-iceberg-test.db index a59334f571f..44c557bd98e 100644 Binary files a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/dh-iceberg-test.db and 
b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/dh-iceberg-test.db differ diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-1.py b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-1.py index 86c6c2728c8..25b9c88da8d 100644 --- a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-1.py +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-1.py @@ -1,3 +1,7 @@ +''' +See TESTING.md for how to run this script. +''' + from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType from pyiceberg.catalog.sql import SqlCatalog diff --git a/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-2.py b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-2.py new file mode 100644 index 00000000000..fc7b2c5a588 --- /dev/null +++ b/extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-2.py @@ -0,0 +1,58 @@ +''' +See TESTING.md for how to run this script. +''' + +import pyarrow as pa +from datetime import datetime +from pyiceberg.catalog.sql import SqlCatalog +from pyiceberg.schema import Schema +from pyiceberg.types import TimestampType, FloatType, DoubleType, StringType, NestedField, StructType +from pyiceberg.partitioning import PartitionSpec, PartitionField +from pyiceberg.transforms import DayTransform, IdentityTransform + +catalog = SqlCatalog( + "pyiceberg-2", + **{ + "uri": f"sqlite:///dh-iceberg-test.db", + "warehouse": f"catalogs/pyiceberg-2", + }, +) + +schema = Schema( + NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=False), + NestedField(field_id=2, name="symbol", field_type=StringType(), required=False), + NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False), + NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False), +) + +partition_spec = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day", + ), + PartitionField( + source_id=2, field_id=1001, transform=IdentityTransform(), name="symbol", + ) +) + +catalog.create_namespace("trading") + +tbl = catalog.create_table( + identifier="trading.data", + schema=schema, + partition_spec=partition_spec, +) + +# Define the data according to your Iceberg schema +data = [ + {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "AAPL", "bid": 150.25, "ask": 151.0}, + {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "MSFT", "bid": 150.25, "ask": 151.0}, + {"datetime": datetime(2024, 11, 26, 10, 1, 0), "symbol": "GOOG", "bid": 2800.75, "ask": 2810.5}, + {"datetime": datetime(2024, 11, 26, 10, 2, 0), "symbol": "AMZN", "bid": 3400.5, "ask": 3420.0}, + {"datetime": datetime(2024, 11, 28, 10, 3, 0), "symbol": "MSFT", "bid": None, "ask": 250.0}, +] + +# Create a PyArrow Table +table = pa.Table.from_pylist(data) + +# Append the table to the Iceberg table +tbl.append(table) diff --git a/extensions/protobuf/src/main/java/io/deephaven/protobuf/ProtobufDescriptorParserImpl.java b/extensions/protobuf/src/main/java/io/deephaven/protobuf/ProtobufDescriptorParserImpl.java index a2365ea853e..91c4850e976 100644 --- a/extensions/protobuf/src/main/java/io/deephaven/protobuf/ProtobufDescriptorParserImpl.java +++ 
b/extensions/protobuf/src/main/java/io/deephaven/protobuf/ProtobufDescriptorParserImpl.java @@ -32,7 +32,6 @@ import io.deephaven.function.ToShortFunction; import io.deephaven.function.TypedFunction; import io.deephaven.function.TypedFunction.Visitor; -import io.deephaven.util.QueryConstants; import java.lang.reflect.Array; import java.util.HashMap; @@ -362,40 +361,46 @@ private ProtobufFunctions functions() { } } + private ToObjectFunction maybeBypass(ToObjectFunction f) { + // Ideally, we could be very targetted in our application of null checks; in a lot of contexts, our + // implementation could know it will never be called with a null message to produce an array. + return BypassOnNull.of(f); + } + private ToObjectFunction mapChars(ToCharFunction f) { - return ToObjectFunction.of(m -> toChars(m, fd, f), Type.charType().arrayType()); + return maybeBypass(ToObjectFunction.of(m -> toChars(m, fd, f), Type.charType().arrayType())); } private ToObjectFunction mapBytes(ToByteFunction f) { - return ToObjectFunction.of(m -> toBytes(m, fd, f), Type.byteType().arrayType()); + return maybeBypass(ToObjectFunction.of(m -> toBytes(m, fd, f), Type.byteType().arrayType())); } private ToObjectFunction mapShorts(ToShortFunction f) { - return ToObjectFunction.of(m -> toShorts(m, fd, f), Type.shortType().arrayType()); + return maybeBypass(ToObjectFunction.of(m -> toShorts(m, fd, f), Type.shortType().arrayType())); } private ToObjectFunction mapInts(ToIntFunction f) { - return ToObjectFunction.of(m -> toInts(m, fd, f), Type.intType().arrayType()); + return maybeBypass(ToObjectFunction.of(m -> toInts(m, fd, f), Type.intType().arrayType())); } private ToObjectFunction mapLongs(ToLongFunction f) { - return ToObjectFunction.of(m -> toLongs(m, fd, f), Type.longType().arrayType()); + return maybeBypass(ToObjectFunction.of(m -> toLongs(m, fd, f), Type.longType().arrayType())); } private ToObjectFunction mapFloats(ToFloatFunction f) { - return ToObjectFunction.of(m -> toFloats(m, fd, f), Type.floatType().arrayType()); + return maybeBypass(ToObjectFunction.of(m -> toFloats(m, fd, f), Type.floatType().arrayType())); } private ToObjectFunction mapDoubles(ToDoubleFunction f) { - return ToObjectFunction.of(m -> toDoubles(m, fd, f), Type.doubleType().arrayType()); + return maybeBypass(ToObjectFunction.of(m -> toDoubles(m, fd, f), Type.doubleType().arrayType())); } private ToObjectFunction mapBooleans(ToBooleanFunction f) { - return ToObjectFunction.of(m -> toBooleans(m, fd, f), Type.booleanType().arrayType()); + return maybeBypass(ToObjectFunction.of(m -> toBooleans(m, fd, f), Type.booleanType().arrayType())); } private ToObjectFunction mapGenerics(ToObjectFunction f) { - return ToObjectFunction.of(message -> toArray(message, fd, f), f.returnType().arrayType()); + return maybeBypass(ToObjectFunction.of(message -> toArray(message, fd, f), f.returnType().arrayType())); } private class ToRepeatedType implements diff --git a/extensions/protobuf/src/test/java/io/deephaven/protobuf/ProtobufDescriptorParserTest.java b/extensions/protobuf/src/test/java/io/deephaven/protobuf/ProtobufDescriptorParserTest.java index 975f13fa6c1..fd4e3a4e1e2 100644 --- a/extensions/protobuf/src/test/java/io/deephaven/protobuf/ProtobufDescriptorParserTest.java +++ b/extensions/protobuf/src/test/java/io/deephaven/protobuf/ProtobufDescriptorParserTest.java @@ -45,6 +45,7 @@ import io.deephaven.protobuf.test.ByteWrapperRepeated; import io.deephaven.protobuf.test.FieldMaskWrapper; import io.deephaven.protobuf.test.MultiRepeated; +import 
io.deephaven.protobuf.test.NestedArrays; import io.deephaven.protobuf.test.NestedByteWrapper; import io.deephaven.protobuf.test.NestedRepeatedTimestamps; import io.deephaven.protobuf.test.NestedRepeatedTimestamps.Timestamps; @@ -1470,6 +1471,111 @@ void twoTimestampsOneAsWellKnown() { assertThat(nf.keySet()).containsExactly(List.of("ts1"), List.of("ts2", "seconds"), List.of("ts2", "nanos")); } + @Test + void nestedArraysADirect() { + checkKey( + NestedArrays.getDescriptor(), + List.of("a_direct", "b", "c"), + Type.stringType().arrayType(), + new HashMap<>() { + { + put(NestedArrays.getDefaultInstance(), null); + + put(NestedArrays.newBuilder() + .setADirect(NestedArrays.A.getDefaultInstance()) + .build(), null); + + // c is only non-null when b has been explicitly set + + put(NestedArrays.newBuilder() + .setADirect(NestedArrays.A.newBuilder() + .setB(NestedArrays.B.getDefaultInstance()) + .build()) + .build(), new String[0]); + + put(NestedArrays.newBuilder() + .setADirect(NestedArrays.A.newBuilder() + .setB(NestedArrays.B.newBuilder() + .addC("Foo") + .addC("Bar") + .build()) + .build()) + .build(), new String[] {"Foo", "Bar"}); + } + }); + } + + @Test + void nestedArraysARepeated() { + checkKey( + NestedArrays.getDescriptor(), + List.of("a_repeated", "b", "c"), + Type.stringType().arrayType().arrayType(), + new HashMap<>() { + { + put(NestedArrays.getDefaultInstance(), new String[0][]); + put(NestedArrays.newBuilder() + .addARepeated(NestedArrays.A.getDefaultInstance()) + .addARepeated(NestedArrays.A.newBuilder() + .setB(NestedArrays.B.getDefaultInstance()) + .build()) + .addARepeated(NestedArrays.A.newBuilder() + .setB(NestedArrays.B.newBuilder() + .addC("Foo") + .addC("Bar") + .build()) + .build()) + .build(), new String[][] {null, new String[0], new String[] {"Foo", "Bar"}}); + } + }); + } + + @Test + void nestedArraysBDirect() { + checkKey( + NestedArrays.getDescriptor(), + List.of("b_direct", "c"), + Type.stringType().arrayType(), + new HashMap<>() { + { + put(NestedArrays.getDefaultInstance(), null); + + put(NestedArrays.newBuilder() + .setBDirect(NestedArrays.B.getDefaultInstance()) + .build(), new String[0]); + + put(NestedArrays.newBuilder() + .setBDirect(NestedArrays.B.newBuilder() + .addC("Foo") + .addC("Bar") + .build()) + .build(), new String[] {"Foo", "Bar"}); + } + }); + } + + @Test + void nestedArraysBRepeated() { + checkKey( + NestedArrays.getDescriptor(), + List.of("b_repeated", "c"), + Type.stringType().arrayType().arrayType(), + new HashMap<>() { + { + put(NestedArrays.getDefaultInstance(), new String[0][]); + + put(NestedArrays.newBuilder() + .addBRepeated(NestedArrays.B.getDefaultInstance()) + .addBRepeated(NestedArrays.B.newBuilder() + .addC("Foo") + .addC("Bar") + .build()) + + .build(), new String[][] {new String[0], new String[] {"Foo", "Bar"}}); + } + }); + } + private static Map, TypedFunction> nf(Descriptor descriptor) { return nf(descriptor, ProtobufDescriptorParserOptions.defaults()); } diff --git a/extensions/protobuf/src/test/proto/mytest.proto b/extensions/protobuf/src/test/proto/mytest.proto index da6a6e169db..46d6e2a9513 100644 --- a/extensions/protobuf/src/test/proto/mytest.proto +++ b/extensions/protobuf/src/test/proto/mytest.proto @@ -131,6 +131,21 @@ message RepeatedObject { repeated XYZ xyz = 1; } +message NestedArrays { + message A { + B b = 1; + } + message B { + repeated string c = 1; + } + + A a_direct = 1; + repeated A a_repeated = 2; + + B b_direct = 3; + repeated B b_repeated = 4; +} + message MultiRepeated { repeated RepeatedBasics 
my_basics = 1; repeated RepeatedWrappers my_wrappers = 2; diff --git a/gradle.properties b/gradle.properties index e03001083f8..3d451b83e25 100644 --- a/gradle.properties +++ b/gradle.properties @@ -9,7 +9,7 @@ # Re-builders who want to inherit the base version, but have their own qualifier can set -PdeephavenBaseQualifier="customQualifier": "X.Y.Z-customQualifier". # # Re-builders who want a fully custom version can set -PdeephavenBaseVersion="customVersion" -PdeephavenBaseQualifier="": "customVersion". -deephavenBaseVersion=0.37.0 +deephavenBaseVersion=0.37.1 deephavenBaseQualifier=SNAPSHOT #org.gradle.debug diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java index 8bf5c5d10bc..16a3711974d 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java @@ -3,7 +3,7 @@ // package io.deephaven.web.client.api; -import elemental2.core.Function; +import io.deephaven.web.client.api.grpc.GrpcTransportFactory; import jsinterop.annotations.JsIgnore; import jsinterop.annotations.JsNullable; import jsinterop.annotations.JsType; @@ -29,20 +29,23 @@ public class ConnectOptions { /** * Set this to true to force the use of websockets when connecting to the deephaven instance, false to force the use - * of {@code fetch}. + * of {@code fetch}. Ignored if {@link #transportFactory} is set. *

* Defaults to null, indicating that the server URL should be checked to see if we connect with fetch or websockets. */ @JsNullable public Boolean useWebsockets; - // TODO (deephaven-core#6214) provide our own grpc-web library that can replace fetch - // /** - // * Optional fetch implementation to use instead of the global {@code fetch()} call, allowing callers to provide a - // * polyfill rather than add a new global. - // */ - // @JsNullable - // public Function fetch; + /** + * The transport factory to use for creating gRPC streams. If specified, the JS API will ignore + * {@link #useWebsockets} and its own internal logic for determining the appropriate transport to use. + *

+ * Defaults to null, indicating that the JS API should determine the appropriate transport to use. If + * {@code useWebsockets} is set to true, the JS API will use websockets, otherwise if the server url begins with + * https, it will use fetch, otherwise it will use websockets. + */ + @JsNullable + public GrpcTransportFactory transportFactory; public ConnectOptions() { @@ -65,5 +68,8 @@ public ConnectOptions(Object connectOptions) { // if (map.has("fetch")) { // fetch = map.getAsAny("fetch").uncheckedCast(); // } + if (map.has("transportFactory")) { + transportFactory = map.getAsAny("transportFactory").uncheckedCast(); + } } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java b/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java index 04127edec70..9fb9e1c6c30 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java @@ -452,7 +452,7 @@ public JsLayoutHints getLayoutHints() { @JsProperty public double getSize() { TableViewportSubscription subscription = subscriptions.get(getHandle()); - if (subscription != null && subscription.getStatus() == TableViewportSubscription.Status.ACTIVE) { + if (subscription != null && subscription.hasValidSize()) { // only ask the viewport for the size if it is alive and ticking return subscription.size(); } @@ -705,7 +705,7 @@ public TableViewportSubscription setViewport(double firstRow, double lastRow, Column[] columnsCopy = columns != null ? Js.uncheckedCast(columns.slice()) : state().getColumns(); ClientTableState currentState = state(); TableViewportSubscription activeSubscription = subscriptions.get(getHandle()); - if (activeSubscription != null && activeSubscription.getStatus() != TableViewportSubscription.Status.DONE) { + if (activeSubscription != null && !activeSubscription.isClosed()) { // hasn't finished, lets reuse it activeSubscription.setInternalViewport(firstRow, lastRow, columnsCopy, updateIntervalMs, isReverseViewport); return activeSubscription; @@ -1583,8 +1583,7 @@ public void setState(final ClientTableState state) { if (!isClosed() && was != null && was != state()) { // if we held a subscription TableViewportSubscription existingSubscription = subscriptions.remove(was.getHandle()); - if (existingSubscription != null - && existingSubscription.getStatus() != TableViewportSubscription.Status.DONE) { + if (existingSubscription != null && !existingSubscription.isClosed()) { JsLog.debug("closing old viewport", state(), existingSubscription.state()); // with the replacement state successfully running, we can shut down the old viewport (unless // something external retained it) @@ -1715,7 +1714,7 @@ public void setSize(double s) { this.size = s; TableViewportSubscription subscription = subscriptions.get(getHandle()); - if (changed && (subscription == null || subscription.getStatus() == TableViewportSubscription.Status.DONE)) { + if (changed && (subscription == null || !subscription.hasValidSize())) { // If the size changed, and we have no subscription active, fire. Otherwise, we want to let the // subscription itself manage this, so that the size changes are synchronized with data changes, // and consumers won't be confused by the table size not matching data. 
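The transport plumbing introduced above (ConnectOptions.transportFactory, consumed through supportsClientStreaming() further down in this patch) is easiest to see with a concrete factory. The following is a minimal sketch, not part of this change: a hypothetical LoggingTransportFactory that wraps another GrpcTransportFactory (for example the MultiplexedWebsocketTransport.Factory added later in this diff) and logs lifecycle calls. It uses only types introduced in this patch plus standard elemental2/jsinterop classes; the HeaderValueUnion type argument on start() and the class name are assumptions for illustration.

// Hypothetical sketch only; not part of this change.
import elemental2.core.Uint8Array;
import elemental2.dom.DomGlobal;
import io.deephaven.web.client.api.grpc.GrpcTransport;
import io.deephaven.web.client.api.grpc.GrpcTransportFactory;
import io.deephaven.web.client.api.grpc.GrpcTransportOptions;
import io.deephaven.web.client.api.grpc.HeaderValueUnion;
import jsinterop.base.JsPropertyMap;

public class LoggingTransportFactory implements GrpcTransportFactory {
    private final GrpcTransportFactory delegate;

    public LoggingTransportFactory(GrpcTransportFactory delegate) {
        this.delegate = delegate;
    }

    @Override
    public GrpcTransport create(GrpcTransportOptions options) {
        // Create the real transport, then wrap it so each lifecycle call is logged.
        final GrpcTransport inner = delegate.create(options);
        return new GrpcTransport() {
            @Override
            public void start(JsPropertyMap<HeaderValueUnion> metadata) {
                DomGlobal.console.log("grpc start", options.url.toString());
                inner.start(metadata);
            }

            @Override
            public void sendMessage(Uint8Array msgBytes) {
                inner.sendMessage(msgBytes);
            }

            @Override
            public void finishSend() {
                inner.finishSend();
            }

            @Override
            public void cancel() {
                DomGlobal.console.log("grpc cancel", options.url.toString());
                inner.cancel();
            }
        };
    }

    @Override
    public boolean getSupportsClientStreaming() {
        // Client-streaming support is whatever the wrapped factory reports.
        return delegate.getSupportsClientStreaming();
    }
}

Hypothetically, this would be wired in before connecting via options.transportFactory = new LoggingTransportFactory(new MultiplexedWebsocketTransport.Factory()), after which QueryConnectable below picks it up through makeRpcOptions().
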
diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java index 1394069e0b7..2a73b357a5e 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java @@ -7,11 +7,9 @@ import elemental2.core.JsArray; import elemental2.core.JsSet; import elemental2.promise.Promise; -import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc; import io.deephaven.javascript.proto.dhinternal.grpcweb.client.RpcOptions; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationResponse; import io.deephaven.web.client.api.event.HasEventHandling; -import io.deephaven.web.client.api.grpc.MultiplexedWebsocketTransport; import io.deephaven.web.client.ide.IdeSession; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.*; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.ticket_pb.Ticket; @@ -246,12 +244,8 @@ public void disconnected() { public abstract void notifyServerShutdown(TerminationNotificationResponse success); - public boolean useWebsockets() { - Boolean useWebsockets = getOptions().useWebsockets; - if (useWebsockets == null) { - useWebsockets = getServerUrl().startsWith("http:"); - } - return useWebsockets; + public boolean supportsClientStreaming() { + return getOptions().transportFactory.getSupportsClientStreaming(); } public T createClient(BiFunction constructor) { @@ -261,12 +255,7 @@ public T createClient(BiFunction constructor) { public RpcOptions makeRpcOptions() { RpcOptions options = RpcOptions.create(); options.setDebug(getOptions().debug); - if (useWebsockets()) { - // Replace with our custom websocket impl, with fallback to the built-in one - options.setTransport(o -> new MultiplexedWebsocketTransport(o, () -> { - Grpc.setDefaultTransport.onInvoke(Grpc.WebsocketTransport.onInvoke()); - })); - } + options.setTransport(getOptions().transportFactory.adapt()); return options; } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java index f8212bb05ac..4ea02498a51 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java @@ -986,7 +986,7 @@ public BrowserHeaders metadata() { } public BiDiStream.Factory streamFactory() { - return new BiDiStream.Factory<>(info.useWebsockets(), this::metadata, config::newTicketInt); + return new BiDiStream.Factory<>(info.supportsClientStreaming(), this::metadata, config::newTicketInt); } public Promise newTable(String[] columnNames, String[] types, Object[][] data, String userTimeZone, diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java index 5febd0a9fa4..4fc65739780 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java @@ -36,12 +36,12 @@ public interface NextStreamMessageFactory { void nextStreamMessage(Req nextPayload, BrowserHeaders headers, JsBiConsumer callback); } public static class Factory { - private final boolean useWebsockets; + private 
final boolean supportsClientStreaming; private final Supplier headers; private final IntSupplier nextIntTicket; - public Factory(boolean useWebsockets, Supplier headers, IntSupplier nextIntTicket) { - this.useWebsockets = useWebsockets; + public Factory(boolean supportsClientStreaming, Supplier headers, IntSupplier nextIntTicket) { + this.supportsClientStreaming = supportsClientStreaming; this.headers = headers; this.nextIntTicket = nextIntTicket; } @@ -51,8 +51,8 @@ public BiDiStream create( OpenStreamFactory openEmulatedStream, NextStreamMessageFactory nextEmulatedStream, ReqT emptyReq) { - if (useWebsockets) { - return websocket(bidirectionalStream.openBiDiStream(headers.get())); + if (supportsClientStreaming) { + return bidi(bidirectionalStream.openBiDiStream(headers.get())); } else { return new EmulatedBiDiStream<>( openEmulatedStream, @@ -73,7 +73,7 @@ public static BiDiStream of( IntSupplier nextIntTicket, boolean useWebsocket) { if (useWebsocket) { - return websocket(bidirectionalStream.openBiDiStream(headers.get())); + return bidi(bidirectionalStream.openBiDiStream(headers.get())); } else { return new EmulatedBiDiStream<>( openEmulatedStream, @@ -84,7 +84,7 @@ public static BiDiStream of( } } - public static BiDiStream websocket(Object bidirectionalStream) { + public static BiDiStream bidi(Object bidirectionalStream) { return new WebsocketBiDiStream<>(Js.cast(bidirectionalStream)); } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java new file mode 100644 index 00000000000..d51bfcb33d3 --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java @@ -0,0 +1,74 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.web.client.api.grpc; + +import com.vertispan.tsdefs.annotations.TsInterface; +import elemental2.core.Uint8Array; +import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Transport; +import jsinterop.annotations.JsIgnore; +import jsinterop.annotations.JsType; +import jsinterop.base.JsPropertyMap; + +/** + * gRPC transport implementation. + * + */ +@JsType(namespace = "dh.grpc") +@TsInterface +public interface GrpcTransport { + /** + * Starts the stream, sending metadata to the server. + * + * @param metadata the headers to send the server when opening the connection + */ + void start(JsPropertyMap metadata); + + /** + * Sends a message to the server. + * + * @param msgBytes bytes to send to the server + */ + void sendMessage(Uint8Array msgBytes); + + /** + * "Half close" the stream, signaling to the server that no more messages will be sent, but that the client is still + * open to receiving messages. + */ + void finishSend(); + + /** + * End the stream, both notifying the server that no more messages will be sent nor received, and preventing the + * client from receiving any more events. + */ + void cancel(); + + /** + * Helper to transform ts implementations to our own api. 
+ */ + @JsIgnore + static GrpcTransport from(Transport tsTransport) { + return new GrpcTransport() { + @Override + public void start(JsPropertyMap metadata) { + tsTransport.start(new BrowserHeaders(metadata)); + } + + @Override + public void sendMessage(Uint8Array msgBytes) { + tsTransport.sendMessage(msgBytes); + } + + @Override + public void finishSend() { + tsTransport.finishSend(); + } + + @Override + public void cancel() { + tsTransport.cancel(); + } + }; + } +} diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java new file mode 100644 index 00000000000..978e1a36426 --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java @@ -0,0 +1,70 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.web.client.api.grpc; + +import com.vertispan.tsdefs.annotations.TsInterface; +import elemental2.core.Uint8Array; +import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportFactory; +import jsinterop.annotations.JsOverlay; +import jsinterop.annotations.JsProperty; +import jsinterop.annotations.JsType; +import jsinterop.base.Js; + +/** + * Factory for creating gRPC transports. + */ +@TsInterface +@JsType(namespace = "dh.grpc", isNative = true) +public interface GrpcTransportFactory { + /** + * Create a new transport instance. + * + * @param options options for creating the transport + * @return a transport instance to use for gRPC communication + */ + GrpcTransport create(GrpcTransportOptions options); + + /** + * Return true to signal that created transports may have {@link GrpcTransport#sendMessage(Uint8Array)} called on it + * more than once before {@link GrpcTransport#finishSend()} should be called. + * + * @return true to signal that the implementation can stream multiple messages, false otherwise indicating that + * Open/Next gRPC calls should be used + */ + @JsProperty + boolean getSupportsClientStreaming(); + + /** + * Adapt this factory to the transport factory used by the gRPC-web library. 
+ */ + @JsOverlay + default TransportFactory adapt() { + return options -> { + GrpcTransport impl = create(GrpcTransportOptions.from(options)); + return new Transport() { + @Override + public void cancel() { + impl.cancel(); + } + + @Override + public void finishSend() { + impl.finishSend(); + } + + @Override + public void sendMessage(Uint8Array msgBytes) { + impl.sendMessage(msgBytes); + } + + @Override + public void start(BrowserHeaders metadata) { + impl.start(Js.cast(metadata.headersMap)); + } + }; + }; + } +} diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java new file mode 100644 index 00000000000..8853471a98e --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java @@ -0,0 +1,92 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.web.client.api.grpc; + +import com.vertispan.tsdefs.annotations.TsInterface; +import elemental2.core.JsError; +import elemental2.core.Uint8Array; +import elemental2.dom.URL; +import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions; +import jsinterop.annotations.JsFunction; +import jsinterop.annotations.JsIgnore; +import jsinterop.annotations.JsNullable; +import jsinterop.annotations.JsOptional; +import jsinterop.annotations.JsType; +import jsinterop.base.JsPropertyMap; + +/** + * Options for creating a gRPC stream transport instance. + */ +@TsInterface +@JsType(namespace = "dh.grpc") +public class GrpcTransportOptions { + @JsFunction + @FunctionalInterface + public interface OnHeadersCallback { + void onHeaders(JsPropertyMap headers, int status); + } + + @JsFunction + @FunctionalInterface + public interface OnChunkCallback { + void onChunk(Uint8Array chunk); + } + + @JsFunction + @FunctionalInterface + public interface OnEndCallback { + void onEnd(@JsOptional @JsNullable JsError error); + } + + /** + * The gRPC method URL. + */ + public URL url; + + /** + * True to enable debug logging for this stream. + */ + public boolean debug; + + /** + * Callback for when headers and status are received. The headers are a map of header names to values, and the + * status is the HTTP status code. If the connection could not be made, the status should be 0. + */ + public OnHeadersCallback onHeaders; + + /** + * Callback for when a chunk of data is received. + */ + public OnChunkCallback onChunk; + + /** + * Callback for when the stream ends, with an error instance if it can be provided. Note that the present + * implementation does not consume errors, even if provided. + */ + public OnEndCallback onEnd; + + /** + * Internal copy of options, to be used for fallback. + */ + @JsIgnore + public TransportOptions originalOptions; + + /** + * Convert a {@link TransportOptions} instance to a {@link GrpcTransportOptions} instance. + */ + @JsIgnore + public static GrpcTransportOptions from(TransportOptions options) { + GrpcTransportOptions impl = new GrpcTransportOptions(); + impl.url = new URL(options.getUrl()); + impl.debug = options.isDebug(); + impl.onHeaders = (headers, status) -> options.getOnHeaders().onInvoke(new BrowserHeaders(headers), status); + impl.onChunk = p0 -> { + // "false" because the underlying implementation doesn't rely on this anyway. 
+ options.getOnChunk().onInvoke(p0, false); + }; + impl.onEnd = options.getOnEnd()::onInvoke; + return impl; + } +} diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java new file mode 100644 index 00000000000..9d64a7c0402 --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java @@ -0,0 +1,42 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.web.client.api.grpc; + +import com.vertispan.tsdefs.annotations.TsUnion; +import com.vertispan.tsdefs.annotations.TsUnionMember; +import elemental2.core.JsArray; +import javaemul.internal.annotations.DoNotAutobox; +import jsinterop.annotations.JsOverlay; +import jsinterop.annotations.JsPackage; +import jsinterop.annotations.JsType; +import jsinterop.base.Js; + +/** + * Union of string and array of string, as node/browser APIs tend to accept either for http headers. + */ +@TsUnion +@JsType(name = "?", namespace = JsPackage.GLOBAL, isNative = true) +public interface HeaderValueUnion { + @JsOverlay + static HeaderValueUnion of(@DoNotAutobox Object value) { + return Js.cast(value); + } + + @JsOverlay + default boolean isArray() { + return JsArray.isArray(this); + } + + @TsUnionMember + @JsOverlay + default String asString() { + return Js.cast(this); + } + + @TsUnionMember + @JsOverlay + default JsArray asArray() { + return Js.cast(this); + } +} diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java index dfb0fd7cea4..a40cf532e1b 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java @@ -17,10 +17,10 @@ import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc; import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport; -import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions; import io.deephaven.web.client.api.JsLazy; import io.deephaven.web.shared.fu.JsRunnable; import jsinterop.base.Js; +import jsinterop.base.JsPropertyMap; import java.util.ArrayList; import java.util.HashMap; @@ -32,11 +32,23 @@ * equal, this transport should be preferred to the default grpc-websockets transport, and in turn the fetch based * transport is usually superior to this. 
*/ -public class MultiplexedWebsocketTransport implements Transport { +public class MultiplexedWebsocketTransport implements GrpcTransport { public static final String MULTIPLEX_PROTOCOL = "grpc-websockets-multiplex"; public static final String SOCKET_PER_STREAM_PROTOCOL = "grpc-websockets"; + public static class Factory implements GrpcTransportFactory { + @Override + public GrpcTransport create(GrpcTransportOptions options) { + return new MultiplexedWebsocketTransport(options); + } + + @Override + public boolean getSupportsClientStreaming() { + return true; + } + } + private static Uint8Array encodeASCII(String str) { Uint8Array encoded = new Uint8Array(str.length()); for (int i = 0; i < str.length(); i++) { @@ -55,9 +67,9 @@ private interface QueuedEntry { public static class HeaderFrame implements QueuedEntry { private final String path; - private final BrowserHeaders metadata; + private final JsPropertyMap metadata; - public HeaderFrame(String path, BrowserHeaders metadata) { + public HeaderFrame(String path, JsPropertyMap metadata) { this.path = path; this.metadata = metadata; } @@ -66,9 +78,14 @@ public HeaderFrame(String path, BrowserHeaders metadata) { public void send(WebSocket webSocket, int streamId) { final Uint8Array headerBytes; final StringBuilder str = new StringBuilder(); - metadata.append("grpc-websockets-path", path); - metadata.forEach((key, value) -> { - str.append(key).append(": ").append(value.join(", ")).append("\r\n"); + metadata.set("grpc-websockets-path", HeaderValueUnion.of(path)); + metadata.forEach((key) -> { + HeaderValueUnion value = metadata.get(key); + if (value.isArray()) { + str.append(key).append(": ").append(value.asArray().join(", ")).append("\r\n"); + } else { + str.append(key).append(": ").append(value.asString()).append("\r\n"); + } }); headerBytes = encodeASCII(str.toString()); Int8Array payload = new Int8Array(headerBytes.byteLength + 4); @@ -79,7 +96,7 @@ public void send(WebSocket webSocket, int streamId) { @Override public void sendFallback(Transport transport) { - transport.start(metadata); + transport.start(new BrowserHeaders(metadata)); } } @@ -201,16 +218,16 @@ private void release() { private ActiveTransport transport; private final int streamId = nextStreamId++; private final List sendQueue = new ArrayList<>(); - private final TransportOptions options; + private final GrpcTransportOptions options; private final String path; private final JsLazy alternativeTransport; private JsRunnable cleanup = JsRunnable.doNothing(); - public MultiplexedWebsocketTransport(TransportOptions options, JsRunnable avoidMultiplexCallback) { + public MultiplexedWebsocketTransport(GrpcTransportOptions options) { this.options = options; - String url = options.getUrl(); + String url = options.url.toString(); URL urlWrapper = new URL(url); // preserve the path to send as metadata, but still talk to the server with that path path = urlWrapper.pathname.substring(1); @@ -220,16 +237,13 @@ public MultiplexedWebsocketTransport(TransportOptions options, JsRunnable avoidM transport = ActiveTransport.get(url); // prepare a fallback - alternativeTransport = new JsLazy<>(() -> { - avoidMultiplexCallback.run(); - return Grpc.WebsocketTransport.onInvoke().onInvoke(options); - }); + alternativeTransport = new JsLazy<>(() -> Grpc.WebsocketTransport.onInvoke().onInvoke(options.originalOptions)); } @Override - public void start(BrowserHeaders metadata) { + public void start(JsPropertyMap metadata) { if (alternativeTransport.isAvailable()) { - 
alternativeTransport.get().start(metadata); + alternativeTransport.get().start(new BrowserHeaders(metadata)); return; } this.transport.retain(); @@ -325,7 +339,7 @@ private void onClose(Event event) { return; } // each grpc transport will handle this as an error - options.getOnEnd().onInvoke(new JsError("Unexpectedly closed " + Js.uncheckedCast(event).reason)); + options.onEnd.onEnd(new JsError("Unexpectedly closed " + Js.uncheckedCast(event).reason)); removeHandlers(); } @@ -345,9 +359,9 @@ private void onMessage(Event event) { closed = false; } if (streamId == this.streamId) { - options.getOnChunk().onInvoke(new Uint8Array(messageEvent.data, 4), false); + options.onChunk.onChunk(new Uint8Array(messageEvent.data, 4)); if (closed) { - options.getOnEnd().onInvoke(null); + options.onEnd.onEnd(null); removeHandlers(); } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/AbstractTableSubscription.java b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/AbstractTableSubscription.java index 5165bf5fcc2..7b27ad75b0f 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/AbstractTableSubscription.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/AbstractTableSubscription.java @@ -29,6 +29,7 @@ import io.deephaven.web.shared.data.RangeSet; import io.deephaven.web.shared.data.ShiftedRange; import io.deephaven.web.shared.fu.JsRunnable; +import jsinterop.annotations.JsMethod; import jsinterop.annotations.JsProperty; import jsinterop.base.Any; import jsinterop.base.Js; @@ -51,6 +52,7 @@ * exposed to api consumers, rather than wrapping in a Table type, as it handles the barrage stream and provides events * that client code can listen to. */ +@TsIgnore public abstract class AbstractTableSubscription extends HasEventHandling { /** * Indicates that some new data is available on the client, either an initial snapshot or a delta update. The @@ -59,9 +61,11 @@ public abstract class AbstractTableSubscription extends HasEventHandling { */ public static final String EVENT_UPDATED = "updated"; - public enum Status { + protected enum Status { /** Waiting for some prerequisite before we can use it for the first time. */ STARTING, + /** All prerequisites are met, waiting for the first snapshot to be returned. */ + SUBSCRIPTION_REQUESTED, /** Successfully created, not waiting for any messages to be accurate. */ ACTIVE, /** Waiting for an update to return from being active to being active again. */ @@ -115,7 +119,11 @@ protected void revive() { WebBarrageSubscription.ViewportChangedHandler viewportChangedHandler = this::onViewportChange; WebBarrageSubscription.DataChangedHandler dataChangedHandler = this::onDataChanged; - status = Status.ACTIVE; + status = Status.SUBSCRIPTION_REQUESTED; + + // In order to create the subscription, we need to already have the table resolved, so we know if it + // is a blink table or not. In turn, we can't be prepared to handle any messages from the server until + // we know this, so we can't race messages with this design. 
this.barrageSubscription = WebBarrageSubscription.subscribe( subscriptionType, state, viewportChangedHandler, dataChangedHandler); @@ -137,10 +145,6 @@ protected void revive() { JsRunnable.doNothing()); } - public Status getStatus() { - return status; - } - protected static FlatBufferBuilder subscriptionRequest(byte[] tableTicket, BitSet columns, @Nullable RangeSet viewport, BarrageSubscriptionOptions options, boolean isReverseViewport) { @@ -166,16 +170,19 @@ protected static FlatBufferBuilder subscriptionRequest(byte[] tableTicket, BitSe protected abstract void sendFirstSubscriptionRequest(); - protected void sendBarrageSubscriptionRequest(RangeSet viewport, JsArray columns, Double updateIntervalMs, + protected void sendBarrageSubscriptionRequest(@Nullable RangeSet viewport, JsArray columns, + Double updateIntervalMs, boolean isReverseViewport) { - if (status == Status.DONE) { + if (isClosed()) { if (failMsg == null) { throw new IllegalStateException("Can't change subscription, already closed"); } else { throw new IllegalStateException("Can't change subscription, already failed: " + failMsg); } } - status = Status.PENDING_UPDATE; + if (status == Status.ACTIVE) { + status = Status.PENDING_UPDATE; + } this.columns = columns; this.viewportRowSet = viewport; this.columnBitSet = makeColumnBitset(columns); @@ -212,15 +219,39 @@ protected WorkerConnection connection() { return connection; } + /** + * True if the subscription is in the ACTIVE state, meaning that the server and client are in sync with the state of + * the subscription. + */ protected boolean isSubscriptionReady() { return status == Status.ACTIVE; } + /** + * Returns true if the subscription is closed and cannot be used again, false if it is actively listening for more + * data. + */ + public boolean isClosed() { + return status == Status.DONE; + } + + /** + * Returns true if the subscription is in a state where it can be used to read data, false if still waiting for the + * server to send the first snapshot or if the subscription has been closed. + * + * @return true if the {@link #size()} method will return data based on the subscription, false if some other source + * of the table's size will be used. + */ + public boolean hasValidSize() { + return status == Status.ACTIVE || status == Status.PENDING_UPDATE; + } + + public double size() { - if (status == Status.ACTIVE) { + if (hasValidSize()) { return barrageSubscription.getCurrentSize(); } - if (status == Status.DONE) { + if (isClosed()) { throw new IllegalStateException("Can't read size when already closed"); } return state.getSize(); @@ -503,7 +534,7 @@ private void onFlightData(FlightData data) { } protected void onStreamEnd(ResponseStreamWrapper.Status status) { - if (this.status == Status.DONE) { + if (isClosed()) { return; } if (status.isTransportError()) { @@ -534,6 +565,7 @@ public JsArray getColumns() { /** * Stops the subscription on the server. 
*/ + @JsMethod public void close() { state.unretain(this); if (doExchange != null) { diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java index c199be4d497..e9f1bd53233 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java @@ -248,6 +248,7 @@ public void setViewport(double firstRow, double lastRow, @JsOptional @JsNullable public void setInternalViewport(double firstRow, double lastRow, Column[] columns, Double updateIntervalMs, Boolean isReverseViewport) { + // Until we've created the stream, we just cache the requested viewport if (status == Status.STARTING) { this.firstRow = firstRow; this.lastRow = lastRow; @@ -281,7 +282,7 @@ public void setInternalViewport(double firstRow, double lastRow, Column[] column */ @JsMethod public void close() { - if (status == Status.DONE) { + if (isClosed()) { JsLog.warn("TableViewportSubscription.close called on subscription that's already done."); } retained = false; @@ -300,14 +301,12 @@ public void internalClose() { reconnectSubscription.remove(); - if (retained || status == Status.DONE) { + if (retained || isClosed()) { // the JsTable has indicated it is no longer interested in this viewport, but other calling // code has retained it, keep it open for now. return; } - status = Status.DONE; - super.close(); } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java index bea6aacbba5..1ea9412d769 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java @@ -7,7 +7,10 @@ import elemental2.core.JsArray; import elemental2.promise.Promise; import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc; import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Code; +import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Transport; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationResponse; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.terminationnotificationresponse.StackTrace; import io.deephaven.web.client.api.ConnectOptions; @@ -16,6 +19,10 @@ import io.deephaven.web.client.api.barrage.stream.ResponseStreamWrapper; import io.deephaven.web.client.api.console.JsVariableChanges; import io.deephaven.web.client.api.console.JsVariableDescriptor; +import io.deephaven.web.client.api.grpc.GrpcTransport; +import io.deephaven.web.client.api.grpc.GrpcTransportFactory; +import io.deephaven.web.client.api.grpc.GrpcTransportOptions; +import io.deephaven.web.client.api.grpc.MultiplexedWebsocketTransport; import io.deephaven.web.shared.data.ConnectToken; import io.deephaven.web.shared.fu.JsConsumer; import io.deephaven.web.shared.fu.JsRunnable; @@ -57,6 +64,26 @@ public IdeConnection(String serverUrl, Object connectOptions) { } else { options = new ConnectOptions(); } + if (options.transportFactory == null) { + // assign a default transport factory + if 
(options.useWebsockets == Boolean.TRUE || !serverUrl.startsWith("https:")) { + options.transportFactory = new MultiplexedWebsocketTransport.Factory(); + } else { + options.transportFactory = new GrpcTransportFactory() { + @Override + public GrpcTransport create(GrpcTransportOptions options) { + return GrpcTransport + .from((Transport) Grpc.FetchReadableStreamTransport.onInvoke(new Object()) + .onInvoke((TransportOptions) options)); + } + + @Override + public boolean getSupportsClientStreaming() { + return false; + } + }; + } + } } @Override diff --git a/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java b/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java index 3c1887e8a5c..3c3aa8791ca 100644 --- a/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java +++ b/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java @@ -5,6 +5,7 @@ import com.google.gwt.junit.tools.GWTTestSuite; import io.deephaven.web.client.api.*; +import io.deephaven.web.client.api.grpc.GrpcTransportTestGwt; import io.deephaven.web.client.api.storage.JsStorageServiceTestGwt; import io.deephaven.web.client.api.subscription.ConcurrentTableTestGwt; import io.deephaven.web.client.api.subscription.ViewportTestGwt; @@ -30,6 +31,7 @@ public static Test suite() { suite.addTestSuite(JsStorageServiceTestGwt.class); suite.addTestSuite(InputTableTestGwt.class); suite.addTestSuite(ColumnStatisticsTestGwt.class); + suite.addTestSuite(GrpcTransportTestGwt.class); // This should be a unit test, but it requires a browser environment to run on GWT 2.9 // GWT 2.9 doesn't have proper bindings for Promises in HtmlUnit, so we need to use the IntegrationTest suite diff --git a/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportTestGwt.java b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportTestGwt.java new file mode 100644 index 00000000000..020c65b771a --- /dev/null +++ b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportTestGwt.java @@ -0,0 +1,143 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.web.client.api.grpc; + +import elemental2.promise.Promise; +import io.deephaven.web.client.api.AbstractAsyncGwtTestCase; +import io.deephaven.web.client.api.ConnectOptions; +import io.deephaven.web.client.api.CoreClient; +import jsinterop.base.JsPropertyMap; + +/** + * Simple test to verify we can produce custom transports in JS. Only works with https, which means it can only be run + * manually at this time, or it will trivially succeed. + */ +public class GrpcTransportTestGwt extends AbstractAsyncGwtTestCase { + @Override + public String getModuleName() { + return "io.deephaven.web.DeephavenIntegrationTest"; + } + + /** + * Simple fetch impl, with no cancelation handling. 
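For context, the ConnectOptions.transportFactory hook that IdeConnection consults above can also be set explicitly by client code, which is what the tests below exercise. A minimal sketch, assuming a GWT/J2CL client context; the server URL is a placeholder and the login flow mirrors GrpcTransportTestGwt:

    import elemental2.promise.Promise;
    import io.deephaven.web.client.api.ConnectOptions;
    import io.deephaven.web.client.api.CoreClient;
    import io.deephaven.web.client.api.grpc.MultiplexedWebsocketTransport;
    import jsinterop.base.JsPropertyMap;

    final class CustomTransportExample {
        static Promise<CoreClient> connect() {
            ConnectOptions options = new ConnectOptions();
            // Explicitly choose the multiplexed websocket transport instead of letting
            // the client fall back to the fetch-based default on https.
            options.transportFactory = new MultiplexedWebsocketTransport.Factory();
            CoreClient client = new CoreClient("https://example.deephaven.io:10000", options);
            return client.login(JsPropertyMap.of("type", CoreClient.LOGIN_TYPE_ANONYMOUS))
                    .then(ignore -> Promise.resolve(client));
        }
    }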
+ */ + public native GrpcTransportFactory makeFetchTransportFactory() /*-{ + return { + create: function(options) { + function pump(reader, res) { + reader.read().then(function(result) { + if (result.done) { + options.onEnd(); + } else { + options.onChunk(result.value); + pump(reader, res); + } + })['catch'](function(e) { + options.onEnd(e); + }); + } + return { + start: function(metadata) { + this.metadata = metadata; + }, + sendMessage: function(msgBytes) { + var fetchInit = { + headers: new Headers(this.metadata), + method: "POST", + body: msgBytes, + }; + $wnd.fetch(options.url.href, fetchInit).then(function(response) { + var m = {}; + response.headers.forEach(function(value, key) { + m[key] = value; + }); + options.onHeaders(m, response.status); + if (response.body) { + pump(response.body.getReader(), response); + } + return response; + })['catch'](function(e) { + options.onEnd(e); + }); + }, + finishSend: function() { + // no-op + }, + cancel: function() { + // no-op + } + }; + }, + supportsClientStreaming: false + }; + }-*/; + + public void testFetchGrpcTransport() { + if (!localServer.startsWith("https:")) { + // We're using h2, so we need to be on https for our current implementation + return; + } + setupDhInternal().then(ignore -> { + delayTestFinish(7101); + ConnectOptions connectOptions = new ConnectOptions(); + connectOptions.transportFactory = makeFetchTransportFactory(); + CoreClient coreClient = new CoreClient(localServer, connectOptions); + return coreClient.login(JsPropertyMap.of("type", CoreClient.LOGIN_TYPE_ANONYMOUS)) + .then(ignore2 -> Promise.resolve(coreClient)); + }).then(this::finish).catch_(this::report); + } + + /** + * Dummy transport that just sends a single message and receives a single message. Doesn't actually talk to the + * server, headers are empty, and the message is always 5 byte proto payload "no data", followed by trailers + * signifying success. 
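For context on the byte layout the dummy transport below produces: a grpc-web frame is a 1-byte flag (0x80 for trailers, 0x00 for a message) followed by a 4-byte big-endian payload length, so the dummy success response is an empty message frame plus a trailers frame carrying "grpc-status:0". A plain-Java sketch of the same trailer bytes; class and method names are illustrative:

    import java.nio.charset.StandardCharsets;

    final class GrpcWebFrameExample {
        // A grpc-web trailers frame: flag byte 0x80, 4-byte big-endian length, then ASCII trailers text.
        static byte[] trailersFrame(String trailers) {
            byte[] body = trailers.getBytes(StandardCharsets.US_ASCII);
            byte[] frame = new byte[5 + body.length];
            frame[0] = (byte) 0x80;                 // trailers flag, same as successTrailers[0] = 128 below
            frame[1] = (byte) (body.length >>> 24); // length bytes, big-endian...
            frame[2] = (byte) (body.length >>> 16);
            frame[3] = (byte) (body.length >>> 8);
            frame[4] = (byte) body.length;          // ...only this byte is non-zero for short trailers
            System.arraycopy(body, 0, frame, 5, body.length);
            return frame;
        }

        public static void main(String[] args) {
            byte[] ok = trailersFrame("grpc-status:0"); // the same payload the dummy transport sends
            System.out.println(ok.length + " bytes, flag=0x" + Integer.toHexString(ok[0] & 0xFF));
        }
    }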
+ */ + private native GrpcTransportFactory makeDummyTransportFactory() /*-{ + return { + create: function(options) { + return { + start: function(metadata) { + // empty headers + $wnd.setTimeout(function() {options.onHeaders({}, 200);}, 0); + }, + sendMessage: function(msgBytes) { + // empty payload + var empty = new $wnd.Uint8Array(5); + // successful trailer payload + var trailersString = 'grpc-status:0'; + var successTrailers = new $wnd.Uint8Array(5 + trailersString.length); + successTrailers[0] = 128; + successTrailers[4] = trailersString.length; + new $wnd.TextEncoder('utf-8').encodeInto(trailersString, successTrailers.subarray(5)); + $wnd.setTimeout(function() { + // delay a bit, then send the empty messages and end the stream + options.onChunk(empty); + options.onChunk(successTrailers); + options.onEnd(); + }, 0); + }, + finishSend: function() { + // no-op + }, + cancel: function() { + // no-op + } + }; + }, + supportsClientStreaming: true + }; + }-*/; + + public void testDummyGrpcTransport() { + setupDhInternal().then(ignore -> { + delayTestFinish(7102); + ConnectOptions connectOptions = new ConnectOptions(); + connectOptions.transportFactory = makeDummyTransportFactory(); + connectOptions.debug = true; + CoreClient coreClient = new CoreClient(localServer, connectOptions); + return coreClient.login(JsPropertyMap.of("type", CoreClient.LOGIN_TYPE_ANONYMOUS)) + .then(ignore2 -> Promise.resolve(coreClient)); + }).then(this::finish).catch_(this::report); + } +} diff --git a/web/client-api/src/test/java/io/deephaven/web/client/api/subscription/ViewportTestGwt.java b/web/client-api/src/test/java/io/deephaven/web/client/api/subscription/ViewportTestGwt.java index 0d89e2c7917..1b13e3e5052 100644 --- a/web/client-api/src/test/java/io/deephaven/web/client/api/subscription/ViewportTestGwt.java +++ b/web/client-api/src/test/java/io/deephaven/web/client/api/subscription/ViewportTestGwt.java @@ -93,6 +93,35 @@ public void testViewportOnStaticTable() { .then(this::finish).catch_(this::report); } + public void testChangePendingViewport() { + connect(tables) + .then(table("staticTable")) + .then(table -> { + delayTestFinish(5001); + + table.setViewport(0, 25, null); + assertEquals(100.0, table.getSize()); + return Promise.resolve(table); + }) + .then(table -> { + assertEquals(100.0, table.getSize()); + table.setViewport(5, 30, null); + assertEquals(100.0, table.getSize()); + return assertUpdateReceived(table, viewport -> { + assertEquals(100.0, table.getSize()); + + assertEquals(5d, viewport.getOffset()); + assertEquals(26, viewport.getRows().length); + }, 2500); + }) + .then(table -> { + assertEquals(100.0, table.getSize()); + table.close(); + return null; + }) + .then(this::finish).catch_(this::report); + } + // TODO: https://deephaven.atlassian.net/browse/DH-11196 public void ignore_testViewportOnGrowingTable() { connect(tables) diff --git a/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java b/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java index 7bb39eda843..d9162713094 100644 --- a/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java +++ b/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java @@ -33,7 +33,7 @@ public interface OnEndFn { @JsFunction public interface OnHeadersFn { - void 
onInvoke(BrowserHeaders p0, double p1); + void onInvoke(BrowserHeaders p0, int p1); } @JsOverlay diff --git a/web/client-ui/Dockerfile b/web/client-ui/Dockerfile index c735bb03594..bf0a74498cd 100644 --- a/web/client-ui/Dockerfile +++ b/web/client-ui/Dockerfile @@ -2,10 +2,10 @@ FROM deephaven/node:local-build WORKDIR /usr/src/app # Most of the time, these versions are the same, except in cases where a patch only affects one of the packages -ARG WEB_VERSION=0.99.0 +ARG WEB_VERSION=0.99.2 ARG GRID_VERSION=0.99.0 ARG CHART_VERSION=0.99.0 -ARG WIDGET_VERSION=0.99.0 +ARG WIDGET_VERSION=0.99.2 # Pull in the published code-studio package from npmjs and extract it RUN set -eux; \