Skip to content

Commit

Permalink
Spark: Remove closing of IO in SerializableTable*
Browse files Browse the repository at this point in the history
This is to fix: #12046

To summarize, the issue is that Spark can remove broadcast variables from memory and
persist them to disk when memory needs to be freed. If this happens, the IO object
would be closed even though it was still being used by tasks.

This fixes the issue by no longer closing the IO object when the serializable table is
closed. The IO objects should instead be closed by thread finalizers.
  • Loading branch information
mgmarino committed Jan 29, 2025
1 parent e89798e commit 1855162
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,22 @@
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.spark.util.KnownSizeEstimation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class provides a serializable table with a known size estimate. Spark calls its
* SizeEstimator class when broadcasting variables and this can be an expensive operation, so
* providing a known size estimate allows that operation to be skipped.
*
* <p>This class also implements AutoCloseable to avoid leaking resources upon broadcasting.
* Broadcast variables are destroyed and cleaned up on the driver and executors once they are
* garbage collected on the driver. The implementation ensures only resources used by copies of the
* main table are released.
 * <p>This class explicitly does *not* close IO objects, because broadcast variables can be
 * removed from memory and persisted to disk by Spark in the case that memory is needed, even if
 * associated IO objects are still in use.
*/
public class SerializableTableWithSize extends SerializableTable
implements KnownSizeEstimation, AutoCloseable {
implements KnownSizeEstimation {

private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class);
private static final long SIZE_ESTIMATE = 32_768L;

private final transient Object serializationMarker;

protected SerializableTableWithSize(Table table) {
super(table);
this.serializationMarker = new Object();
}

@Override
Expand All @@ -61,38 +53,16 @@ public static Table copyOf(Table table) {
}
}

@Override
public void close() throws Exception {
if (serializationMarker == null) {
LOG.info("Releasing resources");
io().close();
}
}

