Skip to content

Commit

Permalink
Improved ByteBuffer string conversion for Iceberg manifests
Browse files Browse the repository at this point in the history
  • Loading branch information
0dunay0 committed Feb 3, 2025
1 parent aab1097 commit 432fcd9
Show file tree
Hide file tree
Showing 3 changed files with 299 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,19 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) {
case VARCHAR:
CharBuffer buffer = CharBuffer.wrap(value.toString());
try {
return ENCODER.get().encode(buffer);
ByteBuffer encoded = ENCODER.get().encode(buffer);
// ByteBuffer and CharBuffer allocate space based on capacity
// not actual content length. so we need to create a new ByteBuffer
// with the exact length of the encoded content
// to avoid padding the output with \u0000
if (encoded.limit() != encoded.capacity()) {
ByteBuffer exact = ByteBuffer.allocate(encoded.limit());
encoded.position(0);
exact.put(encoded);
exact.flip();
return exact;
}
return encoded;
} catch (CharacterCodingException e) {
throw new RuntimeException("Failed to encode value as UTF-8: " + value, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableFileInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
Expand All @@ -63,7 +68,10 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
Expand Down Expand Up @@ -595,6 +603,103 @@ public void testNestedTypes() throws Exception {
"Record(2, {20=[Record(cherry, 200), Record(pear, 201)]})");
}

@Test
public void testStringPartitionNullPadding() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT(), DataTypes.VARCHAR(20)},
new String[] {"k", "country"});
FileStoreTable table =
createPaimonTable(
rowType,
Collections.singletonList("country"),
Collections.singletonList("k"),
-1);

String commitUser = UUID.randomUUID().toString();
TableWriteImpl<?> write = table.newWrite(commitUser);
TableCommitImpl commit = table.newCommit(commitUser);

write.write(GenericRow.of(1, BinaryString.fromString("Switzerland")), 1);
write.write(GenericRow.of(2, BinaryString.fromString("Australia")), 1);
write.write(GenericRow.of(3, BinaryString.fromString("Brazil")), 1);
write.write(GenericRow.of(4, BinaryString.fromString("Grand Duchy of Luxembourg")), 1);
commit.commit(1, write.prepareCommit(false, 1));
assertThat(getIcebergResult())
.containsExactlyInAnyOrder(
"Record(1, Switzerland)",
"Record(2, Australia)",
"Record(3, Brazil)",
"Record(4, Grand Duchy of Luxembourg)");

FileIO fileIO = table.fileIO();
IcebergMetadata metadata =
IcebergMetadata.fromPath(
fileIO, new Path(table.location(), "metadata/v1.metadata.json"));

IcebergPathFactory pathFactory =
new IcebergPathFactory(new Path(table.location(), "metadata"));
IcebergManifestList manifestList = IcebergManifestList.create(table, pathFactory);
String currentSnapshotManifest = metadata.currentSnapshot().manifestList();

File snapShotAvroFile = new File(currentSnapshotManifest);
String expectedPartitionSummary =
"[{\"contains_null\": false, \"contains_nan\": false, \"lower_bound\": \"Australia\", \"upper_bound\": \"Switzerland\"}]";
try (DataFileReader<GenericRecord> dataFileReader =
new DataFileReader<>(
new SeekableFileInput(snapShotAvroFile), new GenericDatumReader<>())) {
while (dataFileReader.hasNext()) {
GenericRecord record = dataFileReader.next();
String partitionSummary = record.get("partitions").toString();
assertThat(partitionSummary).doesNotContain("\\u0000");
assertThat(partitionSummary).isEqualTo(expectedPartitionSummary);
}
}

String tableManifest = manifestList.read(snapShotAvroFile.getName()).get(0).manifestPath();

try (DataFileReader<GenericRecord> dataFileReader =
new DataFileReader<>(
new SeekableFileInput(new File(tableManifest)),
new GenericDatumReader<>())) {

while (dataFileReader.hasNext()) {
GenericRecord record = dataFileReader.next();
GenericRecord dataFile = (GenericRecord) record.get("data_file");

// Check lower bounds
GenericData.Array<?> lowerBounds =
(GenericData.Array<?>) dataFile.get("lower_bounds");
if (lowerBounds != null) {
for (Object bound : lowerBounds) {
GenericRecord boundRecord = (GenericRecord) bound;
int key = (Integer) boundRecord.get("key");
if (key == 1) { // key = 1 is the partition key
ByteBuffer value = (ByteBuffer) boundRecord.get("value");
String boundValue = new String(value.array(), StandardCharsets.UTF_8);
assertThat(boundValue).doesNotContain("\u0000");
}
}
}

// Check upper bounds
GenericData.Array<?> upperBounds =
(GenericData.Array<?>) dataFile.get("upper_bounds");
if (upperBounds != null) {
for (Object bound : upperBounds) {
GenericRecord boundRecord = (GenericRecord) bound;
int key = (Integer) boundRecord.get("key");
if (key == 1) { // key = 1 is the partition key
ByteBuffer value = (ByteBuffer) boundRecord.get("value");
String boundValue = new String(value.array(), StandardCharsets.UTF_8);
assertThat(boundValue).doesNotContain("\u0000");
}
}
}
}
}
}

