From 04137d3a7150cfa2c4f179ab531f4d51fe23a8a3 Mon Sep 17 00:00:00 2001 From: Michael Marino Date: Wed, 29 Jan 2025 14:51:21 +0100 Subject: [PATCH] Spark: Remove closing of IO in SerializableTable* This is to fix: #12046 To summarize, the issue is that Spark can remove broadcast variables from memory and persist them to disk in case that memory needs to be freed. In the case that this happens, the IO object would be closed even if it was still being used by tasks. This fixes the issue by removing the closure of the IO object when the serializable table is closed. The IO objects should be closed on thread finalizers. --- .../source/SerializableTableWithSize.java | 40 +++---------------- .../iceberg/TestTableSerialization.java | 5 +-- .../source/SerializableTableWithSize.java | 27 ++----------- .../iceberg/TestTableSerialization.java | 5 +-- .../source/SerializableTableWithSize.java | 27 ++----------- .../iceberg/TestTableSerialization.java | 5 +-- 6 files changed, 17 insertions(+), 92 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java index 65df29051c8c..bea1ba086e54 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java @@ -22,30 +22,22 @@ import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.spark.util.KnownSizeEstimation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This class provides a serializable table with a known size estimate. Spark calls its * SizeEstimator class when broadcasting variables and this can be an expensive operation, so * providing a known size estimate allows that operation to be skipped. * - *

This class also implements AutoCloseable to avoid leaking resources upon broadcasting. - * Broadcast variables are destroyed and cleaned up on the driver and executors once they are - * garbage collected on the driver. The implementation ensures only resources used by copies of the - * main table are released. + *

This class explicitly does *not* close IO objects, because broadcast variables can be removed + * from memory and persisted to disk by Spark in the case that memory is needed, even if associated + * IO objects are still in use. */ -public class SerializableTableWithSize extends SerializableTable - implements KnownSizeEstimation, AutoCloseable { +public class SerializableTableWithSize extends SerializableTable implements KnownSizeEstimation { - private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class); private static final long SIZE_ESTIMATE = 32_768L; - private final transient Object serializationMarker; - protected SerializableTableWithSize(Table table) { super(table); - this.serializationMarker = new Object(); } @Override @@ -61,38 +53,16 @@ public static Table copyOf(Table table) { } } - @Override - public void close() throws Exception { - if (serializationMarker == null) { - LOG.info("Releasing resources"); - io().close(); - } - } - public static class SerializableMetadataTableWithSize extends SerializableMetadataTable - implements KnownSizeEstimation, AutoCloseable { - - private static final Logger LOG = - LoggerFactory.getLogger(SerializableMetadataTableWithSize.class); - - private final transient Object serializationMarker; + implements KnownSizeEstimation { protected SerializableMetadataTableWithSize(BaseMetadataTable metadataTable) { super(metadataTable); - this.serializationMarker = new Object(); } @Override public long estimatedSize() { return SIZE_ESTIMATE; } - - @Override - public void close() throws Exception { - if (serializationMarker == null) { - LOG.info("Releasing resources"); - io().close(); - } - } } } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java index ebab094cbe84..7ebbcf42bb25 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java @@ -22,7 +22,6 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -103,7 +102,7 @@ public void testCloseSerializableTableKryoSerialization() throws Exception { ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors verify(spyIO, never()).close(); - verify(spyFileIOCopy, times(1)).close(); + verify(spyFileIOCopy, never()).close(); } } @@ -124,7 +123,7 @@ public void testCloseSerializableTableJavaSerialization() throws Exception { ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors verify(spyIO, never()).close(); - verify(spyFileIOCopy, times(1)).close(); + verify(spyFileIOCopy, never()).close(); } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java index f6913fb9d00d..66bad3eb825f 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java @@ -23,30 +23,23 @@ import org.apache.iceberg.Table; import org.apache.iceberg.spark.SparkExecutorCache; import org.apache.spark.util.KnownSizeEstimation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This class provides a serializable table with a known size estimate. Spark calls its * SizeEstimator class when broadcasting variables and this can be an expensive operation, so * providing a known size estimate allows that operation to be skipped. * - *

This class also implements AutoCloseable to avoid leaking resources upon broadcasting. - * Broadcast variables are destroyed and cleaned up on the driver and executors once they are - * garbage collected on the driver. The implementation ensures only resources used by copies of the - * main table are released. + *

