Skip to content

Commit

Permalink
feat: Simplify Barrage Viewport Table Updates (#6347)
Browse files Browse the repository at this point in the history
The barrage viewport protocol has now changed:
- viewports are never sent included rows; all added rows are included
- added rows are position space from within the server-respected client
requested viewport
- removed rows are position space from within the server-respected
client requested viewport
- snapshots on viewports include removed rows to simplify client side
processing
- barrage update metadata now explicitly includes current table size;
viewports no longer track the source table's full rowset
 
viewport clients must infer shifts:
- removed rows are in pre-update space
- added rows are in post-update space
- the end-result viewport is retained rows + added rows (retained are
the existing rows minus removed rows) where the adds are in the offsets
specified by added rows

In this PR we'll bump the following dependencies:
- flatbuffer from 1.12.0 to 24.3.25
- arrow from 13.0.0 to 18.0.0
- protobuf 3.25.3 to 3.25.4
- barrage from 0.6.0 to 0.7.2 (noting that 0.7.x was released for this
feature set)

Fixes #6039.
Fixes #6053.
  • Loading branch information
nbauernfeind authored Nov 18, 2024
1 parent b0c417e commit d3b0ad4
Show file tree
Hide file tree
Showing 42 changed files with 1,550 additions and 438 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public void unapply(final RowKeyRangeShiftCallback shiftCallback) {
* @param rowSet The {@link WritableRowSet} to shift
* @return {@code rowSet}
*/
public boolean apply(final WritableRowSet rowSet) {
public WritableRowSet apply(final WritableRowSet rowSet) {
final RowSetBuilderSequential toRemove = RowSetFactory.builderSequential();
final RowSetBuilderSequential toInsert = RowSetFactory.builderSequential();
try (final RowSequence.Iterator rsIt = rowSet.getRowSequenceIterator()) {
Expand Down Expand Up @@ -315,7 +315,7 @@ public boolean apply(final WritableRowSet rowSet) {
rowSet.remove(remove);
rowSet.insert(insert);

return remove.isNonempty() || insert.isNonempty();
return rowSet;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.table.impl.sources.RedirectedColumnSource;
import io.deephaven.engine.table.impl.util.*;
import org.apache.commons.lang3.mutable.MutableObject;
import org.jetbrains.annotations.NotNull;

import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -98,6 +99,34 @@ private void onUpdate(final TableUpdate upstream) {
try (final RowSet prevRowSet = rowSet.copyPrev()) {
downstream.removed = prevRowSet.invert(upstream.removed());
}

if (newSize < prevSize) {
resultTable.getRowSet().writableCast().removeRange(newSize, prevSize - 1);
} else if (newSize > prevSize) {
resultTable.getRowSet().writableCast().insertRange(prevSize, newSize - 1);
}

downstream.shifted = computeFlattenedRowSetShiftData(downstream.removed(), downstream.added(), prevSize);
prevSize = newSize;
resultTable.notifyListeners(downstream);
}

/**
* Compute the shift data for a flattened row set given which rows were removed and which were added.
*
* @param removed the rows that were removed in the flattened pre-update key-space
* @param added the rows that were added in the flattened post-update key-space
* @param prevSize the size of the table before the update
* @return the shift data
*/
public static RowSetShiftData computeFlattenedRowSetShiftData(
@NotNull final RowSet removed,
@NotNull final RowSet added,
final long prevSize) {
if (removed.isEmpty() && added.isEmpty()) {
return RowSetShiftData.EMPTY;
}

final RowSetShiftData.Builder outShifted = new RowSetShiftData.Builder();

// Helper to ensure that we can prime iterators and still detect the end.
Expand All @@ -110,8 +139,8 @@ private void onUpdate(final TableUpdate upstream) {
};

// Create our range iterators and prime them.
final MutableObject<RowSet.RangeIterator> rmIt = new MutableObject<>(downstream.removed().rangeIterator());
final MutableObject<RowSet.RangeIterator> addIt = new MutableObject<>(downstream.added().rangeIterator());
final MutableObject<RowSet.RangeIterator> rmIt = new MutableObject<>(removed.rangeIterator());
final MutableObject<RowSet.RangeIterator> addIt = new MutableObject<>(added.rangeIterator());
updateIt.accept(rmIt);
updateIt.accept(addIt);

Expand Down Expand Up @@ -163,14 +192,6 @@ private void onUpdate(final TableUpdate upstream) {
outShifted.shiftRange(currMarker, prevSize - 1, currDelta);
}

if (newSize < prevSize) {
resultTable.getRowSet().writableCast().removeRange(newSize, prevSize - 1);
} else if (newSize > prevSize) {
resultTable.getRowSet().writableCast().insertRange(prevSize, newSize - 1);
}

downstream.shifted = outShifted.build();
prevSize = newSize;
resultTable.notifyListeners(downstream);
return outShifted.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(
final long clockStep =
callDataSnapshotFunction(System.identityHashCode(logIdentityObject), control, doSnapshot);
final BarrageMessage snapshot = snapshotMsg.getValue();
snapshot.step = snapshot.firstSeq = snapshot.lastSeq = clockStep;
snapshot.firstSeq = snapshot.lastSeq = clockStep;
return snapshot;
}
}
Expand Down Expand Up @@ -1314,6 +1314,7 @@ private static boolean snapshotAllTable(
@Nullable final RowSet keysToSnapshot) {

snapshot.rowsAdded = (usePrev ? table.getRowSet().prev() : table.getRowSet()).copy();
snapshot.tableSize = snapshot.rowsAdded.size();
snapshot.rowsRemoved = RowSetFactory.empty();
snapshot.addColumnData = new BarrageMessage.AddColumnData[table.getColumnSources().size()];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,17 @@ public static class AddColumnData {

public long firstSeq = -1;
public long lastSeq = -1;
public long step = -1;
/** The size of the table after this update. -1 if unknown. */
public long tableSize = -1;

public boolean isSnapshot;
/** The RowSet the server is now respecting for this client; only set when parsing on the client. */
public RowSet snapshotRowSet;
/** Whether the server-respecting viewport is a tail; only set when parsing on the client. */
public boolean snapshotRowSetIsReversed;
/** The BitSet of columns the server is now respecting for this client; only set when parsing on the client. */
public BitSet snapshotColumns;

public boolean isSnapshot;
public RowSet rowsAdded;
public RowSet rowsIncluded;
public RowSet rowsRemoved;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.annotations.BuildableStyle;
import io.deephaven.barrage.flatbuf.BarrageSnapshotRequest;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.util.annotations.FinalDefault;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;

Expand All @@ -21,42 +22,30 @@ public static BarrageSnapshotOptions of(final io.deephaven.barrage.flatbuf.Barra
if (options == null) {
return builder().build();
}
final byte mode = options.columnConversionMode();
return builder()
.useDeephavenNulls(options.useDeephavenNulls())
.columnConversionMode(ColumnConversionMode.conversionModeFbToEnum(mode))
.batchSize(options.batchSize())
.maxMessageSize(options.maxMessageSize())
.previewListLengthLimit(options.previewListLengthLimit())
.build();
}

public static BarrageSnapshotOptions of(final BarrageSnapshotRequest snapshotRequest) {
return of(snapshotRequest.snapshotOptions());
}

/**
* By default, prefer to communicate null values using the arrow-compatible validity structure.
*
* @return whether to use deephaven nulls
*/
@Override
@Default
public boolean useDeephavenNulls() {
return false;
}

/**
* @return the preferred batch size if specified
*/
@Override
@Default
public int batchSize() {
return 0;
}

/**
* @return the maximum GRPC message size if specified
*/
@Override
@Default
public int maxMessageSize() {
Expand All @@ -65,27 +54,68 @@ public int maxMessageSize() {

@Override
@Default
public ColumnConversionMode columnConversionMode() {
return ColumnConversionMode.Stringify;
public long previewListLengthLimit() {
return 0;
}

public int appendTo(FlatBufferBuilder builder) {
return io.deephaven.barrage.flatbuf.BarrageSnapshotOptions.createBarrageSnapshotOptions(
builder, ColumnConversionMode.conversionModeEnumToFb(columnConversionMode()), useDeephavenNulls(),
return io.deephaven.barrage.flatbuf.BarrageSnapshotOptions.createBarrageSnapshotOptions(builder,
useDeephavenNulls(),
batchSize(),
maxMessageSize());
maxMessageSize(),
previewListLengthLimit());
}

public interface Builder {

/**
* See {@link StreamReaderOptions#useDeephavenNulls()} for details.
*
* @param useDeephavenNulls whether to use deephaven nulls
* @return this builder
*/
Builder useDeephavenNulls(boolean useDeephavenNulls);

Builder columnConversionMode(ColumnConversionMode columnConversionMode);
/**
* The default conversion mode is to Stringify all objects that do not have a registered encoding. Column
* conversion modes are no longer supported.
*
* @deprecated Since 0.37.0 and is marked for removal. (Note, GWT does not support encoding this context via
* annotation values.)
*/
@FinalDefault
@Deprecated
default Builder columnConversionMode(ColumnConversionMode columnConversionMode) {
return this;
}

/**
* See {@link StreamReaderOptions#batchSize()} for details.
*
* @param batchSize the ideal number of records to send per record batch
* @return this builder
*/
Builder batchSize(int batchSize);

/**
* See {@link StreamReaderOptions#maxMessageSize()} for details.
*
* @param messageSize the maximum size of a GRPC message in bytes
* @return this builder
*/
Builder maxMessageSize(int messageSize);

/**
* See {@link StreamReaderOptions#previewListLengthLimit()} for details.
*
* @param previewListLengthLimit the magnitude of the number of elements to include in a preview list
* @return this builder
*/
Builder previewListLengthLimit(long previewListLengthLimit);

/**
* @return a new BarrageSnapshotOptions instance
*/
BarrageSnapshotOptions build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,35 @@ BarrageStreamGenerator newGenerator(
* Obtain a Full-Subscription View of this StreamGenerator that can be sent to a single subscriber.
*
* @param options serialization options for this specific view
* @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener
* @param isInitialSnapshot indicates whether this is the first snapshot for the listener
* @return a MessageView filtered by the subscription properties that can be sent to that subscriber
*/
MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot);

/**
* Obtain a View of this StreamGenerator that can be sent to a single subscriber.
* <p>
* Note that all passed in arguments are owned by the caller and may be modified external to this method.
*
* @param options serialization options for this specific view
* @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener
* @param isInitialSnapshot indicates whether this is the first snapshot for the listener
* @param isFullSubscription whether this is a full subscription (possibly a growing viewport)
* @param viewport is the position-space viewport
* @param reverseViewport is the viewport reversed (relative to end of table instead of beginning)
* @param keyspaceViewportPrev is the key-space viewport prior to applying the update
* @param keyspaceViewport is the key-space viewport
* @param subscribedColumns are the columns subscribed for this view
* @return a MessageView filtered by the subscription properties that can be sent to that subscriber
*/
MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot, @Nullable RowSet viewport,
boolean reverseViewport, @Nullable RowSet keyspaceViewport, BitSet subscribedColumns);
MessageView getSubView(
BarrageSubscriptionOptions options,
boolean isInitialSnapshot,
boolean isFullSubscription,
@Nullable RowSet viewport,
boolean reverseViewport,
@Nullable RowSet keyspaceViewportPrev,
@Nullable RowSet keyspaceViewport,
BitSet subscribedColumns);

/**
* Obtain a Full-Snapshot View of this StreamGenerator that can be sent to a single requestor.
Expand All @@ -88,14 +99,19 @@ MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnap

/**
* Obtain a View of this StreamGenerator that can be sent to a single requestor.
* <p>
* Note that all passed in arguments are owned by the caller and may be modified external to this method.
*
* @param options serialization options for this specific view
* @param viewport is the position-space viewport
* @param reverseViewport is the viewport reversed (relative to end of table instead of beginning)
* @param snapshotColumns are the columns included for this view
* @return a MessageView filtered by the snapshot properties that can be sent to that requestor
*/
MessageView getSnapshotView(BarrageSnapshotOptions options, @Nullable RowSet viewport, boolean reverseViewport,
MessageView getSnapshotView(
BarrageSnapshotOptions options,
@Nullable RowSet viewport,
boolean reverseViewport,
@Nullable RowSet keyspaceViewport, BitSet snapshotColumns);

}
Loading

0 comments on commit d3b0ad4

Please sign in to comment.