Skip to content

Commit

Permalink
fix: DH-18567: Filter locations before removal allowed checks. (#6607) (
Browse files Browse the repository at this point in the history
  • Loading branch information
cpwright authored Feb 3, 2025
1 parent 4707ce8 commit b6503a8
Show file tree
Hide file tree
Showing 3 changed files with 396 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Stream;

/**
* Basic uncoalesced table that only adds keys.
Expand Down Expand Up @@ -153,7 +154,6 @@ private void initializeAvailableLocations() {
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending()) {
if (locationUpdate != null) {
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}
}
Expand Down Expand Up @@ -188,14 +188,26 @@ private void maybeAddLocations(@NotNull final Collection<LiveSupplier<ImmutableT
.forEach(lk -> columnSourceManager.addLocation(locationProvider.getTableLocation(lk.get())));
}

private void maybeRemoveLocations(@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeys) {
private void maybeRemoveLocations(@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeys,
final boolean removedAllowed) {
if (removedKeys.isEmpty()) {
return;
}

filterLocationKeys(removedKeys).stream()
final Collection<LiveSupplier<ImmutableTableLocationKey>> filteredSuppliers = filterLocationKeys(removedKeys);
if (filteredSuppliers.isEmpty()) {
return;
}

if (removedAllowed) {
filteredSuppliers.stream().map(LiveSupplier::get).forEach(columnSourceManager::removeLocationKey);
return;
}

final ImmutableTableLocationKey[] keys = filteredSuppliers.stream()
.map(LiveSupplier::get)
.forEach(columnSourceManager::removeLocationKey);
.toArray(ImmutableTableLocationKey[]::new);
throw new TableLocationRemovedException("Source table does not support removed locations", keys);
}

private void initializeLocationSizes() {
Expand Down Expand Up @@ -238,16 +250,8 @@ protected void instrumentedRefresh() {
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending()) {
if (locationUpdate != null) {
if (!locationProvider.getUpdateMode().removeAllowed()
&& !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) {
// This TLP doesn't support removed locations, we need to throw an exception.
final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream()
.map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new);
throw new TableLocationRemovedException(
"Source table does not support removed locations", keys);
}

maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys(),
locationProvider.getUpdateMode().removeAllowed());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ private void allowLivenessRelease() {
{
allowing(locationProvider).supportsSubscriptions();
allowing(locationProvider).unsubscribe(with(any(TableLocationProvider.Listener.class)));
will(returnValue(true));
for (int li = 0; li < tableLocations.length; ++li) {
final TableLocation tableLocation = tableLocations[li];
allowing(tableLocation).supportsSubscriptions();
Expand Down
Loading

0 comments on commit b6503a8

Please sign in to comment.