public static class SerializableMetadataTableWithSize extends SerializableMetadataTable
implements KnownSizeEstimation, AutoCloseable {

private static final Logger LOG =
LoggerFactory.getLogger(SerializableMetadataTableWithSize.class);

private final transient Object serializationMarker;

protected SerializableMetadataTableWithSize(BaseMetadataTable metadataTable) {
super(metadataTable);
this.serializationMarker = new Object();
}

@Override
public long estimatedSize() {
return SIZE_ESTIMATE;
}

@Override
public void close() throws Exception {
if (serializationMarker == null) {
LOG.info("Releasing resources");
io().close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testCloseSerializableTableKryoSerialization() throws Exception {
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors

verify(spyIO, never()).close();
verify(spyFileIOCopy, times(1)).close();
verify(spyFileIOCopy, never()).close();
}
}

Expand All @@ -124,7 +124,7 @@ public void testCloseSerializableTableJavaSerialization() throws Exception {
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors

verify(spyIO, never()).close();
verify(spyFileIOCopy, times(1)).close();
verify(spyFileIOCopy, never()).close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,23 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkExecutorCache;
import org.apache.spark.util.KnownSizeEstimation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class provides a serializable table with a known size estimate. Spark calls its
* SizeEstimator class when broadcasting variables and this can be an expensive operation, so
* providing a known size estimate allows that operation to be skipped.
*
* <p>This class also implements AutoCloseable to avoid leaking resources upon broadcasting.
* Broadcast variables are destroyed and cleaned up on the driver and executors once they are
* garbage collected on the driver. The implementation ensures only resources used by copies of the
* main table are released.
 * <p>This class also implements AutoCloseable, but does *not* close IO objects, because broadcast
 * variables can be removed from memory and persisted to disk by Spark in the case that memory is
 * needed, even if associated IO objects are still in use.
*/
public class SerializableTableWithSize extends SerializableTable
implements KnownSizeEstimation, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class);
private static final long SIZE_ESTIMATE = 32_768L;

private final transient Object serializationMarker;

protected SerializableTableWithSize(Table table) {
super(table);
this.serializationMarker = new Object();
}

@Override
Expand All @@ -64,24 +57,14 @@ public static Table copyOf(Table table) {

@Override
public void close() throws Exception {
if (serializationMarker == null) {
LOG.info("Releasing resources");
io().close();
}
invalidateCache(name());
}

public static class SerializableMetadataTableWithSize extends SerializableMetadataTable
implements KnownSizeEstimation, AutoCloseable {

private static final Logger LOG =
LoggerFactory.getLogger(SerializableMetadataTableWithSize.class);

private final transient Object serializationMarker;

protected SerializableMetadataTableWithSize(BaseMetadataTable metadataTable) {
super(metadataTable);
this.serializationMarker = new Object();
}

@Override
Expand All @@ -91,10 +74,6 @@ public long estimatedSize() {

@Override
public void close() throws Exception {
if (serializationMarker == null) {
LOG.info("Releasing resources");
io().close();
}
invalidateCache(name());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testCloseSerializableTableKryoSerialization() throws Exception {
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors

verify(spyIO, never()).close();
verify(spyFileIOCopy, times(1)).close();
verify(spyFileIOCopy, never()).close();
}
}

Expand All @@ -124,7 +124,7 @@ public void testCloseSerializableTableJavaSerialization() throws Exception {
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors

verify(spyIO, never()).close();
verify(spyFileIOCopy, times(1)).close();
verify(spyFileIOCopy, never()).close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,23 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkExecutorCache;
import org.apache.spark.util.KnownSizeEstimation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class provides a serializable table with a known size estimate. Spark calls its
* SizeEstimator class when broadcasting variables and this can be an expensive operation, so
* providing a known size estimate allows that operation to be skipped.
*
* <p>This class also implements AutoCloseable to avoid leaking resources upon broadcasting.
* Broadcast variables are destroyed and cleaned up on the driver and executors once they are
* garbage collected on the driver. The implementation ensures only resources used by copies of the
* main table are released.
 * <p>This class also implements AutoCloseable, but does *not* close IO objects, because broadcast
 * variables can be removed from memory and persisted to disk by Spark in the case that memory is
 * needed, even if associated IO objects are still in use.
*/
public class SerializableTableWithSize extends SerializableTable
implements KnownSizeEstimation, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class);
private static final long SIZE_ESTIMATE = 32_768L;

private final transient Object serializationMarker;

protected SerializableTableWithSize(Table table) {
super(table);
this.serializationMarker = new Object();
}

@Override
Expand All @@ -64,24 +57,14 @@ public static Table copyOf(Table table) {

@Override
public void close() throws Exception {
if (serializationMarker == null) {
LOG.info("Releasing resources");
io().close();
}
invalidateCache(name());
}

public static class SerializableMetadataTableWithSize extends SerializableMetadataTable
implements KnownSizeEstimation, AutoCloseable {

private static final Logger LOG =
LoggerFactory.getLogger(SerializableMetadataTableWithSize.class);

private final transient Object serializationMarker;

protected SerializableMetadataTableWithSize(BaseMetadataTable metadataTable) {
super(metadataTable);
this.serializationMarker = new Object();
}

@Override
Expand All @@ -91,10 +74,6 @@ public long estimatedSize() {

@Override
public void close() throws Exception {
if (serializationMarker == null) {
LOG.info("Releasing resources");
io().close();
}
invalidateCache(name());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -100,7 +99,7 @@ public void testCloseSerializableTableKryoSerialization() throws Exception {
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors

verify(spyIO, never()).close();
verify(spyFileIOCopy, times(1)).close();
verify(spyFileIOCopy, never()).close();
}
}

Expand All @@ -121,7 +120,7 @@ public void testCloseSerializableTableJavaSerialization() throws Exception {
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors

verify(spyIO, never()).close();
verify(spyFileIOCopy, times(1)).close();
verify(spyFileIOCopy, never()).close();
}
}

Expand Down

0 comments on commit 1855162

Please sign in to comment.