Commit 2ff2aef

Merge remote-tracking branch 'upstream/rc/v0.37.x' into 5996-rst-stream-cancel-2

niloc132 committed Dec 12, 2024
2 parents 16d4854 + e67510f
Showing 41 changed files with 961 additions and 134 deletions.
2 changes: 1 addition & 1 deletion R/rdeephaven/DESCRIPTION
@@ -1,7 +1,7 @@
Package: rdeephaven
Type: Package
Title: R Client for Deephaven Core
-Version: 0.37.0
+Version: 0.37.1
Date: 2023-05-12
Author: Deephaven Data Labs
Maintainer: Alex Peters <alexpeters@deephaven.io>
2 changes: 1 addition & 1 deletion cpp-client/deephaven/CMakeLists.txt
@@ -8,7 +8,7 @@ endif()

project(deephaven)

-set(deephaven_VERSION 0.37.0)
+set(deephaven_VERSION 0.37.1)
set(CMAKE_CXX_STANDARD 17)

# for CMAKE_INSTALL_{dir}
ArrayPreview.java
@@ -3,6 +3,7 @@
//
package io.deephaven.engine.table.impl.preview;

+import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.Vector;
import io.deephaven.vector.VectorFactory;
import org.jetbrains.annotations.NotNull;
@@ -34,7 +35,9 @@ public static ArrayPreview fromArray(final Object array) {
        if (componentType == boolean.class) {
            return new ArrayPreview(convertToString((boolean[]) array));
        }
-        return new ArrayPreview(VectorFactory.forElementType(componentType)
+        // Boxed primitives need the Object wrapper.
+        final Class<?> elementType = TypeUtils.isBoxedType(componentType) ? Object.class : componentType;
+        return new ArrayPreview(VectorFactory.forElementType(elementType)
                .vectorWrap(array)
                .toString(ARRAY_SIZE_CUTOFF));
    }
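For context on the ArrayPreview change above: a boxed array such as Integer[] reports a component type of Integer.class, and wrapping it as a primitive vector would require an int[] it cannot provide, so the factory must be asked for Object-typed storage instead. A minimal standalone sketch of the new dispatch decision — only the TypeUtils call is taken from the diff; the class name and printed output are illustrative:

import io.deephaven.util.type.TypeUtils;

public class BoxedElementTypeSketch {
    public static void main(String[] args) {
        final Object array = new Integer[] {1, 2, 3};
        // The component type of Integer[] is Integer.class, not int.class.
        final Class<?> componentType = array.getClass().getComponentType();
        // A boxed array cannot be cast to a primitive array, so route it to
        // the generic Object vector path rather than the primitive one.
        final Class<?> elementType = TypeUtils.isBoxedType(componentType) ? Object.class : componentType;
        System.out.println(elementType); // prints: class java.lang.Object
    }
}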
@@ -3,19 +3,14 @@
//
package io.deephaven.iceberg.base;

import io.deephaven.base.Pair;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -26,7 +21,6 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.jetbrains.annotations.NotNull;
IcebergKeyValuePartitionedLayout.java
@@ -11,32 +11,31 @@
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.util.type.TypeUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.iceberg.*;
+import org.apache.iceberg.data.IdentityPartitionConverters;
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.*;
-import java.util.stream.Collectors;

/**
 * Iceberg {@link TableLocationKeyFinder location finder} for tables with partitions that will discover data files from
 * a {@link Snapshot}
 */
public final class IcebergKeyValuePartitionedLayout extends IcebergBaseLayout {
-    private static class ColumnData {
+    private static class IdentityPartitioningColData {
        final String name;
        final Class<?> type;
-        final int index;
+        final int index; // position in the partition spec

-        public ColumnData(String name, Class<?> type, int index) {
+        private IdentityPartitioningColData(String name, Class<?> type, int index) {
            this.name = name;
            this.type = type;
            this.index = index;
        }
    }

-    private final List<ColumnData> outputPartitioningColumns;
+    private final List<IdentityPartitioningColData> identityPartitioningColumns;

/**
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
@@ -52,26 +51,26 @@ public IcebergKeyValuePartitionedLayout(

        // We can assume due to upstream validation that there are no duplicate names (after renaming) that are included
        // in the output definition, so we can ignore duplicates.
-        final MutableInt icebergIndex = new MutableInt(0);
-        final Map<String, Integer> availablePartitioningColumns = partitionSpec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> instructions.columnRenames().getOrDefault(name, name))
-                .collect(Collectors.toMap(
-                        name -> name,
-                        name -> icebergIndex.getAndIncrement(),
-                        (v1, v2) -> v1,
-                        LinkedHashMap::new));
-
-        outputPartitioningColumns = tableDef.getColumnStream()
-                .map((final ColumnDefinition<?> columnDef) -> {
-                    final Integer index = availablePartitioningColumns.get(columnDef.getName());
-                    if (index == null) {
-                        return null;
-                    }
-                    return new ColumnData(columnDef.getName(), TypeUtils.getBoxedType(columnDef.getDataType()), index);
-                })
-                .filter(Objects::nonNull)
-                .collect(Collectors.toList());
+        final List<PartitionField> partitionFields = partitionSpec.fields();
+        final int numPartitionFields = partitionFields.size();
+        identityPartitioningColumns = new ArrayList<>(numPartitionFields);
+        for (int fieldId = 0; fieldId < numPartitionFields; ++fieldId) {
+            final PartitionField partitionField = partitionFields.get(fieldId);
+            if (!partitionField.transform().isIdentity()) {
+                // TODO (DH-18160): Improve support for handling non-identity transforms
+                continue;
+            }
+            final String icebergColName = partitionField.name();
+            final String dhColName = instructions.columnRenames().getOrDefault(icebergColName, icebergColName);
+            final ColumnDefinition<?> columnDef = tableDef.getColumn(dhColName);
+            if (columnDef == null) {
+                // Table definition provided by the user doesn't have this column, so skip.
+                continue;
+            }
+            identityPartitioningColumns.add(new IdentityPartitioningColData(dhColName,
+                    TypeUtils.getBoxedType(columnDef.getDataType()), fieldId));
+        }
    }

@Override
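The rewritten constructor walks the partition spec in order and keeps only identity-transformed fields, recording each field's position so the matching value can later be pulled out of PartitionData. A hedged standalone sketch of that filter, built with Iceberg's public API against the same day-plus-identity spec that generate-pyiceberg-2.py (below) creates; the schema and class name here are stand-ins:

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class IdentityFieldFilterSketch {
    public static void main(String[] args) {
        // Stand-in schema; field ids and names mirror generate-pyiceberg-2.py.
        final Schema schema = new Schema(
                Types.NestedField.optional(1, "datetime", Types.TimestampType.withoutZone()),
                Types.NestedField.optional(2, "symbol", Types.StringType.get()));
        final PartitionSpec spec = PartitionSpec.builderFor(schema)
                .day("datetime") // non-identity transform: skipped by the layout (TODO DH-18160)
                .identity("symbol") // identity transform: becomes a Deephaven partitioning column
                .build();
        for (final PartitionField field : spec.fields()) {
            System.out.println(field.name() + " -> identity=" + field.transform().isIdentity());
        }
        // Expected output:
        // datetime_day -> identity=false
        // symbol -> identity=true
    }
}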
@@ -87,13 +86,20 @@ IcebergTableLocationKey keyFromDataFile(
        final Map<String, Comparable<?>> partitions = new LinkedHashMap<>();

        final PartitionData partitionData = (PartitionData) dataFile.partition();
-        for (final ColumnData colData : outputPartitioningColumns) {
+        for (final IdentityPartitioningColData colData : identityPartitioningColumns) {
            final String colName = colData.name;
-            final Object colValue = partitionData.get(colData.index);
-            if (colValue != null && !colData.type.isAssignableFrom(colValue.getClass())) {
-                throw new TableDataException("Partitioning column " + colName
-                        + " has type " + colValue.getClass().getName()
-                        + " but expected " + colData.type.getName());
+            final Object colValue;
+            final Object valueFromPartitionData = partitionData.get(colData.index);
+            if (valueFromPartitionData != null) {
+                colValue = IdentityPartitionConverters.convertConstant(
+                        partitionData.getType(colData.index), valueFromPartitionData);
+                if (!colData.type.isAssignableFrom(colValue.getClass())) {
+                    throw new TableDataException("Partitioning column " + colName
+                            + " has type " + colValue.getClass().getName()
+                            + " but expected " + colData.type.getName());
+                }
+            } else {
+                colValue = null;
            }
            partitions.put(colName, (Comparable<?>) colValue);
        }
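The other behavioral change in keyFromDataFile: raw values from PartitionData are now normalized through Iceberg's IdentityPartitionConverters.convertConstant before the assignability check, mapping Iceberg's internal representations (e.g. an int day count for DATE) to the java.time types the Deephaven column expects. A small sketch under that assumption; the day value is hypothetical:

import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.types.Types;

public class ConvertConstantSketch {
    public static void main(String[] args) {
        // Iceberg stores a DATE partition value as an Integer day count since 1970-01-01.
        final Object raw = 20_000; // hypothetical value, corresponds to 2024-10-04
        final Object converted =
                IdentityPartitionConverters.convertConstant(Types.DateType.get(), raw);
        // The generic-data converter yields a java.time.LocalDate, which is
        // assignable to the boxed column type recorded by the layout.
        System.out.println(converted.getClass().getName() + ": " + converted);
    }
}

This conversion is presumably what allows the LocalDatePC workaround to be deleted in the test change further down.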
Pyiceberg2Test.java
@@ -0,0 +1,108 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg;

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.testutil.TstUtils;
import io.deephaven.engine.util.TableTools;
import io.deephaven.iceberg.sqlite.DbResource;
import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.net.URISyntaxException;
import java.util.List;
import static io.deephaven.util.QueryConstants.NULL_DOUBLE;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * This test shows that we can integrate with data written by <a href="https://py.iceberg.apache.org/">pyiceberg</a>.
 * See TESTING.md and generate-pyiceberg-2.py for more details.
 */
@Tag("security-manager-allow")
class Pyiceberg2Test {
    private static final Namespace NAMESPACE = Namespace.of("trading");
    private static final TableIdentifier TRADING_DATA = TableIdentifier.of(NAMESPACE, "data");

    // This will need to be updated if the data is regenerated
    private static final long SNAPSHOT_1_ID = 2806418501596315192L;

    private static final TableDefinition TABLE_DEFINITION = TableDefinition.of(
            ColumnDefinition.fromGenericType("datetime", LocalDateTime.class),
            ColumnDefinition.ofString("symbol").withPartitioning(),
            ColumnDefinition.ofDouble("bid"),
            ColumnDefinition.ofDouble("ask"));

    private IcebergCatalogAdapter catalogAdapter;

    @BeforeEach
    void setUp() throws URISyntaxException {
        catalogAdapter = DbResource.openCatalog("pyiceberg-2");
    }

    @Test
    void catalogInfo() {
        assertThat(catalogAdapter.listNamespaces()).containsExactly(NAMESPACE);
        assertThat(catalogAdapter.listTables(NAMESPACE)).containsExactly(TRADING_DATA);

        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
        final List<Snapshot> snapshots = tableAdapter.listSnapshots();
        assertThat(snapshots).hasSize(1);
        {
            final Snapshot snapshot = snapshots.get(0);
            assertThat(snapshot.parentId()).isNull();
            assertThat(snapshot.schemaId()).isEqualTo(0);
            assertThat(snapshot.sequenceNumber()).isEqualTo(1L);
            assertThat(snapshot.snapshotId()).isEqualTo(SNAPSHOT_1_ID);
        }
    }

    @Test
    void testDefinition() {
        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
        final TableDefinition td = tableAdapter.definition();
        assertThat(td).isEqualTo(TABLE_DEFINITION);

        // Check the partition spec
        final PartitionSpec partitionSpec = tableAdapter.icebergTable().spec();
        assertThat(partitionSpec.fields().size()).isEqualTo(2);
        final PartitionField firstPartitionField = partitionSpec.fields().get(0);
        assertThat(firstPartitionField.name()).isEqualTo("datetime_day");
        assertThat(firstPartitionField.transform().toString()).isEqualTo("day");

        final PartitionField secondPartitionField = partitionSpec.fields().get(1);
        assertThat(secondPartitionField.name()).isEqualTo("symbol");
        assertThat(secondPartitionField.transform().toString()).isEqualTo("identity");
    }

    @Test
    void testData() {
        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
        final Table fromIceberg = tableAdapter.table();
        assertThat(fromIceberg.size()).isEqualTo(5);
        final Table expectedData = TableTools.newTable(TABLE_DEFINITION,
                TableTools.col("datetime",
                        LocalDateTime.of(2024, 11, 27, 10, 0, 0),
                        LocalDateTime.of(2024, 11, 27, 10, 0, 0),
                        LocalDateTime.of(2024, 11, 26, 10, 1, 0),
                        LocalDateTime.of(2024, 11, 26, 10, 2, 0),
                        LocalDateTime.of(2024, 11, 28, 10, 3, 0)),
                TableTools.stringCol("symbol", "AAPL", "MSFT", "GOOG", "AMZN", "MSFT"),
                TableTools.doubleCol("bid", 150.25, 150.25, 2800.75, 3400.5, NULL_DOUBLE),
                TableTools.doubleCol("ask", 151.0, 151.0, 2810.5, 3420.0, 250.0));
        TstUtils.assertTableEquals(expectedData.sort("datetime", "symbol"),
                fromIceberg.sort("datetime", "symbol"));
    }
}
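A note on testData: both sides are sorted on datetime and symbol before assertTableEquals, presumably because the row order of the materialized Iceberg table follows data-file discovery order, which is not guaranteed; the comparison is deliberately order-insensitive.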
@@ -754,14 +754,7 @@ void testPartitionedAppendWithAllPartitioningTypes() {
                "DoublePC = (double) 4.0",
                "LocalDatePC = LocalDate.parse(`2023-10-01`)")
                .moveColumns(7, "data");
-
-        // TODO (deephaven-core#6419) Dropping the local data column since it is not supported on the read side.
-        // Remove this when the issue is fixed.
-        final TableDefinition tableDefinitionWithoutLocalDate = fromIceberg.dropColumns("LocalDatePC").getDefinition();
-        final Table fromIcebergWithoutLocalDate = tableAdapter.table(IcebergReadInstructions.builder()
-                .tableDefinition(tableDefinitionWithoutLocalDate)
-                .build());
-        assertTableEquals(expected.dropColumns("LocalDatePC"), fromIcebergWithoutLocalDate);
+        assertTableEquals(expected, fromIceberg);
    }

    @Test
5 Git LFS files not shown
@@ -0,0 +1 @@
{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868694938,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"last-sequence-number":0}
@@ -0,0 +1 @@
{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868695120,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"current-snapshot-id":2806418501596315192,"snapshots":[{"snapshot-id":2806418501596315192,"sequence-number":1,"timestamp-ms":1733868695120,"manifest-list":"catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro","summary":{"operation":"append","added-files-size":"9816","added-data-files":"5","added-records":"5","changed-partition-count":"5","total-data-files":"5","total-delete-files":"0","total-records":"5","total-files-size":"9816","total-position-deletes":"0","total-equality-deletes":"0"},"schema-id":0}],"snapshot-log":[{"snapshot-id":2806418501596315192,"timestamp-ms":1733868695120}],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{"main":{"snapshot-id":2806418501596315192,"type":"branch"}},"format-version":2,"last-sequence-number":1}
3 binary files not shown
@@ -1,3 +1,7 @@
+'''
+See TESTING.md for how to run this script.
+'''
+
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType
from pyiceberg.catalog.sql import SqlCatalog
generate-pyiceberg-2.py
@@ -0,0 +1,58 @@
'''
See TESTING.md for how to run this script.
'''

import pyarrow as pa
from datetime import datetime
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import TimestampType, FloatType, DoubleType, StringType, NestedField, StructType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform, IdentityTransform

catalog = SqlCatalog(
    "pyiceberg-2",
    **{
        "uri": f"sqlite:///dh-iceberg-test.db",
        "warehouse": f"catalogs/pyiceberg-2",
    },
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=False),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
    NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
)

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day",
    ),
    PartitionField(
        source_id=2, field_id=1001, transform=IdentityTransform(), name="symbol",
    )
)

catalog.create_namespace("trading")

tbl = catalog.create_table(
    identifier="trading.data",
    schema=schema,
    partition_spec=partition_spec,
)

# Define the data according to your Iceberg schema
data = [
    {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "AAPL", "bid": 150.25, "ask": 151.0},
    {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "MSFT", "bid": 150.25, "ask": 151.0},
    {"datetime": datetime(2024, 11, 26, 10, 1, 0), "symbol": "GOOG", "bid": 2800.75, "ask": 2810.5},
    {"datetime": datetime(2024, 11, 26, 10, 2, 0), "symbol": "AMZN", "bid": 3400.5, "ask": 3420.0},
    {"datetime": datetime(2024, 11, 28, 10, 3, 0), "symbol": "MSFT", "bid": None, "ask": 250.0},
]

# Create a PyArrow Table
table = pa.Table.from_pylist(data)

# Append the table to the Iceberg table
tbl.append(table)
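For orientation: running this script (see TESTING.md) populates the sqlite catalog dh-iceberg-test.db and the warehouse under catalogs/pyiceberg-2, producing the metadata JSON files shown above; Pyiceberg2Test then opens that catalog via DbResource.openCatalog("pyiceberg-2"). If the data is ever regenerated, the snapshot id asserted by the test (SNAPSHOT_1_ID) must be updated to match.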