Commit 91a41a8

Revert optimization to reorder columns in parquet writer

Some files produced by this optimization were ignored by Apache Spark, and some versions of Databricks Runtime throw an exception when reading files with reordered columns.
1 parent 2ad26ce commit 91a41a8
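
For context, the reverted optimization built an array of column indexes, sorted it by each column's compressed size with fastutil's IntArrays.quickSort, and wrote the column data streams in that order while leaving the metadata order untouched (see the removed lines in the diff below). The following is a minimal, self-contained sketch of that index-sorting idiom; the compressedSizes array is made up for illustration:

import it.unimi.dsi.fastutil.ints.IntArrays;

import java.util.Arrays;

public class SortIndexesBySize
{
    public static void main(String[] args)
    {
        // Hypothetical compressed sizes for four columns (illustration only)
        long[] compressedSizes = {400L, 25L, 3000L, 120L};

        // Identity permutation: indexes[i] = i
        int[] indexes = new int[compressedSizes.length];
        Arrays.setAll(indexes, index -> index);

        // Sort the indexes by the size of the column they point at,
        // grouping small columns together in the written file
        IntArrays.quickSort(indexes, (index, otherIndex) ->
                Long.compare(compressedSizes[index], compressedSizes[otherIndex]));

        System.out.println(Arrays.toString(indexes)); // prints [1, 3, 0, 2]
    }
}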

File tree

2 files changed: +5 −23 lines changed

lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java

Lines changed: 3 additions & 16 deletions

@@ -29,7 +29,6 @@
 import io.trino.parquet.writer.ColumnWriter.BufferData;
 import io.trino.spi.Page;
 import io.trino.spi.type.Type;
-import it.unimi.dsi.fastutil.ints.IntArrays;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.format.ColumnMetaData;
 import org.apache.parquet.format.CompressionCodec;
@@ -46,7 +45,6 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -312,27 +310,16 @@ private void flush()
                 .map(BufferData::getMetaData)
                 .collect(toImmutableList());

-        // Since the reader coalesces nearby small reads, it is beneficial to
-        // reorder data streams to group columns with small size together
-        int[] indexes = new int[columns.size()];
-        Arrays.setAll(indexes, index -> index);
-        IntArrays.quickSort(indexes, (index, otherIndex) ->
-                Long.compare(columns.get(index).getTotal_compressed_size(), columns.get(otherIndex).getTotal_compressed_size()));
-
-        // Ordering of columns in the metadata should remain unchanged.
-        // Only the offsets in file at which the columns start may change as a result
-        // of reordering column data streams by their compressed size
         long currentOffset = stripeStartOffset;
-        for (int index : indexes) {
-            ColumnMetaData columnMetaData = columns.get(index);
+        for (ColumnMetaData columnMetaData : columns) {
             columnMetaData.setData_page_offset(currentOffset);
             currentOffset += columnMetaData.getTotal_compressed_size();
         }
         updateRowGroups(columns);

         // flush pages
-        for (int index : indexes) {
-            bufferDataList.get(index).getData()
+        for (BufferData bufferData : bufferDataList) {
+            bufferData.getData()
                     .forEach(data -> data.writeData(outputStream));
         }
     }
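
After the revert, the writer assigns column chunk offsets strictly in metadata order: each column's data page offset is just the running sum of the compressed sizes written before it, so offsets read back in metadata order are monotonically increasing. A minimal sketch of this bookkeeping, using a simplified stand-in class rather than the Thrift-generated org.apache.parquet.format.ColumnMetaData:

import java.util.List;

public class SequentialOffsets
{
    // Simplified stand-in for org.apache.parquet.format.ColumnMetaData
    static final class Column
    {
        final long totalCompressedSize;
        long dataPageOffset;

        Column(long totalCompressedSize)
        {
            this.totalCompressedSize = totalCompressedSize;
        }
    }

    public static void main(String[] args)
    {
        List<Column> columns = List.of(new Column(400), new Column(25), new Column(3000));

        long currentOffset = 4; // assume data starts right after the 4-byte "PAR1" magic
        for (Column column : columns) {
            column.dataPageOffset = currentOffset;
            currentOffset += column.totalCompressedSize;
        }

        // Offsets come out strictly increasing: 4, 404, 429
        columns.forEach(column -> System.out.println(column.dataPageOffset));
    }
}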

lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java

Lines changed: 2 additions & 7 deletions

@@ -35,7 +35,6 @@
 import org.testng.annotations.Test;

 import java.io.IOException;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -137,12 +136,8 @@ public void testColumnReordering()
         ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
         assertThat(parquetMetadata.getBlocks().size()).isGreaterThanOrEqualTo(10);
         for (BlockMetaData blockMetaData : parquetMetadata.getBlocks()) {
-            // Sort columns by size in file
-            List<ColumnChunkMetaData> columns = blockMetaData.getColumns().stream()
-                    .sorted(Comparator.comparingLong(ColumnChunkMetaData::getTotalUncompressedSize))
-                    .collect(toImmutableList());
-            // Verify that the columns are stored in the same order
-            List<Long> offsets = columns.stream()
+            // Verify that the columns are stored in the same order as the metadata
+            List<Long> offsets = blockMetaData.getColumns().stream()
                     .map(ColumnChunkMetaData::getFirstDataPageOffset)
                     .collect(toImmutableList());
             assertThat(offsets).isSorted();
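
The simplified test relies on AssertJ's isSorted(), which passes only when the list is in ascending natural order; since column chunks are now written in metadata order, the first data page offsets must already be sorted without any re-sorting step. A tiny illustration of the assertion (the offset values are made up):

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;

public class IsSortedExample
{
    public static void main(String[] args)
    {
        // Ascending offsets, as produced by the sequential layout: assertion passes
        assertThat(List.of(4L, 404L, 429L)).isSorted();

        // A size-reordered layout could yield e.g. [404, 25, 429] in metadata
        // order, and isSorted() would then throw an AssertionError
    }
}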
