diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationOperatorException.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationOperatorException.java new file mode 100644 index 00000000000..62a6a386d2f --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationOperatorException.java @@ -0,0 +1,20 @@ +package io.deephaven.engine.table.impl.by; + +import io.deephaven.UncheckedDeephavenException; + +/** + * This exception provides more context when an aggregation operator throws an Exception. + * + *

+ * When an aggregation operator results in an Error, this exception is added as a suppressed exception. + *

+ */ +public class AggregationOperatorException extends UncheckedDeephavenException { + public AggregationOperatorException(String reason, Throwable cause) { + super(reason, cause); + } + + public AggregationOperatorException(String reason) { + super(reason); + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java index 76608e67c4e..2a5b8f03a7b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java @@ -725,9 +725,21 @@ private void propagateRemovesToOperators(@NotNull final RowSequence keyIndicesTo getChunk(ac.inputColumns[oi], getContexts[oi], keyIndicesToRemoveChunk, true); } } - ac.operators[oi].removeChunk(bucketedContexts[oi], inputSlot >= 0 ? valueChunks[inputSlot] : null, - permutedKeyIndices, slotsToRemoveFrom, runStarts, runLengths, - firstOperator ? modifiedSlots : slotsModifiedByOperator); + try { + ac.operators[oi].removeChunk(bucketedContexts[oi], inputSlot >= 0 ? valueChunks[inputSlot] : null, + permutedKeyIndices, slotsToRemoveFrom, runStarts, runLengths, + firstOperator ? modifiedSlots : slotsModifiedByOperator); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to remove data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + ", outputs=" + + ac.operators[oi].getResultColumns().keySet(), + ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException( + "Failed to remove data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + ", outputs=" + + ac.operators[oi].getResultColumns().keySet())); + throw err; + } anyOperatorModified = updateModificationState(modifiedOperators, modifiedSlots, slotsModifiedByOperator, anyOperatorModified, firstOperator, oi); @@ -801,9 +813,21 @@ private void propagateInsertsToOperators(@NotNull final RowSequence keyIndicesTo getChunk(ac.inputColumns[oi], getContexts[oi], keyIndicesToInsertChunk, false); } } - ac.operators[oi].addChunk(bucketedContexts[oi], inputSlot >= 0 ? valueChunks[inputSlot] : null, - permutedKeyIndices, slotsToAddTo, runStarts, runLengths, - firstOperator ? modifiedSlots : slotsModifiedByOperator); + try { + ac.operators[oi].addChunk(bucketedContexts[oi], inputSlot >= 0 ? valueChunks[inputSlot] : null, + permutedKeyIndices, slotsToAddTo, runStarts, runLengths, + firstOperator ? modifiedSlots : slotsModifiedByOperator); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to add data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + ", outputs=" + + ac.operators[oi].getResultColumns().keySet(), + ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException( + "Failed to add data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + ", outputs=" + + ac.operators[oi].getResultColumns().keySet())); + throw err; + } anyOperatorModified = updateModificationState(modifiedOperators, modifiedSlots, slotsModifiedByOperator, anyOperatorModified, firstOperator, oi); @@ -902,10 +926,24 @@ private void doProcessShiftBucketed(@NotNull final WritableLongChunk= 0 ? valueChunks[inputSlot] : null, - inputSlot >= 0 ? postValueChunks[inputSlot] : null, usePreKeys, - usePostKeys, slots, runStarts, runLengths, - firstOperator ? modifiedSlots : slotsModifiedByOperator); + try { + ac.operators[oi].shiftChunk(bucketedContexts[oi], + inputSlot >= 0 ? valueChunks[inputSlot] : null, + inputSlot >= 0 ? postValueChunks[inputSlot] : null, usePreKeys, + usePostKeys, slots, runStarts, runLengths, + firstOperator ? modifiedSlots : slotsModifiedByOperator); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to shift data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + + ", outputs=" + ac.operators[oi].getResultColumns().keySet(), + ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException( + "Failed to shift data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + + ", outputs=" + ac.operators[oi].getResultColumns().keySet())); + throw err; + } + anyOperatorModified = updateModificationState(modifiedOperators, modifiedSlots, slotsModifiedByOperator, anyOperatorModified, firstOperator, oi); firstOperator = false; @@ -968,8 +1006,21 @@ private void doSameSlotModifies(@NotNull final RowSequence preShiftKeyIndicesToM } if (operatorsToProcessIndicesOnly[oi]) { - ac.operators[oi].modifyRowKeys(bucketedContexts[oi], permutedKeyIndices, slots, runStarts, - runLengths, firstOperator ? modifiedSlots : slotsModifiedByOperator); + try { + ac.operators[oi].modifyRowKeys(bucketedContexts[oi], permutedKeyIndices, slots, + runStarts, + runLengths, firstOperator ? modifiedSlots : slotsModifiedByOperator); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + + ", outputs=" + ac.operators[oi].getResultColumns().keySet(), + ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + + ", outputs=" + ac.operators[oi].getResultColumns().keySet())); + throw err; + } } else /* operatorsToProcess[oi] */ { final int inputSlot = ac.inputSlot(oi); if (inputSlot >= 0 && !chunkInitialized[inputSlot]) { @@ -994,10 +1045,22 @@ private void doSameSlotModifies(@NotNull final RowSequence preShiftKeyIndicesToM chunkInitialized[inputSlot] = true; } - ac.operators[oi].modifyChunk(bucketedContexts[oi], - inputSlot >= 0 ? valueChunks[inputSlot] : null, - inputSlot >= 0 ? postValueChunks[inputSlot] : null, permutedKeyIndices, slots, - runStarts, runLengths, firstOperator ? modifiedSlots : slotsModifiedByOperator); + try { + ac.operators[oi].modifyChunk(bucketedContexts[oi], + inputSlot >= 0 ? valueChunks[inputSlot] : null, + inputSlot >= 0 ? postValueChunks[inputSlot] : null, permutedKeyIndices, slots, + runStarts, runLengths, firstOperator ? modifiedSlots : slotsModifiedByOperator); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + + ", outputs=" + ac.operators[oi].getResultColumns().keySet(), + ex); + } catch (Error er) { + er.addSuppressed(new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + + ", outputs=" + ac.operators[oi].getResultColumns().keySet())); + throw er; + } } anyOperatorModified = updateModificationState(modifiedOperators, modifiedSlots, @@ -1046,9 +1109,20 @@ private void doSameSlotModifyIndicesOnly(@NotNull final RowSequence postShiftKey setFalse(slotsModifiedByOperator, runStarts.size()); } - ac.operators[oi].modifyRowKeys(bucketedContexts[oi], permutedKeyIndices, slots, runStarts, - runLengths, firstOperator ? modifiedSlots : slotsModifiedByOperator); - + try { + ac.operators[oi].modifyRowKeys(bucketedContexts[oi], permutedKeyIndices, slots, runStarts, + runLengths, firstOperator ? modifiedSlots : slotsModifiedByOperator); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + + ", outputs=" + ac.operators[oi].getResultColumns().keySet(), + ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + + ", outputs=" + ac.operators[oi].getResultColumns().keySet())); + throw err; + } anyOperatorModified = updateModificationState(modifiedOperators, modifiedSlots, slotsModifiedByOperator, anyOperatorModified, firstOperator, oi); firstOperator = false; @@ -1640,8 +1714,20 @@ private static void doGroupedAddition( workingChunks[inputSlot] = ac.inputColumns[oi] == null ? null : ac.inputColumns[oi].getChunk(getContexts[oi], chunkRows); } - ac.operators[oi].addChunk(operatorContexts[oi], chunkRowsSize, - inputSlot < 0 ? null : workingChunks[inputSlot], keyIndices, ii); + try { + ac.operators[oi].addChunk(operatorContexts[oi], chunkRowsSize, + inputSlot < 0 ? null : workingChunks[inputSlot], keyIndices, ii); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to add data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + + ", outputs=" + ac.operators[oi].getResultColumns().keySet(), + ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException( + "Failed to add data, inputcolumns=" + Arrays.toString(ac.inputNames[oi]) + + ", outputs=" + ac.operators[oi].getResultColumns().keySet())); + throw err; + } } } while (rsIt.hasMore()); } @@ -1842,9 +1928,21 @@ private static void initialBucketedKeyAddition(QueryTable input, permuteKernels[ii], chunkPosition, workingChunks[ii]); } } - ac.operators[ii].addChunk(bucketedContexts[ii], - inputSlot >= 0 ? valueChunks[inputSlot] : null, - permutedKeyIndices, outputPositions, runStarts, runLengths, unusedModifiedSlots); + try { + ac.operators[ii].addChunk(bucketedContexts[ii], + inputSlot >= 0 ? valueChunks[inputSlot] : null, + permutedKeyIndices, outputPositions, runStarts, runLengths, unusedModifiedSlots); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to add data, inputcolumns=" + Arrays.toString(ac.inputNames[ii]) + ", outputs=" + + ac.operators[ii].getResultColumns().keySet(), + ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException( + "Failed to add data, inputcolumns=" + Arrays.toString(ac.inputNames[ii]) + ", outputs=" + + ac.operators[ii].getResultColumns().keySet())); + throw err; + } } } } @@ -2179,7 +2277,19 @@ private static void doNoKeyModifications(RowSequence preIndex, RowSequence postI for (int ii = 0; ii < ac.size(); ++ii) { if (operatorsToProcessIndicesOnly[ii]) { - modifiedOperators[ii] |= ac.operators[ii].modifyRowKeys(opContexts[ii], postKeyIndices, 0); + try { + modifiedOperators[ii] |= ac.operators[ii].modifyRowKeys(opContexts[ii], postKeyIndices, 0); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[ii]) + + ", outputs=" + ac.operators[ii].getResultColumns().keySet(), + ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[ii]) + + ", outputs=" + ac.operators[ii].getResultColumns().keySet())); + throw err; + } continue; } if (operatorsToProcess[ii]) { @@ -2198,8 +2308,20 @@ private static void doNoKeyModifications(RowSequence preIndex, RowSequence postI preValues = workingPreChunks[inputSlot]; postValues = workingPostChunks[inputSlot]; } - modifiedOperators[ii] |= ac.operators[ii].modifyChunk(opContexts[ii], chunkSize, preValues, - postValues, postKeyIndices, 0); + try { + modifiedOperators[ii] |= ac.operators[ii].modifyChunk(opContexts[ii], chunkSize, preValues, + postValues, postKeyIndices, 0); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[ii]) + + ", outputs=" + ac.operators[ii].getResultColumns().keySet(), + ex); + } catch (Error er) { + er.addSuppressed(new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[ii]) + + ", outputs=" + ac.operators[ii].getResultColumns().keySet())); + throw er; + } } } } @@ -2216,7 +2338,19 @@ private static void doIndexOnlyNoKeyModifications(@NotNull final RowSequence pos final LongChunk postKeyIndices = postChunkOk.asRowKeyChunk(); for (int ii = 0; ii < ac.size(); ++ii) { if (operatorsToProcessIndicesOnly[ii]) { - modifiedOperators[ii] |= ac.operators[ii].modifyRowKeys(opContexts[ii], postKeyIndices, 0); + try { + modifiedOperators[ii] |= ac.operators[ii].modifyRowKeys(opContexts[ii], postKeyIndices, 0); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[ii]) + + ", outputs=" + ac.operators[ii].getResultColumns().keySet(), + ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException( + "Failed to modify data, inputcolumns=" + Arrays.toString(ac.inputNames[ii]) + + ", outputs=" + ac.operators[ii].getResultColumns().keySet())); + throw err; + } } } } @@ -2260,7 +2394,7 @@ private static void doNoKeyUpdate(RowSequence index, AggregationContext ac, modifiedOperators[ii] |= processColumnNoKey(remove, chunkOk, inputSlot >= 0 ? workingChunks[inputSlot] : null, - ac.operators[ii], opContexts[ii], keyIndices); + ac.operators[ii], opContexts[ii], keyIndices, ac.inputNames[ii]); } } while (rsIt.hasMore()); } @@ -2332,8 +2466,20 @@ private static void doProcessShiftNoKey(AggregationContext ac, newValues = workingPostChunks[inputSlot]; } - modifiedOperators[ii] |= ac.operators[ii].shiftChunk(opContexts[ii], previousValues, newValues, - preKeyIndices, postKeyIndices, 0); + try { + modifiedOperators[ii] |= ac.operators[ii].shiftChunk(opContexts[ii], previousValues, newValues, + preKeyIndices, postKeyIndices, 0); + } catch (Exception ex) { + throw new AggregationOperatorException( + "Failed to shift data, inputcolumns=" + Arrays.toString(ac.inputNames[ii]) + + ", outputs=" + ac.operators[ii].getResultColumns().keySet(), + ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException( + "Failed to shift data, inputcolumns=" + Arrays.toString(ac.inputNames[ii]) + + ", outputs=" + ac.operators[ii].getResultColumns().keySet())); + throw err; + } } } } @@ -2342,11 +2488,30 @@ private static void doProcessShiftNoKey(AggregationContext ac, private static boolean processColumnNoKey(boolean remove, RowSequence chunkOk, Chunk values, IterativeChunkedAggregationOperator operator, IterativeChunkedAggregationOperator.SingletonContext opContext, - LongChunk keyIndices) { + LongChunk keyIndices, + String[] inputNames) { if (remove) { - return operator.removeChunk(opContext, chunkOk.intSize(), values, keyIndices, 0); + try { + return operator.removeChunk(opContext, chunkOk.intSize(), values, keyIndices, 0); + } catch (Exception ex) { + throw new AggregationOperatorException("Failed to remove data, inputcolumns=" + + Arrays.toString(inputNames) + ", outputs=" + operator.getResultColumns().keySet(), ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException("Failed to remove data, inputcolumns=" + + Arrays.toString(inputNames) + ", outputs=" + operator.getResultColumns().keySet())); + throw err; + } } else { - return operator.addChunk(opContext, chunkOk.intSize(), values, keyIndices, 0); + try { + return operator.addChunk(opContext, chunkOk.intSize(), values, keyIndices, 0); + } catch (Exception ex) { + throw new AggregationOperatorException("Failed to add data, inputcolumns=" + Arrays.toString(inputNames) + + ", outputs=" + operator.getResultColumns().keySet(), ex); + } catch (Error err) { + err.addSuppressed(new AggregationOperatorException("Failed to add data, inputcolumns=" + + Arrays.toString(inputNames) + ", outputs=" + operator.getResultColumns().keySet())); + throw err; + } } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestFreezeBy.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestFreezeBy.java index 98c4bf8295d..aadefd962bc 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestFreezeBy.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestFreezeBy.java @@ -6,6 +6,7 @@ import io.deephaven.api.Selectable; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.by.AggregationOperatorException; import io.deephaven.engine.testutil.ControlledUpdateGraph; import io.deephaven.engine.testutil.TstUtils; import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; @@ -176,8 +177,9 @@ public void testDuplicates() { try { FreezeBy.freezeBy(input); TestCase.fail("Expected exception."); - } catch (IllegalStateException ise) { - assertEquals("FreezeBy only allows one row per state!", ise.getMessage()); + } catch (AggregationOperatorException aoe) { + assertTrue(aoe.getCause() instanceof IllegalStateException); + assertEquals("FreezeBy only allows one row per state!", aoe.getCause().getMessage()); } final Table frozen = FreezeBy.freezeBy(input, "Key"); @@ -193,8 +195,8 @@ public void testDuplicates() { return false; } final Throwable ex = exs.get(0); - if (ex instanceof IllegalStateException) { - return "FreezeBy only allows one row per state!".equals(ex.getMessage()); + if (ex instanceof AggregationOperatorException && ex.getCause() instanceof IllegalStateException) { + return "FreezeBy only allows one row per state!".equals(ex.getCause().getMessage()); } return false; }); @@ -202,8 +204,9 @@ public void testDuplicates() { try { FreezeBy.freezeBy(input, "Key"); TestCase.fail("Expected exception."); - } catch (IllegalStateException ise) { - assertEquals("FreezeBy only allows one row per state!", ise.getMessage()); + } catch (AggregationOperatorException aoe) { + assertTrue(aoe.getCause() instanceof IllegalStateException); + assertEquals("FreezeBy only allows one row per state!", aoe.getCause().getMessage()); } } }