Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 4
"modification": 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,6 +47,7 @@ class RecordWriter {
private final Table table;
private final String absoluteFilename;
private final FileFormat fileFormat;
private @Nullable FileIO io;

RecordWriter(
Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
Expand All @@ -72,12 +74,14 @@ class RecordWriter {
}
OutputFile outputFile;
EncryptionKeyMetadata keyMetadata;
try (FileIO io = table.io()) {
OutputFile tmpFile = io.newOutputFile(absoluteFilename);
EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
outputFile = encryptedOutputFile.encryptingOutputFile();
keyMetadata = encryptedOutputFile.keyMetadata();
}
// Keep FileIO open for the lifetime of this writer to avoid
// premature shutdown of underlying client pools (e.g., S3),
// which manifests as "Connection pool shut down" (Issue #36438).
this.io = table.io();
OutputFile tmpFile = io.newOutputFile(absoluteFilename);
EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
outputFile = encryptedOutputFile.encryptingOutputFile();
keyMetadata = encryptedOutputFile.keyMetadata();

switch (fileFormat) {
case AVRO:
Expand Down Expand Up @@ -120,16 +124,38 @@ public void write(Record record) {
}

public void close() throws IOException {
IOException closeError = null;
try {
icebergDataWriter.close();
} catch (IOException e) {
throw new IOException(
String.format(
"Failed to close %s writer for table %s, path: %s",
fileFormat, table.name(), absoluteFilename),
e);
closeError =
new IOException(
String.format(
"Failed to close %s writer for table %s, path: %s",
fileFormat, table.name(), absoluteFilename),
e);
} finally {
// Always attempt to close FileIO and decrement metrics
if (io != null) {
try {
io.close();
} catch (Exception ioCloseError) {
if (closeError != null) {
closeError.addSuppressed(ioCloseError);
} else {
closeError = new IOException("Failed to close FileIO", ioCloseError);
}
} finally {
io = null;
}
}
activeIcebergWriters.dec();
}

if (closeError != null) {
throw closeError;
}
activeIcebergWriters.dec();

DataFile dataFile = icebergDataWriter.toDataFile();
LOG.info(
"Closed {} writer for table '{}' ({} records, {} bytes), path: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.values.Row;
Expand All @@ -65,6 +66,10 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
Expand All @@ -83,6 +88,7 @@
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

/** Test class for {@link RecordWriterManager}. */
@RunWith(JUnit4.class)
Expand Down Expand Up @@ -950,6 +956,105 @@ public void testDefaultMetrics() throws IOException {
}
}

@Test
public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
TableIdentifier tableId =
TableIdentifier.of(
"default",
"table_"
+ testName.getMethodName()
+ "_"
+ UUID.randomUUID().toString().replace("-", "").substring(0, 6));
Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA);

CloseTrackingFileIO trackingFileIO = new CloseTrackingFileIO(table.io());
Table spyTable = Mockito.spy(table);
Mockito.doReturn(trackingFileIO).when(spyTable).io();

PartitionKey partitionKey = new PartitionKey(spyTable.spec(), spyTable.schema());
RecordWriter writer =
new RecordWriter(spyTable, FileFormat.PARQUET, "file.parquet", partitionKey);

Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();

writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
writer.close();

assertTrue("FileIO should be closed after writer close", trackingFileIO.closed);
}

private static final class CloseTrackingFileIO implements FileIO {
private final FileIO delegate;
volatile boolean closed = false;

CloseTrackingFileIO(FileIO delegate) {
this.delegate = delegate;
}

@Override
public InputFile newInputFile(String path) {
return delegate.newInputFile(path);
}

@Override
public OutputFile newOutputFile(String path) {
OutputFile underlying = delegate.newOutputFile(path);
return new CloseAwareOutputFile(underlying, this);
}

@Override
public void deleteFile(String path) {
delegate.deleteFile(path);
}

@Override
public Map<String, String> properties() {
return delegate.properties();
}

@Override
public void close() {
closed = true;
delegate.close();
}
}

private static final class CloseAwareOutputFile implements OutputFile {
private final OutputFile delegate;
private final CloseTrackingFileIO io;

CloseAwareOutputFile(OutputFile delegate, CloseTrackingFileIO io) {
this.delegate = delegate;
this.io = io;
}

@Override
public PositionOutputStream create() {
if (io.closed) {
throw new IllegalStateException("Connection pool shut down");
}
return delegate.create();
}

@Override
public PositionOutputStream createOrOverwrite() {
if (io.closed) {
throw new IllegalStateException("Connection pool shut down");
}
return delegate.createOrOverwrite();
}

@Override
public String location() {
return delegate.location();
}

@Override
public InputFile toInputFile() {
return delegate.toInputFile();
}
}

@Test
public void testGetOrCreateTable_refreshLogic() {
Table mockTable = mock(Table.class);
Expand Down
Loading