29 changes: 0 additions & 29 deletions src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
@@ -278,21 +278,6 @@ void registerStorageIceberg(StorageFactory & factory)
.has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin,
});
# endif
factory.registerStorage(
IcebergLocalDefinition::storage_engine_name,
[&](const StorageFactory::Arguments & args)
{
const auto storage_settings = getDataLakeStorageSettings(*args.storage_def);
auto configuration = std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
return createStorageObjectStorage(args, configuration);
},
{
.supports_settings = true,
.supports_sort_order = true,
.supports_schema_inference = true,
.source_access_type = AccessTypeObjects::Source::FILE,
.has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin,
});
}

#endif
@@ -347,20 +332,6 @@ void registerStorageDeltaLake(StorageFactory & factory)
.has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin,
});
# endif
factory.registerStorage(
DeltaLakeLocalDefinition::storage_engine_name,
[&](const StorageFactory::Arguments & args)
{
const auto storage_settings = getDataLakeStorageSettings(*args.storage_def);
auto configuration = std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings);
return createStorageObjectStorage(args, configuration);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessTypeObjects::Source::FILE,
.has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin,
});
}
#endif

7 changes: 1 addition & 6 deletions src/TableFunctions/TableFunctionObjectStorage.cpp
@@ -330,12 +330,7 @@ template class TableFunctionObjectStorage<HudiClusterDefinition, StorageS3HudiCo
#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionIcebergLocal>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored locally.)",
.examples{{IcebergLocalDefinition::name, "SELECT * FROM icebergLocal(filename)", ""}},
.category = FunctionDocumentation::Category::TableFunction},
.allow_readonly = false});
UNUSED(factory);
}
#endif
}
7 changes: 0 additions & 7 deletions src/TableFunctions/TableFunctionObjectStorageCluster.cpp
@@ -157,13 +157,6 @@ void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
.category = FunctionDocumentation::Category::TableFunction},
.allow_readonly = false});

factory.registerFunction<TableFunctionIcebergLocalCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on shared storage in parallel for many nodes in a specified cluster.)",
.examples{{IcebergLocalClusterDefinition::name, "SELECT * FROM icebergLocalCluster(cluster, filename, format, [,compression])", ""}},
.category = FunctionDocumentation::Category::TableFunction},
.allow_readonly = false});

# if USE_AWS_S3
factory.registerFunction<TableFunctionIcebergS3Cluster>(
{.documentation
20 changes: 0 additions & 20 deletions src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
@@ -295,26 +295,6 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
}
);

factory.registerFunction<TableFunctionIcebergLocalClusterFallback>(
{
.documentation = {
.description=R"(The table function can be used to read the Iceberg table stored on shared disk in parallel for many nodes in a specified cluster or from single node.)",
.examples{
{
"icebergLocal",
"SELECT * FROM icebergLocal(filename)", ""
},
{
"icebergLocal",
"SELECT * FROM icebergLocal(filename) "
"SETTINGS object_storage_cluster='cluster'", ""
},
},
.category = FunctionDocumentation::Category::TableFunction
},
.allow_readonly = false
}
);
#endif

#if USE_AVRO && USE_AWS_S3
198 changes: 6 additions & 192 deletions tests/integration/test_storage_delta/test.py
@@ -316,19 +316,6 @@ def create_delta_table(
"""
+ allow_dynamic_metadata_for_datalakes_suffix
)
elif storage_type == "local":
# For local storage, we need to use the absolute path
user_files_path = os.path.join(
SCRIPT_DIR, f"{cluster.instances_dir_name}/node1/database/user_files"
)
table_path = os.path.join(user_files_path, table_name)
instance.query(
f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=DeltaLakeLocal('{table_path}', {format})
"""
)
else:
raise Exception(f"Unknown delta lake storage type: {storage_type}")

