5 changes: 3 additions & 2 deletions src/IO/S3/URI.cpp
@@ -11,7 +11,6 @@
 #include <boost/algorithm/string/case_conv.hpp>
 #include <Poco/Util/AbstractConfiguration.h>
 
-
 namespace DB
 {
 
@@ -20,7 +19,9 @@ struct URIConverter
     static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper)
     {
         Macros macros({{"bucket", uri.getHost()}});
-        uri = macros.expand(mapper[uri.getScheme()]).empty() ? uri : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery());
+        uri = macros.expand(mapper[uri.getScheme()]).empty()
+            ? uri
+            : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery());
     }
 };

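For readers unfamiliar with the Poco/Macros API, here is a minimal Python sketch of what `URIConverter::modifyURI` does. The mapper template is an assumed example (the real templates come from ClickHouse's configuration), and `modify_uri` is an illustrative name, not an actual API:

```python
# A minimal sketch, not the C++ implementation: the mapper template below is
# an assumption; real templates come from ClickHouse's configuration.
from urllib.parse import urlsplit

def modify_uri(uri: str, mapper: dict) -> str:
    parts = urlsplit(uri)
    # Expand the {bucket} macro with the URI's host, as Macros({{"bucket", host}}) does.
    expanded = mapper.get(parts.scheme, "").replace("{bucket}", parts.netloc)
    if not expanded:
        return uri  # no mapping for this scheme: leave the URI untouched
    path_and_query = parts.path + (f"?{parts.query}" if parts.query else "")
    return expanded + path_and_query

# Hypothetical mapping: s3://mybucket/data/f.parquet
# -> https://mybucket.s3.amazonaws.com/data/f.parquet
print(modify_uri("s3://mybucket/data/f.parquet",
                 {"s3": "https://{bucket}.s3.amazonaws.com"}))
```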
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/Utils.cpp
@@ -341,9 +341,10 @@ std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
     {
         normalized_path = "s3://" + target_decomposed.authority + "/" + target_decomposed.key;
     }
+    // enable_url_encoding=false, path from metadata must have correct encoding already
     S3::URI s3_uri(normalized_path);
 
-    std::string key_to_use = s3_uri.key;
+    std::string key_to_use = target_decomposed.key;
P1 review comment: Preserve parsed S3 key for path-style URLs

Using `target_decomposed.key` here regresses path-style S3 HTTP(S) URIs (for example, `https://s3.amazonaws.com/<bucket>/<key>`): `SchemeAuthorityKey` keeps `bucket/<key>` in `key`, while `s3_uri` has already split bucket and object key correctly. The matching logic still uses `s3_uri` for the bucket/endpoint checks, so when a storage match is found we now return a key with an extra bucket prefix, and object lookups fail with missing-file errors for those URLs.

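To make the failure mode concrete, here is a hedged Python sketch of how a path-style URL decomposes under the two parsers (variable names are illustrative; `SchemeAuthorityKey` and `s3_uri` refer to the C++ types above):

```python
# Sketch of the regression (names are illustrative, not the C++ API):
# a scheme/authority/key decomposition of a path-style URL keeps the bucket
# inside the key, while S3-aware parsing splits bucket and object key.
from urllib.parse import urlsplit

url = "https://s3.amazonaws.com/mybucket/dir/file.parquet"
parts = urlsplit(url)

# SchemeAuthorityKey-style decomposition: everything after the host is "key".
decomposed_key = parts.path.lstrip("/")          # 'mybucket/dir/file.parquet'

# S3-aware parsing of a path-style URL: the first path segment is the bucket.
bucket, _, s3_key = parts.path.lstrip("/").partition("/")
# bucket = 'mybucket', s3_key = 'dir/file.parquet'

# Returning decomposed_key while matching storages against bucket/s3_key
# yields a key with an extra 'mybucket/' prefix -> missing-file errors.
assert decomposed_key == f"{bucket}/{s3_key}"
```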


     bool use_base_storage = false;
     if (base_storage->getType() == ObjectStorageType::S3)
93 changes: 91 additions & 2 deletions tests/integration/test_database_iceberg/test.py
@@ -5,7 +5,7 @@
 import random
 import time
 import uuid
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, time as dtime
 
 import pyarrow as pa
 import pytest
@@ -26,7 +26,8 @@
     StringType,
     StructType,
     TimestampType,
-    TimestamptzType
+    TimestamptzType,
+    TimeType,
 )
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER

@@ -939,3 +940,91 @@ def test_cluster_select(started_cluster):
    assert len(cluster_secondary_queries) == 1

    assert node2.query(
        f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`",
        settings={
            "parallel_replicas_for_cluster_engines": 1,
            "enable_parallel_replicas": 2,
            "cluster_for_parallel_replicas": "cluster_simple",
        },
    ) == "pablo\n"


@pytest.mark.parametrize("storage_type", ["s3"])
def test_partitioning_by_time(started_cluster, storage_type):
    node = started_cluster.instances["node1"]

    test_ref = f"test_partitioning_by_time_{uuid.uuid4()}"
    table_name = f"{test_ref}_table"
    root_namespace = f"{test_ref}_namespace"

    namespace = f"{root_namespace}.A"
    catalog = load_catalog_impl(started_cluster)
    catalog.create_namespace(namespace)

    schema = Schema(
        NestedField(
            field_id=1,
            name="key",
            field_type=TimeType(),
            required=False,
        ),
        NestedField(
            field_id=2,
            name="value",
            field_type=StringType(),
            required=False,
        ),
    )

    partition_spec = PartitionSpec(
        PartitionField(
            source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key"
        )
    )

    table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec)
    data = [{"key": dtime(12, 0, 0), "value": "test"}]
    df = pa.Table.from_pylist(data)
    table.append(df)

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    # Fix this test when https://github.com/Altinity/ClickHouse/issues/15355 is resolved:
    # the result must be 43200 (seconds), not microseconds.
    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "43200000000\ttest\n"


@pytest.mark.parametrize("storage_type", ["s3"])
def test_partitioning_by_string(started_cluster, storage_type):
    node = started_cluster.instances["node1"]

    test_ref = f"test_partitioning_by_string_{uuid.uuid4()}"
    table_name = f"{test_ref}_table"
    root_namespace = f"{test_ref}_namespace"

    namespace = f"{root_namespace}.A"
    catalog = load_catalog_impl(started_cluster)
    catalog.create_namespace(namespace)

    schema = Schema(
        NestedField(
            field_id=1,
            name="key",
            field_type=StringType(),
            required=False,
        ),
        NestedField(
            field_id=2,
            name="value",
            field_type=StringType(),
            required=False,
        ),
    )

    partition_spec = PartitionSpec(
        PartitionField(
            source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key"
        )
    )

    table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec)
    data = [{"key": "a:b,c[d=e/f%g?h", "value": "test"}]
    df = pa.Table.from_pylist(data)
    table.append(df)

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "a:b,c[d=e/f%g?h\ttest\n"