Skip to content

Commit 40b252b

Browse files
Commit message: "Small fixes"
1 parent ced1375 commit 40b252b

4 files changed

Lines changed: 25 additions & 15 deletions

File tree

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,9 @@ std::vector<uint8_t> dumpFieldToBytes(const Field & field, DataTypePtr type)
155155
case TypeIndex::Int32:
156156
case TypeIndex::Date:
157157
case TypeIndex::Date32:
158+
case TypeIndex::Time:
158159
return dumpValue(field.safeGet<Int32>());
159160
case TypeIndex::Int64:
160-
case TypeIndex::Time:
161161
return dumpValue(field.safeGet<Int64>());
162162
case TypeIndex::Time64:
163163
case TypeIndex::DateTime64:
@@ -423,8 +423,8 @@ void generateManifestFile(
423423

424424
case Field::Types::Decimal64:
425425
{
426-
auto type_id = partition_types[i]->getTypeId();
427-
if (type_id == TypeIndex::Time64 || type_id == TypeIndex::Time)
426+
const WhichDataType which(*partition_types[i]);
427+
if (which.isTime64())
428428
{ /// Need to write logical type into Avro
429429
auto scale = getDecimalScale(*partition_types[i]);
430430
if (scale == 0)
@@ -442,7 +442,7 @@ void generateManifestFile(
442442
{
443443
throw Exception(
444444
ErrorCodes::BAD_ARGUMENTS,
445-
"Avro file supports only seconds, milliseconds and microsecods for time, partition precision: {}", scale);
445+
"Avro file supports only seconds, milliseconds and microseconds for time, partition precision: {}", scale);
446446
}
447447
}
448448
else

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -484,8 +484,17 @@ Poco::Dynamic::Var getAvroType(DataTypePtr type)
484484
case TypeIndex::Int64:
485485
case TypeIndex::DateTime:
486486
case TypeIndex::DateTime64:
487-
case TypeIndex::Time64:
488487
return "long";
488+
case TypeIndex::Time64:
489+
{
490+
auto scale = getDecimalScale(*type);
491+
if (scale == 0 || scale == 3)
492+
return "int";
493+
else if (scale == 6)
494+
return "long";
495+
else
496+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported type for iceberg {}", type->getName());
497+
}
489498
case TypeIndex::Float32:
490499
return "float";
491500
case TypeIndex::Float64:
@@ -505,8 +514,14 @@ Poco::Dynamic::Var getAvroType(DataTypePtr type)
505514

506515
Poco::Dynamic::Var getAvroLogicalType(DataTypePtr type)
507516
{
508-
auto type_id = type->getTypeId();
509-
if (type_id == TypeIndex::Time || type_id == TypeIndex::Time64)
517+
if (type->isNullable())
518+
{
519+
auto type_nullable = std::static_pointer_cast<const DataTypeNullable>(type);
520+
return getAvroLogicalType(type_nullable->getNestedType());
521+
}
522+
523+
const WhichDataType which(type);
524+
if (which.isTime64())
510525
{
511526
auto scale = getDecimalScale(*type);
512527
switch (scale)

tests/integration/test_database_iceberg/test.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,8 +1018,7 @@ def _test_cluster_joins(started_cluster):
10181018
assert res == "Jack\tBlack\nJack\tSilver\nJohn\tBlack\nJohn\tSilver\n"
10191019

10201020

1021-
@pytest.mark.parametrize("storage_type", ["s3"])
1022-
def test_partitioning_by_time(started_cluster, storage_type):
1021+
def test_partitioning_by_time(started_cluster):
10231022
node = started_cluster.instances["node1"]
10241023

10251024
test_ref = f"test_partitioning_by_time_{uuid.uuid4()}"
@@ -1067,8 +1066,7 @@ def test_partitioning_by_time(started_cluster, storage_type):
10671066
assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}` WHERE key <= '13:00:00.000000' ORDER BY key") == "12:00:00.000000\ttest1\n13:00:00.000000\ttest2\n"
10681067

10691068

1070-
@pytest.mark.parametrize("storage_type", ["s3"])
1071-
def test_partitioning_by_string(started_cluster, storage_type):
1069+
def test_partitioning_by_string(started_cluster):
10721070
node = started_cluster.instances["node1"]
10731071

10741072
test_ref = f"test_partitioning_by_string_{uuid.uuid4()}"

tests/integration/test_storage_iceberg_no_spark/test_write_time.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,10 @@
33
from helpers.iceberg_utils import get_uuid_str
44

55

6-
@pytest.mark.parametrize("time_type", ["Time", "Time64"])
6+
@pytest.mark.parametrize("time_type", ["Time", "Time64(0)", "Time64(3)", "Time64(6)"])
77
def test_write_time(started_cluster_iceberg_no_spark, time_type):
88
node = started_cluster_iceberg_no_spark.instances["node1"]
99

10-
if time_type == "Time64":
11-
time_type = "Time64(6)"
12-
1310
TABLE_NAME = "test_partitioning_by_time_" + get_uuid_str()
1411

1512
node.query(

0 commit comments

Comments (0)