// ------------------------------------------------------------------------
// Random Tests
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.iceberg.manifest;

import org.apache.paimon.types.DataTypes;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class IcebergConversionsVarcharTest {

@Test
void testEmptyString() {
String empty = "";
ByteBuffer result = IcebergConversions.toByteBuffer(DataTypes.VARCHAR(10), empty);
String decodedString = new String(result.array(), StandardCharsets.UTF_8);
assertThat(result.array()).isEmpty();
assertThat(empty).isEqualTo(decodedString);
}

@Test
void testNullHandling() {
assertThatThrownBy(() -> IcebergConversions.toByteBuffer(DataTypes.VARCHAR(10), null))
.isInstanceOf(NullPointerException.class);
}

@ParameterizedTest
@MethodSource("provideSpecialStrings")
@DisplayName("Test special string cases")
void testSpecialStrings(String input) {
ByteBuffer result = IcebergConversions.toByteBuffer(DataTypes.VARCHAR(100), input);
String decoded = new String(result.array(), 0, result.limit(), StandardCharsets.UTF_8);
assertThat(decoded).isEqualTo(input);
}

private static Stream<Arguments> provideSpecialStrings() {
return Stream.of(
Arguments.of("Hello\u0000World"), // Embedded null
Arguments.of("\n\r\t"), // Control characters
Arguments.of(" "), // Single space
Arguments.of(" "), // Multiple spaces
Arguments.of("①②③"), // Unicode numbers
Arguments.of("🌟🌞🌝"), // Emojis
Arguments.of("Hello\uD83D\uDE00World"), // Surrogate pairs
Arguments.of("\uFEFF"), // Byte Order Mark
Arguments.of("Hello\\World"), // Backslashes
Arguments.of("Hello\"World"), // Quotes
Arguments.of("Hello'World"), // Single quotes
Arguments.of("Hello\bWorld"), // Backspace
Arguments.of("Hello\fWorld") // Form feed
);
}

@ParameterizedTest
@MethodSource("provideLongStrings")
void testLongStrings(String input) {
ByteBuffer result =
IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length()), input);
String decoded = new String(result.array(), 0, result.limit(), StandardCharsets.UTF_8);
assertThat(decoded).isEqualTo(input).hasSize(input.length());
}

private static Stream<Arguments> provideLongStrings() {
return Stream.of(
Arguments.of(createString(1)),
Arguments.of(createString(10)),
Arguments.of(createString(100)),
Arguments.of(createString(1000)),
Arguments.of(createString(10000)));
}

private static String createString(int length) {
StringBuilder sb = new StringBuilder(length);
for (int i = 0; i < length; i++) {
sb.append('a');
}
return sb.toString();
}

@Test
void testMultiByteCharacters() {
String[] inputs = {
"中文", // Chinese
"한글", // Korean
"日本語", // Japanese
"🌟", // Emoji (4 bytes)
"Café", // Latin-1 Supplement
"Привет", // Cyrillic
"שָׁלוֹם", // Hebrew with combining marks
"ᄀᄁᄂᄃᄄ", // Hangul Jamo
"बहुत बढ़िया", // Devanagari
"العربية" // Arabic
};

for (String input : inputs) {
ByteBuffer result =
IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length() * 4), input);
String decoded = new String(result.array(), 0, result.limit(), StandardCharsets.UTF_8);
assertThat(decoded).isEqualTo(input);
assertThat(result.limit()).isGreaterThanOrEqualTo(input.length());
}
}

@Test
void testBufferProperties() {
String input = "Hello, World!";
ByteBuffer result =
IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length()), input);

assertThat(result.limit()).isEqualTo(result.array().length);
assertThat(containsTrailingZeros(result)).isFalse();
}

@Test
void testConcurrentAccess() throws InterruptedException {
int threadCount = 10;
Thread[] threads = new Thread[threadCount];
String[] inputs = new String[threadCount];
ByteBuffer[] results = new ByteBuffer[threadCount];

for (int i = 0; i < threadCount; i++) {
final int index = i;
inputs[index] = "Thread" + index;
threads[index] =
new Thread(
() -> {
results[index] =
IcebergConversions.toByteBuffer(
DataTypes.VARCHAR(inputs[index].length()),
inputs[index]);
});
threads[index].start();
}

for (Thread thread : threads) {
thread.join();
}

for (int i = 0; i < threadCount; i++) {
String decoded =
new String(results[i].array(), 0, results[i].limit(), StandardCharsets.UTF_8);
assertThat(decoded).isEqualTo(inputs[i]);
}
}

private boolean containsTrailingZeros(ByteBuffer buffer) {
byte[] array = buffer.array();
for (int i = buffer.limit(); i < array.length; i++) {
if (array[i] != 0) {
return true;
}
}
return false;
}
}

0 comments on commit 432fcd9

Please sign in to comment.