diff --git a/src/Client/BuzzHouse/Generator/FuzzConfig.cpp b/src/Client/BuzzHouse/Generator/FuzzConfig.cpp index 71801adfda14..3b538c7dd939 100644 --- a/src/Client/BuzzHouse/Generator/FuzzConfig.cpp +++ b/src/Client/BuzzHouse/Generator/FuzzConfig.cpp @@ -283,10 +283,8 @@ FuzzConfig::FuzzConfig(DB::ClientBase * c, const String & path) {"hudi", allow_hudi}, {"deltalakes3", allow_deltalakeS3}, {"deltalakeazure", allow_deltalakeAzure}, - {"deltalakelocal", allow_deltalakelocal}, {"icebergs3", allow_icebergS3}, {"icebergazure", allow_icebergAzure}, - {"iceberglocal", allow_icebergLocal}, {"merge", allow_merge}, {"distributed", allow_distributed}, {"dictionary", allow_dictionary}, diff --git a/src/Client/BuzzHouse/Generator/SQLCatalog.h b/src/Client/BuzzHouse/Generator/SQLCatalog.h index 8e54a23bb85b..99d68b873d51 100644 --- a/src/Client/BuzzHouse/Generator/SQLCatalog.h +++ b/src/Client/BuzzHouse/Generator/SQLCatalog.h @@ -236,23 +236,19 @@ struct SQLBase bool isDeltaLakeAzureEngine() const { return teng == TableEngineValues::DeltaLakeAzure; } - bool isDeltaLakeLocalEngine() const { return teng == TableEngineValues::DeltaLakeLocal; } - - bool isAnyDeltaLakeEngine() const { return teng >= TableEngineValues::DeltaLakeS3 && teng <= TableEngineValues::DeltaLakeLocal; } + bool isAnyDeltaLakeEngine() const { return teng >= TableEngineValues::DeltaLakeS3 && teng <= TableEngineValues::DeltaLakeAzure; } bool isIcebergS3Engine() const { return teng == TableEngineValues::IcebergS3; } bool isIcebergAzureEngine() const { return teng == TableEngineValues::IcebergAzure; } - bool isIcebergLocalEngine() const { return teng == TableEngineValues::IcebergLocal; } - - bool isAnyIcebergEngine() const { return teng >= TableEngineValues::IcebergS3 && teng <= TableEngineValues::IcebergLocal; } + bool isAnyIcebergEngine() const { return teng >= TableEngineValues::IcebergS3 && teng <= TableEngineValues::IcebergAzure; } bool isOnS3() const { return isIcebergS3Engine() || isDeltaLakeS3Engine() || isAnyS3Engine(); } bool isOnAzure() const { return isIcebergAzureEngine() || isDeltaLakeAzureEngine() || isAnyAzureEngine(); } - bool isOnLocal() const { return isIcebergLocalEngine() || isDeltaLakeLocalEngine(); } + bool isOnLocal() const { return false; } bool isMergeEngine() const { return teng == TableEngineValues::Merge; } diff --git a/src/Client/BuzzHouse/Generator/SQLQuery.cpp b/src/Client/BuzzHouse/Generator/SQLQuery.cpp index 3773078590cb..a582963c726d 100644 --- a/src/Client/BuzzHouse/Generator/SQLQuery.cpp +++ b/src/Client/BuzzHouse/Generator/SQLQuery.cpp @@ -311,17 +311,6 @@ void StatementGenerator::setTableFunction( } afunc->set_credential(sc.named_collection); } - else if (t.isOnLocal()) - { - lfunc = tfunc->mutable_local(); - const LocalFunc_FName val = (allow_chaos && rg.nextLargeNumber() < 11) - ? static_cast(rg.randomInt(1, 2)) - : (t.isIcebergLocalEngine() ? LocalFunc_FName::LocalFunc_FName_icebergLocal - : LocalFunc_FName::LocalFunc_FName_deltaLakeLocal); - - lfunc->set_fname(val); - lfunc->set_credential("local"); - } else if (t.isFileEngine()) { FileFunc * ffunc = tfunc->mutable_file(); diff --git a/src/Client/BuzzHouse/Generator/SQLTable.cpp b/src/Client/BuzzHouse/Generator/SQLTable.cpp index 579a9c8ba272..1139478c0855 100644 --- a/src/Client/BuzzHouse/Generator/SQLTable.cpp +++ b/src/Client/BuzzHouse/Generator/SQLTable.cpp @@ -1893,14 +1893,6 @@ void StatementGenerator::getNextTableEngine(RandomGenerator & rg, bool use_exter { this->ids.emplace_back(EmbeddedRocksDB); } - if ((fc.engine_mask & allow_icebergLocal) != 0) - { - this->ids.emplace_back(IcebergLocal); - } - if ((fc.engine_mask & allow_deltalakelocal) != 0) - { - this->ids.emplace_back(DeltaLakeLocal); - } if (fc.allow_memory_tables && (fc.engine_mask & allow_memory) != 0) { this->ids.emplace_back(Memory); diff --git a/src/Client/BuzzHouse/Generator/TableSetttings.cpp b/src/Client/BuzzHouse/Generator/TableSetttings.cpp index e6c95744641f..8edd46a81615 100644 --- a/src/Client/BuzzHouse/Generator/TableSetttings.cpp +++ b/src/Client/BuzzHouse/Generator/TableSetttings.cpp @@ -514,10 +514,8 @@ void loadFuzzerTableSettings(const FuzzConfig & fc) {Hudi, {}}, {DeltaLakeS3, dataLakeSettings}, {DeltaLakeAzure, dataLakeSettings}, - {DeltaLakeLocal, dataLakeSettings}, {IcebergS3, dataLakeSettings}, {IcebergAzure, dataLakeSettings}, - {IcebergLocal, dataLakeSettings}, {Merge, {}}, {Distributed, distributedTableSettings}, {Dictionary, {}}, @@ -558,10 +556,8 @@ void loadFuzzerTableSettings(const FuzzConfig & fc) {Hudi, {}}, {DeltaLakeS3, {}}, {DeltaLakeAzure, {}}, - {DeltaLakeLocal, {}}, {IcebergS3, {}}, {IcebergAzure, {}}, - {IcebergLocal, {}}, {Merge, {}}, {Distributed, {}}, {Dictionary, {}}, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h b/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h index 83fb17ab3b8a..7c4d3fa007ca 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h @@ -61,12 +61,6 @@ struct IcebergAzureDefinition static constexpr auto storage_engine_name = "IcebergAzure"; }; -struct IcebergLocalDefinition -{ - static constexpr auto name = "icebergLocal"; - static constexpr auto storage_engine_name = "IcebergLocal"; -}; - struct IcebergHDFSDefinition { static constexpr auto name = "icebergHDFS"; @@ -91,12 +85,6 @@ struct DeltaLakeAzureDefinition static constexpr auto storage_engine_name = "DeltaLakeAzure"; }; -struct DeltaLakeLocalDefinition -{ - static constexpr auto name = "deltaLakeLocal"; - static constexpr auto storage_engine_name = "DeltaLakeLocal"; -}; - struct HudiDefinition { static constexpr auto name = "hudi"; diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index 708e9ae34b9b..bcf4caf92078 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -265,21 +265,6 @@ void registerStorageIceberg(StorageFactory & factory) .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # endif - factory.registerStorage( - IcebergLocalDefinition::storage_engine_name, - [&](const StorageFactory::Arguments & args) - { - const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); - auto configuration = std::make_shared(storage_settings); - return createStorageObjectStorage(args, configuration); - }, - { - .supports_settings = true, - .supports_sort_order = true, - .supports_schema_inference = true, - .source_access_type = AccessTypeObjects::Source::FILE, - .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, - }); } #endif @@ -334,20 +319,6 @@ void registerStorageDeltaLake(StorageFactory & factory) .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # endif - factory.registerStorage( - DeltaLakeLocalDefinition::storage_engine_name, - [&](const StorageFactory::Arguments & args) - { - const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); - auto configuration = std::make_shared(storage_settings); - return createStorageObjectStorage(args, configuration); - }, - { - .supports_settings = true, - .supports_schema_inference = true, - .source_access_type = AccessTypeObjects::Source::FILE, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, - }); } #endif diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 959ac829a8e1..02219babc730 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -388,12 +388,6 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory) .category = FunctionDocumentation::Category::TableFunction}, .allow_readonly = false}); #endif - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the Iceberg table stored locally.)", - .examples{{IcebergLocalDefinition::name, "SELECT * FROM icebergLocal(filename)", ""}}, - .category = FunctionDocumentation::Category::TableFunction}, - .allow_readonly = false}); } #endif @@ -426,13 +420,6 @@ void registerTableFunctionDeltaLake(TableFunctionFactory & factory) .category = FunctionDocumentation::Category::TableFunction}, .allow_readonly = false}); #endif - // Register the new local Delta Lake table function - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the DeltaLake table stored locally.)", - .examples{{DeltaLakeLocalDefinition::name, "SELECT * FROM deltaLakeLocal(path)", ""}}, - .category = FunctionDocumentation::Category::TableFunction}, - .allow_readonly = false}); } #endif diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 0b56a747ffce..ee109bb694ec 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -133,7 +133,6 @@ using TableFunctionIcebergAzure = TableFunctionObjectStorage; # endif -using TableFunctionIcebergLocal = TableFunctionObjectStorage; #endif #if USE_PARQUET && USE_DELTA_KERNEL_RS #if USE_AWS_S3 @@ -143,8 +142,6 @@ using TableFunctionDeltaLakeS3 = TableFunctionObjectStorage; #endif -// New alias for local Delta Lake table function -using TableFunctionDeltaLakeLocal = TableFunctionObjectStorage; #endif #if USE_AWS_S3 using TableFunctionHudi = TableFunctionObjectStorage; diff --git a/tests/integration/helpers/iceberg_utils.py b/tests/integration/helpers/iceberg_utils.py index a8a662569b69..cb23349bee0e 100644 --- a/tests/integration/helpers/iceberg_utils.py +++ b/tests/integration/helpers/iceberg_utils.py @@ -237,8 +237,6 @@ def get_creation_expression( engine_part = "Azure" elif (storage_type == "hdfs"): engine_part = "HDFS" - elif (storage_type == "local"): - engine_part = "Local" if_not_exists_prefix = "" if if_not_exists: @@ -289,28 +287,6 @@ def get_creation_expression( """ ) - elif storage_type == "local": - if run_on_cluster: - assert table_function - return f""" - iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format}) - """ - else: - if table_function: - return f""" - iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format}) - """ - else: - return ( - f""" - DROP TABLE IF EXISTS {table_name}; - CREATE TABLE {if_not_exists_prefix} {table_name} {schema} - ENGINE=Iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format}) - {partition_by} - {settings_expression} - """ - ) - else: raise Exception(f"Unknown iceberg storage type: {storage_type}") diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py index 6241ed6ecd8a..8cc7900b3a1e 100644 --- a/tests/integration/test_storage_delta/test.py +++ b/tests/integration/test_storage_delta/test.py @@ -314,19 +314,6 @@ def create_delta_table( """ + allow_dynamic_metadata_for_datalakes_suffix ) - elif storage_type == "local": - # For local storage, we need to use the absolute path - user_files_path = os.path.join( - SCRIPT_DIR, f"{cluster.instances_dir_name}/node1/database/user_files" - ) - table_path = os.path.join(user_files_path, table_name) - instance.query( - f""" - DROP TABLE IF EXISTS {table_name}; - CREATE TABLE {table_name} - ENGINE=DeltaLakeLocal('{table_path}', {format}) - """ - ) else: raise Exception(f"Unknown delta lake storage type: {storage_type}") @@ -343,10 +330,6 @@ def default_upload_directory( return started_cluster.default_azure_uploader.upload_directory( local_path, remote_path, **kwargs ) - elif storage_type == "local": - return started_cluster.local_uploader.upload_directory( - local_path, remote_path, **kwargs - ) else: raise Exception(f"Unknown delta storage type: {storage_type}") @@ -372,7 +355,7 @@ def create_initial_data_file( @pytest.mark.parametrize( "use_delta_kernel, storage_type", - [("1", "s3"), ("0", "s3"), ("0", "azure"), ("1", "local")], + [("1", "s3"), ("0", "s3"), ("0", "azure")], ) def test_single_log_file(started_cluster, use_delta_kernel, storage_type): instance = get_node(started_cluster, use_delta_kernel) @@ -390,8 +373,7 @@ def test_single_log_file(started_cluster, use_delta_kernel, storage_type): ) table_path = os.path.join(user_files_path, TABLE_NAME) - # We need to exclude the leading slash for local storage protocol file:// - delta_path = table_path if storage_type == "local" else f"/{TABLE_NAME}" + delta_path = f"/{TABLE_NAME}" write_delta_from_file(spark, parquet_data_path, delta_path) files = default_upload_directory( @@ -418,7 +400,7 @@ def test_single_log_file(started_cluster, use_delta_kernel, storage_type): @pytest.mark.parametrize( "use_delta_kernel, storage_type", - [("1", "s3"), ("0", "s3"), ("0", "azure"), ("1", "local")], + [("1", "s3"), ("0", "s3"), ("0", "azure")], ) def test_partition_by(started_cluster, use_delta_kernel, storage_type): instance = get_node(started_cluster, use_delta_kernel) @@ -432,8 +414,7 @@ def test_partition_by(started_cluster, use_delta_kernel, storage_type): ) table_path = os.path.join(user_files_path, TABLE_NAME) - # We need to exclude the leading slash for local storage protocol file:// - delta_path = table_path if storage_type == "local" else f"/{TABLE_NAME}" + delta_path = f"/{TABLE_NAME}" write_delta_from_df( spark, @@ -463,7 +444,7 @@ def test_partition_by(started_cluster, use_delta_kernel, storage_type): @pytest.mark.parametrize( "use_delta_kernel, storage_type", - [("1", "s3"), ("0", "s3"), ("0", "azure"), ("1", "local")], + [("1", "s3"), ("0", "s3"), ("0", "azure")], ) def test_checkpoint(started_cluster, use_delta_kernel, storage_type): instance = get_node(started_cluster, use_delta_kernel) @@ -477,8 +458,8 @@ def test_checkpoint(started_cluster, use_delta_kernel, storage_type): SCRIPT_DIR, f"{cluster.instances_dir_name}/{instance.name}/database/user_files" ) table_path = os.path.join(user_files_path, TABLE_NAME) - # We need to exclude the leading slash for local storage protocol file:// - delta_path = table_path if storage_type == "local" else f"/{TABLE_NAME}" + + delta_path = f"/{TABLE_NAME}" write_delta_from_df( spark, @@ -3267,141 +3248,6 @@ def insert(i): assert len(file_names) == sum(success) -def test_writes_spark_compatibility(started_cluster): - instance = started_cluster.instances["node1"] - instance_disabled_kernel = cluster.instances["node_with_disabled_delta_kernel"] - minio_client = started_cluster.minio_client - bucket = started_cluster.minio_bucket - table_name = randomize_table_name("test_writes") - result_file = f"{table_name}_data" - - schema = pa.schema([("id", pa.int32()), ("name", pa.string())]) - empty_arrays = [pa.array([], type=pa.int32()), pa.array([], type=pa.string())] - write_deltalake( - f"file:///{result_file}", - pa.Table.from_arrays(empty_arrays, schema=schema), - mode="overwrite", - ) - - LocalUploader(instance).upload_directory(f"/{result_file}/", f"/{result_file}/") - files = ( - instance.exec_in_container(["bash", "-c", f"ls /{result_file}"]) - .strip() - .split("\n") - ) - assert len(files) == 1 - assert "_delta_log" == files[0] - assert "" in instance.exec_in_container( - ["bash", "-c", f"ls /{result_file}/_delta_log"] - ) - - instance.query( - f"CREATE TABLE {table_name} (id Int32, name String) ENGINE = DeltaLakeLocal('/{result_file}') SETTINGS output_format_parquet_compression_method = 'none'" - ) - instance.query( - f"INSERT INTO {table_name} SELECT number, toString(number) FROM numbers(10)" - ) - - LocalDownloader(instance).download_directory(f"/{result_file}/", f"/{result_file}/") - - files = ( - instance.exec_in_container(["bash", "-c", f"ls /{result_file}"]) - .strip() - .split("\n") - ) - assert len(files) == 2 - pfile = files[0] if files[0].endswith(".parquet") else files[1] - - table = pq.read_table(f"/{result_file}/{pfile}") - df = table.to_pandas() - assert ( - "0 0 0\n1 1 1\n2 2 2\n3 3 3\n4 4 4\n5 5 5\n6 6 6\n7 7 7\n8 8 8\n9 9 9" - in str(df) - ) - - spark = started_cluster.spark_session - df = spark.read.format("delta").load(f"/{result_file}").collect() - assert ( - "[Row(id=0, name='0'), Row(id=1, name='1'), Row(id=2, name='2'), Row(id=3, name='3'), Row(id=4, name='4'), Row(id=5, name='5'), Row(id=6, name='6'), Row(id=7, name='7'), Row(id=8, name='8'), Row(id=9, name='9')]" - == str(df) - ) - - instance.query( - f"INSERT INTO {table_name} SELECT number, toString(number) FROM numbers(10, 10)" - ) - LocalDownloader(instance).download_directory(f"/{result_file}/", f"/{result_file}/") - files = ( - instance.exec_in_container(["bash", "-c", f"ls /{result_file}"]) - .strip() - .split("\n") - ) - assert len(files) == 3 - - df = spark.read.format("delta").load(f"/{result_file}").collect() - assert ( - "[Row(id=10, name='10'), Row(id=11, name='11'), Row(id=12, name='12'), Row(id=13, name='13'), Row(id=14, name='14'), Row(id=15, name='15'), Row(id=16, name='16'), Row(id=17, name='17'), Row(id=18, name='18'), Row(id=19, name='19'), Row(id=0, name='0'), Row(id=1, name='1'), Row(id=2, name='2'), Row(id=3, name='3'), Row(id=4, name='4'), Row(id=5, name='5'), Row(id=6, name='6'), Row(id=7, name='7'), Row(id=8, name='8'), Row(id=9, name='9')]" - == str(df) - ) - - -@pytest.mark.parametrize("partitioned", [False, True]) -@pytest.mark.parametrize("limit_enabled", [False, True]) -def test_write_limits(started_cluster, partitioned, limit_enabled): - instance = started_cluster.instances["node1"] - minio_client = started_cluster.minio_client - bucket = started_cluster.minio_bucket - table_name = randomize_table_name("test_write_limits") - result_file = f"{table_name}_data" - - schema = pa.schema([("id", pa.int32()), ("name", pa.string())]) - empty_arrays = [pa.array([], type=pa.int32()), pa.array([], type=pa.string())] - write_deltalake( - f"file:///{result_file}", - pa.Table.from_arrays(empty_arrays, schema=schema), - mode="overwrite", - partition_by=["id"] if partitioned else [], - ) - LocalUploader(instance).upload_directory(f"/{result_file}/", f"/{result_file}/") - files = ( - instance.exec_in_container(["bash", "-c", f"ls /{result_file}"]) - .strip() - .split("\n") - ) - assert len(files) == 1 - - instance.query( - f"CREATE TABLE {table_name} (id Int32, name String) ENGINE = DeltaLakeLocal('/{result_file}') SETTINGS output_format_parquet_compression_method = 'none'" - ) - - num_rows = 1000000 - partitions_num = 5 - limit_rows = 10 if limit_enabled else (num_rows + 1) - instance.query( - f"INSERT INTO {table_name} SELECT number % {partitions_num}, randomString(10) FROM numbers({num_rows}) SETTINGS delta_lake_insert_max_rows_in_data_file = {limit_rows}, max_insert_block_size = 1000, min_chunk_bytes_for_parallel_parsing = 1000" - ) - - files = LocalDownloader(instance).download_directory(f"/{result_file}/", f"/{result_file}/") - data_files = [file for file in files if file.endswith(".parquet")] - assert len(data_files) > 0, f"No data files: {files}" - - if partitioned: - if limit_enabled: - assert len(data_files) > partitions_num, f"Data files: {data_files}" - else: - assert len(data_files) == partitions_num, f"Data files: {data_files}" - else: - if limit_enabled: - assert len(data_files) > 1, f"Data files: {data_files}" - else: - assert len(data_files) == 1, f"Data files: {data_files}" - - assert num_rows == int(instance.query(f"SELECT count() FROM {table_name}")) - - spark = started_cluster.spark_session - df = spark.read.format("delta").load(f"/{result_file}") - assert df.count() == num_rows - - def test_column_mapping_id(started_cluster): node = started_cluster.instances["node1"] table_name = randomize_table_name("test_column_mapping_id") @@ -3533,39 +3379,3 @@ def s3_function(path): "2025-06-04\t('100022','2025-06-04 18:40:56.000000','2025-06-09 21:19:00.364000')\t100022" == node.query(f"SELECT * FROM {table_name} ORDER BY all").strip() ) - - -def test_write_column_order(started_cluster): - instance = started_cluster.instances["node1"] - minio_client = started_cluster.minio_client - bucket = started_cluster.minio_bucket - table_name = randomize_table_name("test_write_column_order") - result_file = f"{table_name}_data" - schema = pa.schema([("c1", pa.int32()), ("c0", pa.string())]) - empty_arrays = [pa.array([], type=pa.int32()), pa.array([], type=pa.string())] - write_deltalake( - f"file:///{result_file}", - pa.Table.from_arrays(empty_arrays, schema=schema), - mode="overwrite", - ) - LocalUploader(instance).upload_directory(f"/{result_file}/", f"/{result_file}/") - - instance.query( - f"CREATE TABLE {table_name} (c0 String, c1 Int32) ENGINE = DeltaLakeLocal('/{result_file}') SETTINGS output_format_parquet_compression_method = 'none'" - ) - num_rows = 10 - instance.query( - f"INSERT INTO {table_name} (c1, c0) SELECT number as c1, toString(number % 2) as c0 FROM numbers(10)" - ) - - assert num_rows == int(instance.query(f"SELECT count() FROM {table_name}")) - assert ( - "0\t0\n1\t1\n0\t2\n1\t3\n0\t4\n1\t5\n0\t6\n1\t7\n0\t8\n1\t9" - == instance.query(f"SELECT c0, c1 FROM {table_name}").strip() - ) - - instance.query( - f"INSERT INTO {table_name} (c1, c0) SELECT c1, c0 FROM generateRandom('c1 Int32, c0 String', 16920040705558589162, 7706, 3) LIMIT {num_rows}" - ) - - assert num_rows * 2 == int(instance.query(f"SELECT count() FROM {table_name}")) diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index f28e3226b1c9..0afb31061aab 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -153,7 +153,7 @@ def started_cluster(): @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_single_iceberg_file(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -183,7 +183,7 @@ def test_single_iceberg_file(started_cluster, format_version, storage_type): @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_partition_by(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -219,7 +219,7 @@ def test_partition_by(started_cluster, format_version, storage_type): @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_multiple_iceberg_files(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -279,7 +279,7 @@ def test_multiple_iceberg_files(started_cluster, format_version, storage_type): @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_types(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -448,7 +448,7 @@ def add_df(mode): @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_delete_files(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -534,7 +534,7 @@ def test_delete_files(started_cluster, format_version, storage_type): @pytest.mark.parametrize("use_roaring_bitmaps", [0, 1]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_position_deletes(started_cluster, use_roaring_bitmaps, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -635,7 +635,7 @@ def get_array(query_result: str): @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_schema_inference(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -703,7 +703,7 @@ def test_schema_inference(started_cluster, format_version, storage_type): @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_explanation(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -758,7 +758,7 @@ def test_explanation(started_cluster, format_version, storage_type): @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_metadata_file_selection(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -792,7 +792,7 @@ def test_metadata_file_selection(started_cluster, format_version, storage_type): assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500 @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_metadata_file_format_with_uuid(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -833,7 +833,7 @@ def test_metadata_file_format_with_uuid(started_cluster, format_version, storage @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_metadata_file_selection_from_version_hint(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -1026,7 +1026,7 @@ def test_filesystem_cache(started_cluster, storage_type): @pytest.mark.parametrize( "storage_type, run_on_cluster", - [("s3", False), ("s3", True), ("azure", False), ("local", False)], + [("s3", False), ("s3", True), ("azure", False)], ) def test_partition_pruning(started_cluster, storage_type, run_on_cluster): instance = started_cluster.instances["node1"] @@ -1202,7 +1202,7 @@ def check_validity_and_get_prunned_files(select_expression): @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_schema_evolution_with_time_travel( started_cluster, format_version, storage_type ): @@ -1388,7 +1388,7 @@ def execute_spark_query(query: str): ) @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_iceberg_snapshot_reads(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -1600,7 +1600,7 @@ def test_metadata_cache(started_cluster, storage_type): ) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) @pytest.mark.parametrize("is_table_function", [False, True]) def test_minmax_pruning(started_cluster, storage_type, is_table_function): instance = started_cluster.instances["node1"] @@ -1816,7 +1816,7 @@ def check_validity_and_get_prunned_files(select_expression): == 1 ) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_explicit_metadata_file(started_cluster, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -1860,7 +1860,7 @@ def test_explicit_metadata_file(started_cluster, storage_type): with pytest.raises(Exception): create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="../metadata/v11.metadata.json") -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_minmax_pruning_with_null(started_cluster, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -1957,7 +1957,7 @@ def check_validity_and_get_prunned_files(select_expression): ) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_bucket_partition_pruning(started_cluster, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2086,33 +2086,8 @@ def execute_spark_query(query: str): instance.query(f"SELECT * FROM {table_function_expr_cluster} WHERE a = 1") -def test_time_travel_bug_fix_validation(started_cluster): - instance = started_cluster.instances["node1"] - TABLE_NAME = "test_bucket_partition_pruning_" + get_uuid_str() - - create_iceberg_table("local", instance, TABLE_NAME, started_cluster, "(x String, y Int64)") - - instance.query(f"INSERT INTO {TABLE_NAME} VALUES ('123', 1);", settings={"allow_experimental_insert_into_iceberg": 1, "write_full_path_in_iceberg_metadata": True}) - - default_download_directory( - started_cluster, - "local", - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - first_snapshot = get_last_snapshot(f"/iceberg_data/default/{TABLE_NAME}/") - - instance.query(f"INSERT INTO {TABLE_NAME} VALUES ('123', 1);", settings={"allow_experimental_insert_into_iceberg": 1, "write_full_path_in_iceberg_metadata": True}) - - instance.query(f"SELECT count() FROM {TABLE_NAME}", settings={"iceberg_snapshot_id": first_snapshot}) - - - assert int((instance.query(f"SELECT count() FROM {TABLE_NAME}")).strip()) == 2 - - @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_writes_complex_types(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2132,24 +2107,8 @@ def test_writes_complex_types(started_cluster, format_version, storage_type): instance.query(f"INSERT INTO {TABLE_NAME} VALUES ([1,2], {map_value}, (3,4));", settings={"allow_experimental_insert_into_iceberg": 1}) assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '[1,2]\t{5:6}\t(3,4)\n' - if storage_type != "local": - return - - default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "wb") as f: - f.write(b"1") - - df = spark.read.format("iceberg").load(f"/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 1 - -@pytest.mark.parametrize("storage_type", ["local", "s3"]) +@pytest.mark.parametrize("storage_type", ["s3"]) def test_compressed_metadata(started_cluster, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2189,7 +2148,7 @@ def test_compressed_metadata(started_cluster, storage_type): @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_writes(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2221,39 +2180,9 @@ def test_writes(started_cluster, format_version, storage_type): instance.query(f"INSERT INTO {TABLE_NAME} VALUES (456);", settings={"allow_experimental_insert_into_iceberg": 1}) assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '42\n123\n456\n' - if storage_type != "local": - return - - initial_files = default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "wb") as f: - f.write(b"4") - - df = spark.read.format("iceberg").load(f"/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 3 - - instance.query("SYSTEM ENABLE FAILPOINT iceberg_writes_cleanup") - with pytest.raises(Exception): - instance.query(f"INSERT INTO {TABLE_NAME} VALUES (777777777777);", settings={"allow_experimental_insert_into_iceberg": 1}) - - - files = default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - assert len(initial_files) == len(files) - @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_writes_from_zero(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2276,25 +2205,9 @@ def test_writes_from_zero(started_cluster, format_version, storage_type): instance.query(f"INSERT INTO {TABLE_NAME} VALUES (456);", settings={"allow_experimental_insert_into_iceberg": 1}) assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '123\n456\n' - if storage_type != "local": - return - - default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "wb") as f: - f.write(b"3") - - df = spark.read.format("iceberg").load(f"/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 2 - @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_writes_with_partitioned_table(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2354,23 +2267,8 @@ def execute_spark_query(query: str): assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '1\tAlice\t10.5\t2024-01-20\t2024-01-20 10:00:00.000000\n2\tBob\t20\t2024-01-21\t2024-01-21 11:00:00.000000\n3\tCharlie\t30.5\t2024-01-22\t2024-01-22 12:00:00.000000\n4\tDiana\t40\t2024-01-23\t2024-01-23 13:00:00.000000\n5\tEve\t50.5\t2024-01-24\t2024-01-24 14:00:00.000000\n10\tAlice\t10.5\t2024-01-20\t2024-01-20 10:00:00.000000\n20\tBob\t20\t2024-01-21\t2024-01-21 11:00:00.000000\n30\tCharlie\t30.5\t2024-01-22\t2024-01-22 12:00:00.000000\n40\tDiana\t40\t2024-01-23\t2024-01-23 13:00:00.000000\n50\tEve\t50.5\t2024-01-24\t2024-01-24 14:00:00.000000\n' - if storage_type != "local": - return - default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "wb") as f: - f.write(b"3") - - df = spark.read.format("iceberg").load(f"/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 10 - -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_minmax_pruning_for_arrays_and_maps_subfields_disabled(started_cluster, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2441,7 +2339,7 @@ def execute_spark_query(query: str): @pytest.mark.parametrize("format_version", [1, 2]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_writes_create_table(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2463,25 +2361,9 @@ def test_writes_create_table(started_cluster, format_version, storage_type): instance.query(f"INSERT INTO {TABLE_NAME} VALUES (456);", settings={"allow_experimental_insert_into_iceberg": 1}) assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '123\n456\n' - if storage_type != "local": - return - - default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "wb") as f: - f.write(b"2") - - df = spark.read.format("iceberg").load(f"/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 2 - @pytest.mark.parametrize("format_version", [1, 2]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) @pytest.mark.parametrize("partition_type", ["identity(y)", "(identity(y))", "icebergTruncate(3, y)", "(identity(y), icebergBucket(3, x))"]) def test_writes_create_partitioned_table(started_cluster, format_version, storage_type, partition_type): instance = started_cluster.instances["node1"] @@ -2495,24 +2377,8 @@ def test_writes_create_partitioned_table(started_cluster, format_version, storag instance.query(f"INSERT INTO {TABLE_NAME} VALUES ('123', 1);", settings={"allow_experimental_insert_into_iceberg": 1}) assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '123\t1\n' - if storage_type != "local": - return - default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "wb") as f: - f.write(b"2") - - df = spark.read.format("iceberg").load(f"/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 1 - - -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_relevant_iceberg_schema_chosen(started_cluster, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2569,86 +2435,9 @@ def test_relevant_iceberg_schema_chosen(started_cluster, storage_type): instance.query(f"SELECT * FROM {table_creation_expression} WHERE b >= 2", settings={"input_format_parquet_filter_push_down": 0, "input_format_parquet_bloom_filter_push_down": 0}) -def test_writes_create_table_with_empty_data(started_cluster): - instance = started_cluster.instances["node1"] - TABLE_NAME = "test_relevant_iceberg_schema_chosen_" + get_uuid_str() - instance.query( - f"CREATE TABLE {TABLE_NAME} (c0 Int) ENGINE = IcebergLocal('/iceberg_data/default/{TABLE_NAME}/', 'CSV') AS (SELECT 1 OFFSET 1 ROW);", - settings={"allow_experimental_insert_into_iceberg": 1} - ) - -@pytest.mark.parametrize("format_version", [1, 2]) -@pytest.mark.parametrize("storage_type", ["local"]) -def test_writes_with_compression_metadata(started_cluster, format_version, storage_type): - instance = started_cluster.instances["node1"] - spark = started_cluster.spark_session - TABLE_NAME = "test_bucket_partition_pruning_" + storage_type + "_" + get_uuid_str() - - create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, "(x String, y Int64)", format_version, use_version_hint=True, compression_method="gzip") - - assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '' - instance.query(f"INSERT INTO {TABLE_NAME} VALUES ('123', 1);", settings={"allow_experimental_insert_into_iceberg": 1, "iceberg_metadata_compression_method": "gzip"}) - assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '123\t1\n' - -def test_writes_create_table_bugs(started_cluster): - instance = started_cluster.instances["node1"] - TABLE_NAME = "test_writes_create_table_bugs_" + get_uuid_str() - TABLE_NAME_1 = "test_writes_create_table_bugs_" + get_uuid_str() - instance.query( - f"CREATE TABLE {TABLE_NAME} (c0 Int) ENGINE = IcebergLocal('/iceberg_data/default/{TABLE_NAME}/', 'CSV') AS (SELECT 1 OFFSET 1 ROW);", - settings={"allow_experimental_insert_into_iceberg": 1} - ) - - error = instance.query_and_get_error( - f"CREATE TABLE {TABLE_NAME_1} (c0 Int) ENGINE = IcebergLocal('/iceberg_data/default/{TABLE_NAME_1}/', 'CSV') PARTITION BY (icebergTruncate(c0));", - settings={"allow_experimental_insert_into_iceberg": 1} - ) - - assert "BAD_ARGUMENTS" in error - assert "LOGICAL_ERROR" not in error - @pytest.mark.parametrize("format_version", [1, 2]) -@pytest.mark.parametrize("storage_type", ["local"]) -def test_writes_create_version_hint(started_cluster, format_version, storage_type): - instance = started_cluster.instances["node1"] - spark = started_cluster.spark_session - TABLE_NAME = "test_bucket_partition_pruning_" + storage_type + "_" + get_uuid_str() - - create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, "(x String, y Int64)", format_version, use_version_hint=True) - - assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '' - default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - target_suffix = b'v1.metadata.json' - with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "rb") as f: - assert f.read()[-len(target_suffix):] == target_suffix - - instance.query(f"INSERT INTO {TABLE_NAME} VALUES ('123', 1);", settings={"allow_experimental_insert_into_iceberg": 1}) - assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL", ) == '123\t1\n' - - default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - target_suffix = b'v2.metadata.json' - with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "rb") as f: - assert f.read()[-len(target_suffix):] == target_suffix - - df = spark.read.format("iceberg").load(f"/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 1 - - -@pytest.mark.parametrize("format_version", [1, 2]) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_writes_statistics_by_minmax_pruning(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2780,7 +2569,7 @@ def check_validity_and_get_prunned_files(select_expression): == 3 ) -@pytest.mark.parametrize("storage_type", ["local", "s3", "azure"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_optimize(started_cluster, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2845,48 +2634,10 @@ def get_array(query_result: str): assert instance.query(f"SELECT id FROM {TABLE_NAME} ORDER BY id SETTINGS iceberg_timestamp_ms = {int(snapshot_timestamp.timestamp() * 1000)}") == instance.query( "SELECT number FROM numbers(20, 80)" ) - if storage_type != "local": - return - - default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - df = spark.read.format("iceberg").load(f"/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 90 - - -@pytest.mark.parametrize("format_version", [1, 2]) -@pytest.mark.parametrize("storage_type", ["local"]) -def test_writes_drop_table(started_cluster, format_version, storage_type): - instance = started_cluster.instances["node1"] - spark = started_cluster.spark_session - TABLE_NAME = "test_bucket_partition_pruning_" + storage_type + "_" + get_uuid_str() - - create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, "(x String, y Int64)", format_version, use_version_hint=True) - - instance.query(f"INSERT INTO {TABLE_NAME} VALUES ('123', 1);", settings={"allow_experimental_insert_into_iceberg": 1}) - assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL", ) == '123\t1\n' - - drop_iceberg_table(instance, TABLE_NAME) - with pytest.raises(Exception): - drop_iceberg_table(instance, TABLE_NAME) - drop_iceberg_table(instance, TABLE_NAME, True) - - files = default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - # drop should not delete user data - assert len(files) > 0 @pytest.mark.parametrize("format_version", [1, 2]) -@pytest.mark.parametrize("storage_type", ["s3", "local", "azure"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_writes_schema_evolution(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -2916,21 +2667,8 @@ def test_writes_schema_evolution(started_cluster, format_version, storage_type): assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '4.5600000000000005\n\\N\n' - if storage_type != "local": - return - - default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - df = spark.read.format("iceberg").load(f"/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 2 - -@pytest.mark.parametrize("storage_type", ["s3", "local", "azure"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) @pytest.mark.parametrize("partition_type", ["", "identity(x)", "icebergBucket(3, x)"]) def test_writes_mutate_delete(started_cluster, storage_type, partition_type): format_version = 2 @@ -2959,31 +2697,6 @@ def test_writes_mutate_delete(started_cluster, storage_type, partition_type): instance.query(f"ALTER TABLE {TABLE_NAME} DELETE WHERE x = '999';", settings={"allow_experimental_insert_into_iceberg": 1}) assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == '456\n' - if storage_type != "local": - return - initial_files = default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - instance.query("SYSTEM ENABLE FAILPOINT iceberg_writes_cleanup") - with pytest.raises(Exception): - instance.query(f"ALTER TABLE {TABLE_NAME} DELETE WHERE x = '456';", settings={"allow_experimental_insert_into_iceberg": 1}) - - files = default_download_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - assert len(initial_files) == len(files) - - df = spark.read.format("iceberg").load(f"/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 1 - class PrunedInfo: def __init__(self, not_pruned, partition_pruned, min_max_index_pruned): self.not_pruned = not_pruned @@ -3126,7 +2839,7 @@ def get_prunned_info_from_profile_events(instance, query_id: str): @pytest.mark.parametrize("format_version", ["1", "2"]) -@pytest.mark.parametrize("storage_type", ["s3", "local", "azure"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_system_iceberg_metadata(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session @@ -3203,7 +2916,7 @@ def execute_spark_query(query: str): @pytest.mark.parametrize( "storage_type", - ["s3", "azure", "local"], + ["s3", "azure"], ) def test_partition_pruning_with_subquery_set(started_cluster, storage_type): instance = started_cluster.instances["node1"] @@ -3269,18 +2982,3 @@ def check_validity_and_get_prunned_files(select_expression): ) == 3 ) - - -def test_iceberg_write_minmax(started_cluster): - instance = started_cluster.instances["node1"] - TABLE_NAME = "test_iceberg_write_minmax_" + get_uuid_str() - - create_iceberg_table("local", instance, TABLE_NAME, started_cluster, "(x Int32, y Int32)", partition_by="identity(x)") - - instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 1), (1, 2)", settings={"allow_experimental_insert_into_iceberg": 1}) - - res = instance.query(f"SELECT x,y FROM {TABLE_NAME} WHERE y=1 ORDER BY ALL").strip() - assert res == "1\t1" - - res = instance.query(f"SELECT x,y FROM {TABLE_NAME} WHERE y=2 ORDER BY ALL").strip() - assert res == "1\t2" diff --git a/tests/queries/0_stateless/03581_iceberg_parse_partition.sql b/tests/queries/0_stateless/03581_iceberg_parse_partition.sql deleted file mode 100644 index ba96468422c1..000000000000 --- a/tests/queries/0_stateless/03581_iceberg_parse_partition.sql +++ /dev/null @@ -1,3 +0,0 @@ --- Tags: no-fasttest - -CREATE TABLE t0 (c0 Nullable(Int)) ENGINE = IcebergLocal('/file0') PARTITION BY (`c0.null` IS NULL); -- { serverError BAD_ARGUMENTS } diff --git a/tests/queries/0_stateless/03784_bad_base_backup.sh b/tests/queries/0_stateless/03784_bad_base_backup.sh index d95c3fd6f10e..655cbfadde0a 100755 --- a/tests/queries/0_stateless/03784_bad_base_backup.sh +++ b/tests/queries/0_stateless/03784_bad_base_backup.sh @@ -31,7 +31,7 @@ function thread() CREATE TABLE d1_$CLICKHOUSE_DATABASE.\"t4\" (\"c0\" Array(UInt64), \"c1\" Nullable(UInt256), \"c2\" Nullable(UInt8), \"c3\" LowCardinality(Nullable(Time))) ENGINE = Set(); - CREATE TABLE d2_$CLICKHOUSE_DATABASE.\"t9\" (\"c0\" String COMMENT '😉', \"c1\" String, \"c2\" Nullable(UInt256), \"c3\" Nullable(JSON(max_dynamic_types=21))) ENGINE = DeltaLakeLocal('${CLICKHOUSE_USER_FILES_UNIQUE}_${I}', Parquet) SETTINGS iceberg_recent_metadata_file_by_last_updated_ms_field = 0; + CREATE TABLE d2_$CLICKHOUSE_DATABASE.\"t9\" (\"c0\" String COMMENT '😉', \"c1\" String, \"c2\" Nullable(UInt256), \"c3\" Nullable(JSON(max_dynamic_types=21))) ENGINE = MergeTree() ORDER BY tuple(); BACKUP DATABASE d2_$CLICKHOUSE_DATABASE TO File('${CLICKHOUSE_DATABASE}_${I}/backup0.tar.zst') SETTINGS query_plan_enable_optimizations = 1, max_network_bandwidth = 32768, hdfs_skip_empty_files = 0, format_binary_max_array_size = 8064, force_remove_data_recursively_on_drop = 1, input_format_orc_filter_push_down = 1, parallel_replicas_index_analysis_only_on_coordinator = 1, max_threads_for_indexes = 13, output_format_decimal_trailing_zeros = 0, remote_filesystem_read_prefetch = 0, output_format_parquet_geometadata = 1, optimize_extract_common_expressions = 0, merge_tree_min_rows_for_seek = 5313, max_bytes_ratio_before_external_group_by = 0.010000, optimize_respect_aliases = 0, use_skip_indexes = 0, join_to_sort_minimum_perkey_rows = 5969, parallel_replicas_for_cluster_engines = 0, rewrite_in_to_join = 0, merge_tree_min_bytes_for_seek = 0, parallel_replica_offset = 6, update_insert_deduplication_token_in_dependent_materialized_views = 0, distributed_aggregation_memory_efficient = 0, max_number_of_partitions_for_independent_aggregation = 4222, implicit_select = 0, max_result_bytes = 0, delta_lake_throw_on_engine_predicate_error = 0, apply_row_policy_after_final = 0, input_format_defaults_for_omitted_fields = 1, filesystem_cache_enable_background_download_during_fetch = 1, output_format_sql_insert_include_column_names = 1, input_format_tsv_detect_header = 1, asterisk_include_alias_columns = 0, show_create_query_identifier_quoting_rule = 'always', log_formatted_queries = 1, database_atomic_wait_for_drop_and_detach_synchronously = 0, hdfs_ignore_file_doesnt_exist = 1, cloud_mode = 1 ASYNC;