Skip to content

Commit 6c47731

Browse files
cpwrightlbooker42
andauthored
fix: DH-20997: Allow prevFlusher for Distinct operators to run when not expose Internal. (#7447)
Cherry pick of #7446. Co-authored-by: Larry Booker <lbooker42@gmail.com>
1 parent 01f1505 commit 6c47731

17 files changed

+261
-216
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/by/ssmcountdistinct/count/ByteChunkedCountDistinctOperator.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void removeChunk(BucketedContext bucketedContext, Chunk<? extends Values>
129129
final WritableIntChunk<ChunkLengths> countSlice =
130130
context.countResettable.resetFromChunk(context.counts, startPosition, runLength);
131131
ssm.remove(removeContext, valueSlice, countSlice);
132-
if (ssm.size() == 0) {
132+
if (ssm.isEmpty()) {
133133
clearSsm(destination);
134134
}
135135

@@ -160,7 +160,7 @@ public void modifyChunk(BucketedContext bucketedContext, Chunk<? extends Values>
160160
final WritableIntChunk<ChunkLengths> countSlice =
161161
context.countResettable.resetFromChunk(context.counts, startPosition, runLength);
162162
ssm.remove(removeContext, valueSlice, countSlice);
163-
if (ssm.size() == 0) {
163+
if (ssm.isEmpty()) {
164164
context.ssmsToMaybeClear.set(ii, true);
165165
}
166166
}
@@ -224,7 +224,7 @@ public boolean removeChunk(SingletonContext singletonContext, int chunkSize, Chu
224224

225225
final ByteSegmentedSortedMultiset ssm = ssmForSlot(destination);
226226
ssm.remove(context.removeContext, context.valueCopy, context.counts);
227-
if (ssm.size() == 0) {
227+
if (ssm.isEmpty()) {
228228
clearSsm(destination);
229229
}
230230

@@ -244,7 +244,7 @@ public boolean modifyChunk(SingletonContext singletonContext, int chunkSize, Chu
244244
ByteSegmentedSortedMultiset ssm = ssmForSlot(destination);
245245
if (context.valueCopy.size() > 0) {
246246
ssm.insert(context.valueCopy, context.counts);
247-
} else if (ssm.size() == 0) {
247+
} else if (ssm.isEmpty()) {
248248
clearSsm(destination);
249249
}
250250

@@ -295,17 +295,15 @@ public void ensureCapacity(long tableSize) {
295295
public void startTrackingPrevValues() {
296296
resultColumn.startTrackingPrevValues();
297297

298-
if (exposeInternal) {
299-
if (prevFlusher != null) {
300-
throw new IllegalStateException("startTrackingPrevValues must only be called once");
301-
}
302-
303-
ssms.startTrackingPrevValues();
304-
prevFlusher = new UpdateCommitter<>(this,
305-
ExecutionContext.getContext().getUpdateGraph(),
306-
ByteChunkedCountDistinctOperator::flushPrevious);
307-
touchedStates = RowSetFactory.empty();
298+
if (prevFlusher != null) {
299+
throw new IllegalStateException("startTrackingPrevValues must only be called once");
308300
}
301+
302+
ssms.startTrackingPrevValues();
303+
prevFlusher = new UpdateCommitter<>(this,
304+
ExecutionContext.getContext().getUpdateGraph(),
305+
ByteChunkedCountDistinctOperator::flushPrevious);
306+
touchedStates = RowSetFactory.empty();
309307
}
310308

311309
@Override
@@ -325,7 +323,7 @@ private void clearSsm(long destination) {
325323
}
326324

327325
private boolean setResult(ByteSegmentedSortedMultiset ssm, long destination) {
328-
final long expectedResult = ssm.size() == 0 ? QueryConstants.NULL_LONG : ssm.size();
326+
final long expectedResult = ssm.isEmpty() ? QueryConstants.NULL_LONG : ssm.size();
329327
final boolean countChanged = resultColumn.getAndSetUnsafe(destination, expectedResult) != expectedResult;
330328
return countChanged || (exposeInternal && (ssm.getAddedSize() > 0 || ssm.getRemovedSize() > 0));
331329
}

engine/table/src/main/java/io/deephaven/engine/table/impl/by/ssmcountdistinct/count/CharChunkedCountDistinctOperator.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void removeChunk(BucketedContext bucketedContext, Chunk<? extends Values>
125125
final WritableIntChunk<ChunkLengths> countSlice =
126126
context.countResettable.resetFromChunk(context.counts, startPosition, runLength);
127127
ssm.remove(removeContext, valueSlice, countSlice);
128-
if (ssm.size() == 0) {
128+
if (ssm.isEmpty()) {
129129
clearSsm(destination);
130130
}
131131

@@ -156,7 +156,7 @@ public void modifyChunk(BucketedContext bucketedContext, Chunk<? extends Values>
156156
final WritableIntChunk<ChunkLengths> countSlice =
157157
context.countResettable.resetFromChunk(context.counts, startPosition, runLength);
158158
ssm.remove(removeContext, valueSlice, countSlice);
159-
if (ssm.size() == 0) {
159+
if (ssm.isEmpty()) {
160160
context.ssmsToMaybeClear.set(ii, true);
161161
}
162162
}
@@ -220,7 +220,7 @@ public boolean removeChunk(SingletonContext singletonContext, int chunkSize, Chu
220220

221221
final CharSegmentedSortedMultiset ssm = ssmForSlot(destination);
222222
ssm.remove(context.removeContext, context.valueCopy, context.counts);
223-
if (ssm.size() == 0) {
223+
if (ssm.isEmpty()) {
224224
clearSsm(destination);
225225
}
226226

@@ -240,7 +240,7 @@ public boolean modifyChunk(SingletonContext singletonContext, int chunkSize, Chu
240240
CharSegmentedSortedMultiset ssm = ssmForSlot(destination);
241241
if (context.valueCopy.size() > 0) {
242242
ssm.insert(context.valueCopy, context.counts);
243-
} else if (ssm.size() == 0) {
243+
} else if (ssm.isEmpty()) {
244244
clearSsm(destination);
245245
}
246246

@@ -291,17 +291,15 @@ public void ensureCapacity(long tableSize) {
291291
public void startTrackingPrevValues() {
292292
resultColumn.startTrackingPrevValues();
293293

294-
if (exposeInternal) {
295-
if (prevFlusher != null) {
296-
throw new IllegalStateException("startTrackingPrevValues must only be called once");
297-
}
298-
299-
ssms.startTrackingPrevValues();
300-
prevFlusher = new UpdateCommitter<>(this,
301-
ExecutionContext.getContext().getUpdateGraph(),
302-
CharChunkedCountDistinctOperator::flushPrevious);
303-
touchedStates = RowSetFactory.empty();
294+
if (prevFlusher != null) {
295+
throw new IllegalStateException("startTrackingPrevValues must only be called once");
304296
}
297+
298+
ssms.startTrackingPrevValues();
299+
prevFlusher = new UpdateCommitter<>(this,
300+
ExecutionContext.getContext().getUpdateGraph(),
301+
CharChunkedCountDistinctOperator::flushPrevious);
302+
touchedStates = RowSetFactory.empty();
305303
}
306304

307305
@Override
@@ -321,7 +319,7 @@ private void clearSsm(long destination) {
321319
}
322320

323321
private boolean setResult(CharSegmentedSortedMultiset ssm, long destination) {
324-
final long expectedResult = ssm.size() == 0 ? QueryConstants.NULL_LONG : ssm.size();
322+
final long expectedResult = ssm.isEmpty() ? QueryConstants.NULL_LONG : ssm.size();
325323
final boolean countChanged = resultColumn.getAndSetUnsafe(destination, expectedResult) != expectedResult;
326324
return countChanged || (exposeInternal && (ssm.getAddedSize() > 0 || ssm.getRemovedSize() > 0));
327325
}

engine/table/src/main/java/io/deephaven/engine/table/impl/by/ssmcountdistinct/count/DoubleChunkedCountDistinctOperator.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void removeChunk(BucketedContext bucketedContext, Chunk<? extends Values>
129129
final WritableIntChunk<ChunkLengths> countSlice =
130130
context.countResettable.resetFromChunk(context.counts, startPosition, runLength);
131131
ssm.remove(removeContext, valueSlice, countSlice);
132-
if (ssm.size() == 0) {
132+
if (ssm.isEmpty()) {
133133
clearSsm(destination);
134134
}
135135

@@ -160,7 +160,7 @@ public void modifyChunk(BucketedContext bucketedContext, Chunk<? extends Values>
160160
final WritableIntChunk<ChunkLengths> countSlice =
161161
context.countResettable.resetFromChunk(context.counts, startPosition, runLength);
162162
ssm.remove(removeContext, valueSlice, countSlice);
163-
if (ssm.size() == 0) {
163+
if (ssm.isEmpty()) {
164164
context.ssmsToMaybeClear.set(ii, true);
165165
}
166166
}
@@ -224,7 +224,7 @@ public boolean removeChunk(SingletonContext singletonContext, int chunkSize, Chu
224224

225225
final DoubleSegmentedSortedMultiset ssm = ssmForSlot(destination);
226226
ssm.remove(context.removeContext, context.valueCopy, context.counts);
227-
if (ssm.size() == 0) {
227+
if (ssm.isEmpty()) {
228228
clearSsm(destination);
229229
}
230230

@@ -244,7 +244,7 @@ public boolean modifyChunk(SingletonContext singletonContext, int chunkSize, Chu
244244
DoubleSegmentedSortedMultiset ssm = ssmForSlot(destination);
245245
if (context.valueCopy.size() > 0) {
246246
ssm.insert(context.valueCopy, context.counts);
247-
} else if (ssm.size() == 0) {
247+
} else if (ssm.isEmpty()) {
248248
clearSsm(destination);
249249
}
250250

@@ -295,17 +295,15 @@ public void ensureCapacity(long tableSize) {
295295
public void startTrackingPrevValues() {
296296
resultColumn.startTrackingPrevValues();
297297

298-
if (exposeInternal) {
299-
if (prevFlusher != null) {
300-
throw new IllegalStateException("startTrackingPrevValues must only be called once");
301-
}
302-
303-
ssms.startTrackingPrevValues();
304-
prevFlusher = new UpdateCommitter<>(this,
305-
ExecutionContext.getContext().getUpdateGraph(),
306-
DoubleChunkedCountDistinctOperator::flushPrevious);
307-
touchedStates = RowSetFactory.empty();
298+
if (prevFlusher != null) {
299+
throw new IllegalStateException("startTrackingPrevValues must only be called once");
308300
}
301+
302+
ssms.startTrackingPrevValues();
303+
prevFlusher = new UpdateCommitter<>(this,
304+
ExecutionContext.getContext().getUpdateGraph(),
305+
DoubleChunkedCountDistinctOperator::flushPrevious);
306+
touchedStates = RowSetFactory.empty();
309307
}
310308

311309
@Override
@@ -325,7 +323,7 @@ private void clearSsm(long destination) {
325323
}
326324

327325
private boolean setResult(DoubleSegmentedSortedMultiset ssm, long destination) {
328-
final long expectedResult = ssm.size() == 0 ? QueryConstants.NULL_LONG : ssm.size();
326+
final long expectedResult = ssm.isEmpty() ? QueryConstants.NULL_LONG : ssm.size();
329327
final boolean countChanged = resultColumn.getAndSetUnsafe(destination, expectedResult) != expectedResult;
330328
return countChanged || (exposeInternal && (ssm.getAddedSize() > 0 || ssm.getRemovedSize() > 0));
331329
}

engine/table/src/main/java/io/deephaven/engine/table/impl/by/ssmcountdistinct/count/FloatChunkedCountDistinctOperator.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void removeChunk(BucketedContext bucketedContext, Chunk<? extends Values>
129129
final WritableIntChunk<ChunkLengths> countSlice =
130130
context.countResettable.resetFromChunk(context.counts, startPosition, runLength);
131131
ssm.remove(removeContext, valueSlice, countSlice);
132-
if (ssm.size() == 0) {
132+
if (ssm.isEmpty()) {
133133
clearSsm(destination);
134134
}
135135

@@ -160,7 +160,7 @@ public void modifyChunk(BucketedContext bucketedContext, Chunk<? extends Values>
160160
final WritableIntChunk<ChunkLengths> countSlice =
161161
context.countResettable.resetFromChunk(context.counts, startPosition, runLength);
162162
ssm.remove(removeContext, valueSlice, countSlice);
163-
if (ssm.size() == 0) {
163+
if (ssm.isEmpty()) {
164164
context.ssmsToMaybeClear.set(ii, true);
165165
}
166166
}
@@ -224,7 +224,7 @@ public boolean removeChunk(SingletonContext singletonContext, int chunkSize, Chu
224224

225225
final FloatSegmentedSortedMultiset ssm = ssmForSlot(destination);
226226
ssm.remove(context.removeContext, context.valueCopy, context.counts);
227-
if (ssm.size() == 0) {
227+
if (ssm.isEmpty()) {
228228
clearSsm(destination);
229229
}
230230

@@ -244,7 +244,7 @@ public boolean modifyChunk(SingletonContext singletonContext, int chunkSize, Chu
244244
FloatSegmentedSortedMultiset ssm = ssmForSlot(destination);
245245
if (context.valueCopy.size() > 0) {
246246
ssm.insert(context.valueCopy, context.counts);
247-
} else if (ssm.size() == 0) {
247+
} else if (ssm.isEmpty()) {
248248
clearSsm(destination);
249249
}
250250

@@ -295,17 +295,15 @@ public void ensureCapacity(long tableSize) {
295295
public void startTrackingPrevValues() {
296296
resultColumn.startTrackingPrevValues();
297297

298-
if (exposeInternal) {
299-
if (prevFlusher != null) {
300-
throw new IllegalStateException("startTrackingPrevValues must only be called once");
301-
}
302-
303-
ssms.startTrackingPrevValues();
304-
prevFlusher = new UpdateCommitter<>(this,
305-
ExecutionContext.getContext().getUpdateGraph(),
306-
FloatChunkedCountDistinctOperator::flushPrevious);
307-
touchedStates = RowSetFactory.empty();
298+
if (prevFlusher != null) {
299+
throw new IllegalStateException("startTrackingPrevValues must only be called once");
308300
}
301+
302+
ssms.startTrackingPrevValues();
303+
prevFlusher = new UpdateCommitter<>(this,
304+
ExecutionContext.getContext().getUpdateGraph(),
305+
FloatChunkedCountDistinctOperator::flushPrevious);
306+
touchedStates = RowSetFactory.empty();
309307
}
310308

311309
@Override
@@ -325,7 +323,7 @@ private void clearSsm(long destination) {
325323
}
326324

327325
private boolean setResult(FloatSegmentedSortedMultiset ssm, long destination) {
328-
final long expectedResult = ssm.size() == 0 ? QueryConstants.NULL_LONG : ssm.size();
326+
final long expectedResult = ssm.isEmpty() ? QueryConstants.NULL_LONG : ssm.size();
329327
final boolean countChanged = resultColumn.getAndSetUnsafe(destination, expectedResult) != expectedResult;
330328
return countChanged || (exposeInternal && (ssm.getAddedSize() > 0 || ssm.getRemovedSize() > 0));
331329
}

engine/table/src/main/java/io/deephaven/engine/table/impl/by/ssmcountdistinct/count/IntChunkedCountDistinctOperator.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void removeChunk(BucketedContext bucketedContext, Chunk<? extends Values>
129129
final WritableIntChunk<ChunkLengths> countSlice =
130130
context.countResettable.resetFromChunk(context.counts, startPosition, runLength);
131131
ssm.remove(removeContext, valueSlice, countSlice);
132-
if (ssm.size() == 0) {
132+
if (ssm.isEmpty()) {
133133
clearSsm(destination);
134134
}
135135

@@ -160,7 +160,7 @@ public void modifyChunk(BucketedContext bucketedContext, Chunk<? extends Values>
160160
final WritableIntChunk<ChunkLengths> countSlice =
161161
context.countResettable.resetFromChunk(context.counts, startPosition, runLength);
162162
ssm.remove(removeContext, valueSlice, countSlice);
163-
if (ssm.size() == 0) {
163+
if (ssm.isEmpty()) {
164164
context.ssmsToMaybeClear.set(ii, true);
165165
}
166166
}
@@ -224,7 +224,7 @@ public boolean removeChunk(SingletonContext singletonContext, int chunkSize, Chu
224224

225225
final IntSegmentedSortedMultiset ssm = ssmForSlot(destination);
226226
ssm.remove(context.removeContext, context.valueCopy, context.counts);
227-
if (ssm.size() == 0) {
227+
if (ssm.isEmpty()) {
228228
clearSsm(destination);
229229
}
230230

@@ -244,7 +244,7 @@ public boolean modifyChunk(SingletonContext singletonContext, int chunkSize, Chu
244244
IntSegmentedSortedMultiset ssm = ssmForSlot(destination);
245245
if (context.valueCopy.size() > 0) {
246246
ssm.insert(context.valueCopy, context.counts);
247-
} else if (ssm.size() == 0) {
247+
} else if (ssm.isEmpty()) {
248248
clearSsm(destination);
249249
}
250250

@@ -295,17 +295,15 @@ public void ensureCapacity(long tableSize) {
295295
public void startTrackingPrevValues() {
296296
resultColumn.startTrackingPrevValues();
297297

298-
if (exposeInternal) {
299-
if (prevFlusher != null) {
300-
throw new IllegalStateException("startTrackingPrevValues must only be called once");
301-
}
302-
303-
ssms.startTrackingPrevValues();
304-
prevFlusher = new UpdateCommitter<>(this,
305-
ExecutionContext.getContext().getUpdateGraph(),
306-
IntChunkedCountDistinctOperator::flushPrevious);
307-
touchedStates = RowSetFactory.empty();
298+
if (prevFlusher != null) {
299+
throw new IllegalStateException("startTrackingPrevValues must only be called once");
308300
}
301+
302+
ssms.startTrackingPrevValues();
303+
prevFlusher = new UpdateCommitter<>(this,
304+
ExecutionContext.getContext().getUpdateGraph(),
305+
IntChunkedCountDistinctOperator::flushPrevious);
306+
touchedStates = RowSetFactory.empty();
309307
}
310308

311309
@Override
@@ -325,7 +323,7 @@ private void clearSsm(long destination) {
325323
}
326324

327325
private boolean setResult(IntSegmentedSortedMultiset ssm, long destination) {
328-
final long expectedResult = ssm.size() == 0 ? QueryConstants.NULL_LONG : ssm.size();
326+
final long expectedResult = ssm.isEmpty() ? QueryConstants.NULL_LONG : ssm.size();
329327
final boolean countChanged = resultColumn.getAndSetUnsafe(destination, expectedResult) != expectedResult;
330328
return countChanged || (exposeInternal && (ssm.getAddedSize() > 0 || ssm.getRemovedSize() > 0));
331329
}

0 commit comments

Comments
 (0)