This class also implements AutoCloseable , but does *not* close IO objects, because broadcast + * variables can be removed from memory and persisted to disk by Spark in the case that memory is + * needed, even if associated IO objects are still in use. */ public class SerializableTableWithSize extends SerializableTable implements KnownSizeEstimation, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class); private static final long SIZE_ESTIMATE = 32_768L; - private final transient Object serializationMarker; - protected SerializableTableWithSize(Table table) { super(table); - this.serializationMarker = new Object(); } @Override @@ -64,24 +57,14 @@ public static Table copyOf(Table table) { @Override public void close() throws Exception { - if (serializationMarker == null) { - LOG.info("Releasing resources"); - io().close(); - } invalidateCache(name()); } public static class SerializableMetadataTableWithSize extends SerializableMetadataTable implements KnownSizeEstimation, AutoCloseable { - private static final Logger LOG = - LoggerFactory.getLogger(SerializableMetadataTableWithSize.class); - - private final transient Object serializationMarker; - protected SerializableMetadataTableWithSize(BaseMetadataTable metadataTable) { super(metadataTable); - this.serializationMarker = new Object(); } @Override @@ -91,10 +74,6 @@ public long estimatedSize() { @Override public void close() throws Exception { - if (serializationMarker == null) { - LOG.info("Releasing resources"); - io().close(); - } invalidateCache(name()); } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java index ebab094cbe84..7ebbcf42bb25 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java @@ -22,7 +22,6 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -103,7 +102,7 @@ public void testCloseSerializableTableKryoSerialization() throws Exception { ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors verify(spyIO, never()).close(); - verify(spyFileIOCopy, times(1)).close(); + verify(spyFileIOCopy, never()).close(); } } @@ -124,7 +123,7 @@ public void testCloseSerializableTableJavaSerialization() throws Exception { ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors verify(spyIO, never()).close(); - verify(spyFileIOCopy, times(1)).close(); + verify(spyFileIOCopy, never()).close(); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java index f6913fb9d00d..66bad3eb825f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java @@ -23,30 +23,23 @@ import org.apache.iceberg.Table; import org.apache.iceberg.spark.SparkExecutorCache; import org.apache.spark.util.KnownSizeEstimation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This class provides a serializable table with a known size estimate. Spark calls its * SizeEstimator class when broadcasting variables and this can be an expensive operation, so * providing a known size estimate allows that operation to be skipped. * - *

This class also implements AutoCloseable to avoid leaking resources upon broadcasting. - * Broadcast variables are destroyed and cleaned up on the driver and executors once they are - * garbage collected on the driver. The implementation ensures only resources used by copies of the - * main table are released. + *

This class also implements AutoCloseable , but does *not* close IO objects, because broadcast + * variables can be removed from memory and persisted to disk by Spark in the case that memory is + * needed, even if associated IO objects are still in use. */ public class SerializableTableWithSize extends SerializableTable implements KnownSizeEstimation, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class); private static final long SIZE_ESTIMATE = 32_768L; - private final transient Object serializationMarker; - protected SerializableTableWithSize(Table table) { super(table); - this.serializationMarker = new Object(); } @Override @@ -64,24 +57,14 @@ public static Table copyOf(Table table) { @Override public void close() throws Exception { - if (serializationMarker == null) { - LOG.info("Releasing resources"); - io().close(); - } invalidateCache(name()); } public static class SerializableMetadataTableWithSize extends SerializableMetadataTable implements KnownSizeEstimation, AutoCloseable { - private static final Logger LOG = - LoggerFactory.getLogger(SerializableMetadataTableWithSize.class); - - private final transient Object serializationMarker; - protected SerializableMetadataTableWithSize(BaseMetadataTable metadataTable) { super(metadataTable); - this.serializationMarker = new Object(); } @Override @@ -91,10 +74,6 @@ public long estimatedSize() { @Override public void close() throws Exception { - if (serializationMarker == null) { - LOG.info("Releasing resources"); - io().close(); - } invalidateCache(name()); } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java index fd6dfd07b568..303bb4c3de1d 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java @@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -100,7 +99,7 @@ public void testCloseSerializableTableKryoSerialization() throws Exception { ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors verify(spyIO, never()).close(); - verify(spyFileIOCopy, times(1)).close(); + verify(spyFileIOCopy, never()).close(); } } @@ -121,7 +120,7 @@ public void testCloseSerializableTableJavaSerialization() throws Exception { ((AutoCloseable) serializableTableCopy).close(); // mimics close on executors verify(spyIO, never()).close(); - verify(spyFileIOCopy, times(1)).close(); + verify(spyFileIOCopy, never()).close(); } }