@@ -345,10 +332,6 @@ def default_upload_directory(
return started_cluster.default_azure_uploader.upload_directory(
local_path, remote_path, **kwargs
)
elif storage_type == "local":
return started_cluster.local_uploader.upload_directory(
local_path, remote_path, **kwargs
)
else:
raise Exception(f"Unknown delta storage type: {storage_type}")

@@ -374,7 +357,7 @@ def create_initial_data_file(

@pytest.mark.parametrize(
"use_delta_kernel, storage_type",
[("1", "s3"), ("0", "s3"), ("0", "azure"), ("1", "local")],
[("1", "s3"), ("0", "s3"), ("0", "azure")],
)
def test_single_log_file(started_cluster, use_delta_kernel, storage_type):
instance = get_node(started_cluster, use_delta_kernel)
@@ -393,7 +376,7 @@ def test_single_log_file(started_cluster, use_delta_kernel, storage_type):
table_path = os.path.join(user_files_path, TABLE_NAME)

# We need to exclude the leading slash for local storage protocol file://
delta_path = table_path if storage_type == "local" else f"/{TABLE_NAME}"
delta_path = f"/{TABLE_NAME}"
write_delta_from_file(spark, parquet_data_path, delta_path)

files = default_upload_directory(
@@ -420,7 +403,7 @@ def test_single_log_file(started_cluster, use_delta_kernel, storage_type):

@pytest.mark.parametrize(
"use_delta_kernel, storage_type",
[("1", "s3"), ("0", "s3"), ("0", "azure"), ("1", "local")],
[("1", "s3"), ("0", "s3"), ("0", "azure")],
)
def test_partition_by(started_cluster, use_delta_kernel, storage_type):
instance = get_node(started_cluster, use_delta_kernel)
@@ -435,7 +418,7 @@ def test_partition_by(started_cluster, use_delta_kernel, storage_type):
table_path = os.path.join(user_files_path, TABLE_NAME)

# We need to exclude the leading slash for local storage protocol file://
delta_path = table_path if storage_type == "local" else f"/{TABLE_NAME}"
delta_path = f"/{TABLE_NAME}"

write_delta_from_df(
spark,
@@ -465,7 +448,7 @@ def test_checkpoint(started_cluster, use_delta_kernel, storage_type):

@pytest.mark.parametrize(
"use_delta_kernel, storage_type",
[("1", "s3"), ("0", "s3"), ("0", "azure"), ("1", "local")],
[("1", "s3"), ("0", "s3"), ("0", "azure")],
)
def test_checkpoint(started_cluster, use_delta_kernel, storage_type):
instance = get_node(started_cluster, use_delta_kernel)
@@ -480,7 +463,7 @@ def test_checkpoint(started_cluster, use_delta_kernel, storage_type):
)
table_path = os.path.join(user_files_path, TABLE_NAME)
# We need to exclude the leading slash for local storage protocol file://
delta_path = table_path if storage_type == "local" else f"/{TABLE_NAME}"
delta_path = f"/{TABLE_NAME}"

write_delta_from_df(
spark,
@@ -3272,141 +3255,6 @@ def insert(i):
assert len(file_names) == sum(success)


def test_writes_spark_compatibility(started_cluster):
instance = started_cluster.instances["node1"]
instance_disabled_kernel = cluster.instances["node_with_disabled_delta_kernel"]
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
table_name = randomize_table_name("test_writes")
result_file = f"{table_name}_data"

schema = pa.schema([("id", pa.int32()), ("name", pa.string())])
empty_arrays = [pa.array([], type=pa.int32()), pa.array([], type=pa.string())]
write_deltalake(
f"file:///{result_file}",
pa.Table.from_arrays(empty_arrays, schema=schema),
mode="overwrite",
)

LocalUploader(instance).upload_directory(f"/{result_file}/", f"/{result_file}/")
files = (
instance.exec_in_container(["bash", "-c", f"ls /{result_file}"])
.strip()
.split("\n")
)
assert len(files) == 1
assert "_delta_log" == files[0]
assert "" in instance.exec_in_container(
["bash", "-c", f"ls /{result_file}/_delta_log"]
)

instance.query(
f"CREATE TABLE {table_name} (id Int32, name String) ENGINE = DeltaLakeLocal('/{result_file}') SETTINGS output_format_parquet_compression_method = 'none'"
)
instance.query(
f"INSERT INTO {table_name} SELECT number, toString(number) FROM numbers(10)"
)

LocalDownloader(instance).download_directory(f"/{result_file}/", f"/{result_file}/")

files = (
instance.exec_in_container(["bash", "-c", f"ls /{result_file}"])
.strip()
.split("\n")
)
assert len(files) == 2
pfile = files[0] if files[0].endswith(".parquet") else files[1]

table = pq.read_table(f"/{result_file}/{pfile}")
df = table.to_pandas()
assert (
"0 0 0\n1 1 1\n2 2 2\n3 3 3\n4 4 4\n5 5 5\n6 6 6\n7 7 7\n8 8 8\n9 9 9"
in str(df)
)

spark = started_cluster.spark_session
df = spark.read.format("delta").load(f"/{result_file}").collect()
assert (
"[Row(id=0, name='0'), Row(id=1, name='1'), Row(id=2, name='2'), Row(id=3, name='3'), Row(id=4, name='4'), Row(id=5, name='5'), Row(id=6, name='6'), Row(id=7, name='7'), Row(id=8, name='8'), Row(id=9, name='9')]"
== str(df)
)

instance.query(
f"INSERT INTO {table_name} SELECT number, toString(number) FROM numbers(10, 10)"
)
LocalDownloader(instance).download_directory(f"/{result_file}/", f"/{result_file}/")
files = (
instance.exec_in_container(["bash", "-c", f"ls /{result_file}"])
.strip()
.split("\n")
)
assert len(files) == 3

df = spark.read.format("delta").load(f"/{result_file}").collect()
assert (
"[Row(id=10, name='10'), Row(id=11, name='11'), Row(id=12, name='12'), Row(id=13, name='13'), Row(id=14, name='14'), Row(id=15, name='15'), Row(id=16, name='16'), Row(id=17, name='17'), Row(id=18, name='18'), Row(id=19, name='19'), Row(id=0, name='0'), Row(id=1, name='1'), Row(id=2, name='2'), Row(id=3, name='3'), Row(id=4, name='4'), Row(id=5, name='5'), Row(id=6, name='6'), Row(id=7, name='7'), Row(id=8, name='8'), Row(id=9, name='9')]"
== str(df)
)


@pytest.mark.parametrize("partitioned", [False, True])
@pytest.mark.parametrize("limit_enabled", [False, True])
def test_write_limits(started_cluster, partitioned, limit_enabled):
instance = started_cluster.instances["node1"]
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
table_name = randomize_table_name("test_write_limits")
result_file = f"{table_name}_data"

schema = pa.schema([("id", pa.int32()), ("name", pa.string())])
empty_arrays = [pa.array([], type=pa.int32()), pa.array([], type=pa.string())]
write_deltalake(
f"file:///{result_file}",
pa.Table.from_arrays(empty_arrays, schema=schema),
mode="overwrite",
partition_by=["id"] if partitioned else [],
)
LocalUploader(instance).upload_directory(f"/{result_file}/", f"/{result_file}/")
files = (
instance.exec_in_container(["bash", "-c", f"ls /{result_file}"])
.strip()
.split("\n")
)
assert len(files) == 1

instance.query(
f"CREATE TABLE {table_name} (id Int32, name String) ENGINE = DeltaLakeLocal('/{result_file}') SETTINGS output_format_parquet_compression_method = 'none'"
)

num_rows = 1000000
partitions_num = 5
limit_rows = 10 if limit_enabled else (num_rows + 1)
instance.query(
f"INSERT INTO {table_name} SELECT number % {partitions_num}, randomString(10) FROM numbers({num_rows}) SETTINGS delta_lake_insert_max_rows_in_data_file = {limit_rows}, max_insert_block_size = 1000, min_chunk_bytes_for_parallel_parsing = 1000"
)

files = LocalDownloader(instance).download_directory(f"/{result_file}/", f"/{result_file}/")
data_files = [file for file in files if file.endswith(".parquet")]
assert len(data_files) > 0, f"No data files: {files}"

if partitioned:
if limit_enabled:
assert len(data_files) > partitions_num, f"Data files: {data_files}"
else:
assert len(data_files) == partitions_num, f"Data files: {data_files}"
else:
if limit_enabled:
assert len(data_files) > 1, f"Data files: {data_files}"
else:
assert len(data_files) == 1, f"Data files: {data_files}"

assert num_rows == int(instance.query(f"SELECT count() FROM {table_name}"))

spark = started_cluster.spark_session
df = spark.read.format("delta").load(f"/{result_file}")
assert df.count() == num_rows


def test_column_mapping_id(started_cluster):
node = started_cluster.instances["node1"]
table_name = randomize_table_name("test_column_mapping_id")
Expand Down Expand Up @@ -3540,37 +3388,3 @@ def s3_function(path):
)


def test_write_column_order(started_cluster):
instance = started_cluster.instances["node1"]
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
table_name = randomize_table_name("test_write_column_order")
result_file = f"{table_name}_data"
schema = pa.schema([("c1", pa.int32()), ("c0", pa.string())])
empty_arrays = [pa.array([], type=pa.int32()), pa.array([], type=pa.string())]
write_deltalake(
f"file:///{result_file}",
pa.Table.from_arrays(empty_arrays, schema=schema),
mode="overwrite",
)
LocalUploader(instance).upload_directory(f"/{result_file}/", f"/{result_file}/")

instance.query(
f"CREATE TABLE {table_name} (c0 String, c1 Int32) ENGINE = DeltaLakeLocal('/{result_file}') SETTINGS output_format_parquet_compression_method = 'none'"
)
num_rows = 10
instance.query(
f"INSERT INTO {table_name} (c1, c0) SELECT number as c1, toString(number % 2) as c0 FROM numbers(10)"
)

assert num_rows == int(instance.query(f"SELECT count() FROM {table_name}"))
assert (
"0\t0\n1\t1\n0\t2\n1\t3\n0\t4\n1\t5\n0\t6\n1\t7\n0\t8\n1\t9"
== instance.query(f"SELECT c0, c1 FROM {table_name}").strip()
)

instance.query(
f"INSERT INTO {table_name} (c1, c0) SELECT c1, c0 FROM generateRandom('c1 Int32, c0 String', 16920040705558589162, 7706, 3) LIMIT {num_rows}"
)

assert num_rows * 2 == int(instance.query(f"SELECT count() FROM {table_name}"))