Stream large transaction log JSONs instead of storing them in memory
Operations fetching metadata and protocol entries can skip reading the rest of the JSON file after those entries are found.
raunaqmorarka committed Dec 24, 2024
1 parent 881ca99 commit f888217
Showing 25 changed files with 751 additions and 322 deletions.
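How the change works, as a minimal sketch: a TransactionLogEntries holder keeps the parsed entries of small log files in memory, while for large files it records only the file location and re-reads it lazily, so every caller consumes a closeable Stream. The type name and the getEntries/getEntriesList methods appear in the diff below; the fields, constructor, I/O, and parsing details here are illustrative assumptions, not the actual implementation.

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import static java.nio.charset.StandardCharsets.UTF_8;

public class TransactionLogEntries
{
    private final Location logFile; // assumed field: where to re-read large files from
    private final Optional<List<DeltaLakeTransactionLogEntry>> cachedEntries; // present only for small files

    public TransactionLogEntries(Location logFile, Optional<List<DeltaLakeTransactionLogEntry>> cachedEntries)
    {
        this.logFile = logFile;
        this.cachedEntries = cachedEntries;
    }

    // Small files stream from memory; large files are re-opened and parsed one JSON line at a time.
    // The caller owns the returned Stream and must close it to release the underlying file handle.
    public Stream<DeltaLakeTransactionLogEntry> getEntries(TrinoFileSystem fileSystem)
    {
        if (cachedEntries.isPresent()) {
            return cachedEntries.get().stream();
        }
        try {
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fileSystem.newInputFile(logFile).newStream(), UTF_8));
            return reader.lines()
                    .map(TransactionLogEntries::parseEntry)
                    .onClose(() -> {
                        try {
                            reader.close();
                        }
                        catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    });
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static DeltaLakeTransactionLogEntry parseEntry(String json)
    {
        // placeholder: the plugin parses each line with its existing JSON handling for log entries
        throw new UnsupportedOperationException("illustrative sketch only");
    }
}

Because the stream may hold an open file, each consumer in the diff below switches to try-with-resources, and operations that only need the first matching entries (metadata, protocol) can abandon the stream early instead of parsing the whole file.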
@@ -14,12 +14,13 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
+import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
-import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.Transaction;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
+import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
@@ -144,7 +145,7 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
try {
List<Transaction> transactions = loadNewTailBackward(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive.get()).reversed();
-return new FixedPageSource(buildPages(session, pagesBuilder, transactions));
+return new FixedPageSource(buildPages(session, pagesBuilder, transactions, fileSystem));
}
catch (TrinoException e) {
throw e;
@@ -170,7 +171,7 @@ private static List<Transaction> loadNewTailBackward(
boolean endOfHead = false;

while (!endOfHead) {
-Optional<List<DeltaLakeTransactionLogEntry>> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem);
+Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.of(0, DataSize.Unit.BYTE));
if (results.isPresent()) {
transactionsBuilder.add(new Transaction(version, results.get()));
version = entryNumber;
@@ -187,5 +188,6 @@ private static List<Transaction> loadNewTailBackward(
return transactionsBuilder.build();
}

-protected abstract List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions);
+protected abstract List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
+throws IOException;
}
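getEntriesFromJson gains a DataSize threshold above, and this call site passes DataSize.of(0, BYTE), i.e. never hold these entries in memory. A hedged sketch of what the threshold plausibly decides — the cached/streaming factories and parseAllLines helper are hypothetical, and the real method uses the transaction log path helpers imported elsewhere in this commit:

public static Optional<TransactionLogEntries> getEntriesFromJson(
        long entryNumber, String transactionLogDir, TrinoFileSystem fileSystem, DataSize maxCachedFileSize)
        throws IOException
{
    // 00000000000000000042.json-style naming; the real code imports getTransactionLogJsonEntryPath for this
    Location logFile = Location.of(transactionLogDir).appendPath("%020d.json".formatted(entryNumber));
    TrinoInputFile inputFile = fileSystem.newInputFile(logFile);
    if (!inputFile.exists()) {
        return Optional.empty(); // this log version does not exist (yet)
    }
    if (inputFile.length() <= maxCachedFileSize.toBytes()) {
        // small file: parse eagerly and keep the entries in memory
        return Optional.of(TransactionLogEntries.cached(parseAllLines(inputFile)));
    }
    // large file: remember only the location and re-stream it on each access
    return Optional.of(TransactionLogEntries.streaming(logFile));
}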
@@ -15,15 +15,19 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
+import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;

+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Stream;

import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.canonicalizePartitionValues;
import static java.util.Objects.requireNonNull;
@@ -38,7 +42,7 @@ public class DeltaLakeCommitSummary
private final Set<Map<String, Optional<String>>> addedFilesCanonicalPartitionValues;
private final Optional<Boolean> isBlindAppend;

-public DeltaLakeCommitSummary(long version, List<DeltaLakeTransactionLogEntry> transactionLogEntries)
+public DeltaLakeCommitSummary(long version, TransactionLogEntries transactionLogEntries, TrinoFileSystem fileSystem)
{
requireNonNull(transactionLogEntries, "transactionLogEntries is null");
ImmutableList.Builder<MetadataEntry> metadataUpdatesBuilder = ImmutableList.builder();
@@ -48,26 +52,29 @@ public DeltaLakeCommitSummary(long version, List<DeltaLakeTransactionLogEntry> t
ImmutableSet.Builder<Map<String, Optional<String>>> removedFilesCanonicalPartitionValuesBuilder = ImmutableSet.builder();
boolean containsRemoveFileWithoutPartitionValues = false;

-for (DeltaLakeTransactionLogEntry transactionLogEntry : transactionLogEntries) {
-if (transactionLogEntry.getMetaData() != null) {
-metadataUpdatesBuilder.add(transactionLogEntry.getMetaData());
-}
-else if (transactionLogEntry.getProtocol() != null) {
-optionalProtocol = Optional.of(transactionLogEntry.getProtocol());
-}
-else if (transactionLogEntry.getCommitInfo() != null) {
-optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo());
-}
-else if (transactionLogEntry.getAdd() != null) {
-addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues());
-}
-else if (transactionLogEntry.getRemove() != null) {
-Map<String, String> partitionValues = transactionLogEntry.getRemove().partitionValues();
-if (partitionValues == null) {
-containsRemoveFileWithoutPartitionValues = true;
-}
-else {
-removedFilesCanonicalPartitionValuesBuilder.add(canonicalizePartitionValues(partitionValues));
-}
-}
-}
+try (Stream<DeltaLakeTransactionLogEntry> logEntryStream = transactionLogEntries.getEntries(fileSystem)) {
+for (Iterator<DeltaLakeTransactionLogEntry> it = logEntryStream.iterator(); it.hasNext(); ) {
+DeltaLakeTransactionLogEntry transactionLogEntry = it.next();
+if (transactionLogEntry.getMetaData() != null) {
+metadataUpdatesBuilder.add(transactionLogEntry.getMetaData());
+}
+else if (transactionLogEntry.getProtocol() != null) {
+optionalProtocol = Optional.of(transactionLogEntry.getProtocol());
+}
+else if (transactionLogEntry.getCommitInfo() != null) {
+optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo());
+}
+else if (transactionLogEntry.getAdd() != null) {
+addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues());
+}
+else if (transactionLogEntry.getRemove() != null) {
+Map<String, String> partitionValues = transactionLogEntry.getRemove().partitionValues();
+if (partitionValues == null) {
+containsRemoveFileWithoutPartitionValues = true;
+}
+else {
+removedFilesCanonicalPartitionValuesBuilder.add(canonicalizePartitionValues(partitionValues));
+}
+}
+}
+}
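The loop rewrite above is the consumer-side cost of streaming: a Stream backed by an open file must be closed, so the plain enhanced for over a materialized list becomes try-with-resources plus Stream.iterator(). The pattern in isolation (a sketch; process stands in for the per-entry handling shown above):

try (Stream<DeltaLakeTransactionLogEntry> logEntryStream = transactionLogEntries.getEntries(fileSystem)) {
    Iterator<DeltaLakeTransactionLogEntry> it = logEntryStream.iterator();
    while (it.hasNext()) {
        process(it.next()); // stand-in consumer
    }
} // closes the stream, and with it any underlying file handle, even on exceptions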
@@ -50,6 +50,7 @@ public class DeltaLakeConfig
{
public static final String EXTENDED_STATISTICS_ENABLED = "delta.extended-statistics.enabled";
public static final String VACUUM_MIN_RETENTION = "delta.vacuum.min-retention";
+public static final DataSize DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE = DataSize.of(16, MEGABYTE);

// Runtime.getRuntime().maxMemory() is not 100% stable and may return slightly different value over JVM lifetime. We use
// constant so default configuration for cache size is stable.
@@ -60,6 +61,7 @@ public class DeltaLakeConfig

private Duration metadataCacheTtl = new Duration(30, TimeUnit.MINUTES);
private DataSize metadataCacheMaxRetainedSize = DEFAULT_METADATA_CACHE_MAX_RETAINED_SIZE;
+private DataSize transactionLogMaxCachedFileSize = DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE;
private DataSize dataFileCacheSize = DEFAULT_DATA_FILE_CACHE_SIZE;
private Duration dataFileCacheTtl = new Duration(30, TimeUnit.MINUTES);
private int domainCompactionThreshold = 1000;
@@ -121,6 +123,19 @@ public DeltaLakeConfig setMetadataCacheMaxRetainedSize(DataSize metadataCacheMax
return this;
}

+public DataSize getTransactionLogMaxCachedFileSize()
+{
+return transactionLogMaxCachedFileSize;
+}
+
+@Config("delta.transaction-log.max-cached-file-size")
+@ConfigDescription("Maximum size of delta transaction log file that will be cached in memory")
+public DeltaLakeConfig setTransactionLogMaxCachedFileSize(DataSize transactionLogMaxCachedFileSize)
+{
+this.transactionLogMaxCachedFileSize = transactionLogMaxCachedFileSize;
+return this;
+}
+
public DataSize getDataFileCacheSize()
{
return dataFileCacheSize;
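The new property caps how large a transaction log JSON file may be while still being cached in memory; larger files are re-streamed on demand. For example, in a Delta Lake catalog properties file (the 16MB value mirrors the default defined above):

delta.transaction-log.max-cached-file-size=16MB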
@@ -14,6 +14,7 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
+import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
@@ -30,6 +31,7 @@

import java.util.List;
import java.util.Objects;
+import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.type.BigintType.BIGINT;
@@ -74,13 +76,15 @@ public DeltaLakeHistoryTable(
}

@Override
-protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions)
+protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
{
-List<CommitInfoEntry> commitInfoEntries = transactions.stream()
-.flatMap(transaction -> transaction.transactionEntries().stream())
+List<CommitInfoEntry> commitInfoEntries;
+try (Stream<CommitInfoEntry> commitStream = transactions.stream()
+.flatMap(transaction -> transaction.transactionEntries().getEntries(fileSystem))
.map(DeltaLakeTransactionLogEntry::getCommitInfo)
-.filter(Objects::nonNull)
-.collect(toImmutableList());
+.filter(Objects::nonNull)) {
+commitInfoEntries = commitStream.collect(toImmutableList());
+}

TimeZoneKey timeZoneKey = session.getTimeZoneKey();

@@ -69,13 +69,14 @@
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException;
-import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
+import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
+import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionConflictException;
@@ -197,6 +198,7 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Sets.difference;
import static com.google.common.primitives.Ints.max;
+import static io.airlift.units.DataSize.Unit.BYTE;
import static io.trino.filesystem.Locations.appendPath;
import static io.trino.filesystem.Locations.areDirectoryLocationsEquivalent;
import static io.trino.hive.formats.HiveClassNames.HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS;
@@ -290,8 +292,6 @@
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
-import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
-import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
@@ -630,27 +630,21 @@ public LocatedTableHandle getTableHandle(
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, endVersion.map(version -> getVersion(fileSystem, tableLocation, version)));

-Map<Class<?>, Object> logEntries;
+MetadataAndProtocolEntries logEntries;
try {
-logEntries = transactionLogAccess.getTransactionLogEntries(
-session,
-tableSnapshot,
-ImmutableSet.of(METADATA, PROTOCOL),
-entryStream -> entryStream
-.filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null)
-.map(entry -> firstNonNull(entry.getMetaData(), entry.getProtocol())));
+logEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, tableSnapshot);
}
catch (TrinoException e) {
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, e);
}
throw e;
}
-MetadataEntry metadataEntry = (MetadataEntry) logEntries.get(MetadataEntry.class);
+MetadataEntry metadataEntry = logEntries.metadata().orElse(null);
if (metadataEntry == null) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
}
-ProtocolEntry protocolEntry = (ProtocolEntry) logEntries.get(ProtocolEntry.class);
+ProtocolEntry protocolEntry = logEntries.protocol().orElse(null);
if (protocolEntry == null) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable()));
}
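getMetadataAndProtocolEntry replaces the generic entry scan above, and it is where the commit message's short-circuit pays off: with a lazy stream, reading stops as soon as both entries have been seen instead of parsing the whole file. A sketch of that idea, assuming the record-style shape implied by the metadata()/protocol() accessors (not the actual implementation):

Optional<MetadataEntry> metadata = Optional.empty();
Optional<ProtocolEntry> protocol = Optional.empty();
try (Stream<DeltaLakeTransactionLogEntry> entries = transactionLogEntries.getEntries(fileSystem)) {
    Iterator<DeltaLakeTransactionLogEntry> it = entries.iterator();
    // stop pulling from the stream (and reading the file) once both entries are found
    while (it.hasNext() && (metadata.isEmpty() || protocol.isEmpty())) {
        DeltaLakeTransactionLogEntry entry = it.next();
        if (entry.getMetaData() != null) {
            metadata = Optional.of(entry.getMetaData());
        }
        else if (entry.getProtocol() != null) {
            protocol = Optional.of(entry.getProtocol());
        }
    }
}
return new MetadataAndProtocolEntries(metadata, protocol);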
@@ -2267,16 +2261,16 @@ private void checkForConcurrentTransactionConflicts(
if (currentVersion > readVersionValue) {
String transactionLogDirectory = getTransactionLogDir(tableLocation);
for (long version = readVersionValue + 1; version <= currentVersion; version++) {
-List<DeltaLakeTransactionLogEntry> transactionLogEntries;
+TransactionLogEntries transactionLogEntries;
try {
long finalVersion = version;
-transactionLogEntries = getEntriesFromJson(version, transactionLogDirectory, fileSystem)
+transactionLogEntries = getEntriesFromJson(version, transactionLogDirectory, fileSystem, DataSize.of(0, BYTE))
.orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + finalVersion));
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Failed to access table metadata", e);
}
-DeltaLakeCommitSummary commitSummary = new DeltaLakeCommitSummary(version, transactionLogEntries);
+DeltaLakeCommitSummary commitSummary = new DeltaLakeCommitSummary(version, transactionLogEntries, fileSystem);
checkNoMetadataUpdates(commitSummary);
checkNoProtocolUpdates(commitSummary);

@@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
+import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.Transaction;
@@ -62,12 +63,13 @@ public DeltaLakeTransactionsTable(
}

@Override
-protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions)
+protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
{
for (Transaction transaction : transactions) {
pagesBuilder.beginRow();
pagesBuilder.appendBigint(transaction.transactionId());
-pagesBuilder.appendVarchar(TRANSACTION_LOG_ENTRIES_CODEC.toJson(transaction.transactionEntries()));
+pagesBuilder.appendVarchar(TRANSACTION_LOG_ENTRIES_CODEC.toJson(
+transaction.transactionEntries().getEntriesList(fileSystem)));
pagesBuilder.endRow();
}
return pagesBuilder.build();
@@ -39,6 +39,7 @@

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
+import static io.trino.plugin.deltalake.DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR;
import static io.trino.plugin.deltalake.functions.tablechanges.TableChangesFileType.CDF_FILE;
@@ -74,11 +75,9 @@ private Stream<ConnectorSplit> prepareSplits(long currentVersion, long tableRead
.boxed()
.flatMap(version -> {
try {
-List<DeltaLakeTransactionLogEntry> entries = getEntriesFromJson(version, transactionLogDir, fileSystem)
-.orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + version));
-if (entries.isEmpty()) {
-return ImmutableList.<ConnectorSplit>of().stream();
-}
+List<DeltaLakeTransactionLogEntry> entries = getEntriesFromJson(version, transactionLogDir, fileSystem, DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE)
+.orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + version))
+.getEntriesList(fileSystem);
List<CommitInfoEntry> commitInfoEntries = entries.stream()
.map(DeltaLakeTransactionLogEntry::getCommitInfo)
.filter(Objects::nonNull)
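This caller still needs a full List, so it drains the stream through getEntriesList; a plausible implementation is just a collect inside try-with-resources, paying the materialization cost only where a list is genuinely required:

public List<DeltaLakeTransactionLogEntry> getEntriesList(TrinoFileSystem fileSystem)
{
    try (Stream<DeltaLakeTransactionLogEntry> entries = getEntries(fileSystem)) {
        return entries.collect(toImmutableList()); // ImmutableList.toImmutableList()
    }
}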
