Skip to content

Commit

Permalink
Updated tests for coverage and corrected PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Jan 31, 2025
1 parent 7dd79ab commit 324d5cf
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.deephaven.engine.rowset.WritableRowSet;

public interface IncrementalNaturalJoinStateManager {
long getRightIndex(int slot);
long getRightRowKey(int slot);

RowSet getRightRowSet(int slot);

Expand All @@ -20,7 +20,7 @@ public interface IncrementalNaturalJoinStateManager {
void checkExactMatch(NaturalJoinType joinType, long leftKeyIndex, long rightSide);

/**
* Given the join type, return the correct row key for the set of duplicate RHS rows.
* Given the join type, return the correct row key from the set of RHS duplicates.
*/
default long getRightRowKeyFromDuplicates(final WritableRowSet duplicates, final NaturalJoinType joinType) {
if (joinType == NaturalJoinType.LAST_MATCH) {
Expand All @@ -30,8 +30,7 @@ default long getRightRowKeyFromDuplicates(final WritableRowSet duplicates, final
}

/**
* Add a key to the RHS duplicate rowset, following the rules for NaturalJoinType to return the new row key for this
* set *AFTER* the addition.
* Add a key to the RHS duplicates, return the appropriate row key from this set *AFTER* the addition.
*/
default long addRightRowKeyToDuplicates(final WritableRowSet duplicates, final long keyToRemove,
final NaturalJoinType joinType) {
Expand All @@ -40,8 +39,7 @@ default long addRightRowKeyToDuplicates(final WritableRowSet duplicates, final l
}

/**
* Remove the key from the RHS duplicate rowset, following the rules for NaturalJoinType to return the original row
* key for this set *BEFORE* the removal.
* Remove the key from the RHS duplicates, return the appropriate row key from this set *BEFORE* the removal.
*/
default long removeRightRowKeyFromDuplicates(final WritableRowSet duplicates, final long keyToRemove,
final NaturalJoinType joinType) {
Expand All @@ -51,7 +49,7 @@ default long removeRightRowKeyFromDuplicates(final WritableRowSet duplicates, fi
}

/**
* Shift a key in the RHS duplicate rowset.
* Shift a key in the RHS duplicate row set.
*/
default void shiftOneKey(WritableRowSet duplicates, long shiftedKey, long shiftDelta) {
final long sizeBefore = duplicates.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ private static QueryTable zeroKeyColumnsJoin(QueryTable leftTable, QueryTable ri
rowRedirection = getSingleValueRowRedirection(rightRefreshing, RowSequence.NULL_ROW_KEY);
} else {
rowRedirection = getSingleValueRowRedirection(rightRefreshing, RowSequence.NULL_ROW_KEY);
// re-direct to the appropriate RHS row
// immediately re-direct to the appropriate RHS row
updateRightRedirection(rightTable, rowRedirection, joinType);
}
} else if (rightTable.size() == 1) {
Expand Down Expand Up @@ -689,7 +689,7 @@ public void accept(int updatedSlot, long originalRightValue, byte flag) {
return;
}

long index = jsm.getRightIndex(updatedSlot);
long index = jsm.getRightRowKey(updatedSlot);
if (index == StaticNaturalJoinStateManager.DUPLICATE_RIGHT_VALUE) {
if (joinType == NaturalJoinType.ERROR_ON_DUPLICATE
|| joinType == NaturalJoinType.EXACTLY_ONE_MATCH) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@
//
package io.deephaven.engine.table.impl;

import io.deephaven.api.*;
import io.deephaven.api.AsOfJoinMatch;
import io.deephaven.api.ColumnName;
import io.deephaven.api.JoinAddition;
import io.deephaven.api.JoinMatch;
import io.deephaven.api.NaturalJoinType;
import io.deephaven.api.Pair;
import io.deephaven.api.RangeJoinMatch;
import io.deephaven.api.Selectable;
import io.deephaven.api.SortColumn;
import io.deephaven.api.agg.Aggregation;
import io.deephaven.api.agg.spec.AggSpec;
import io.deephaven.api.filter.Filter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@
//
package io.deephaven.engine.table.impl;

import io.deephaven.api.*;
import io.deephaven.api.AsOfJoinMatch;
import io.deephaven.api.ColumnName;
import io.deephaven.api.JoinAddition;
import io.deephaven.api.JoinMatch;
import io.deephaven.api.NaturalJoinType;
import io.deephaven.api.Pair;
import io.deephaven.api.RangeJoinMatch;
import io.deephaven.api.Selectable;
import io.deephaven.api.SortColumn;
import io.deephaven.api.agg.Aggregation;
import io.deephaven.api.agg.spec.AggSpec;
import io.deephaven.api.filter.Filter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ protected void freeDuplicateLocation(long duplicateLocation) {
}

@Override
public long getRightIndex(int slot) {
public long getRightRowKey(int slot) {
final long rightRowKey;
if ((slot & AlternatingColumnSource.ALTERNATE_SWITCH_MASK) == mainInsertMask) {
// slot needs to represent whether we are in the main or alternate using main insert mask!
Expand Down Expand Up @@ -475,6 +475,24 @@ public String keyString(int slot) {
return extractKeyStringFromSourceTable(firstLeftRowKey);
}

private long getRightRowKeyFromState(
final long leftRowKey,
final long rightRowKeyForState,
final NaturalJoinType joinType) {
if (rightRowKeyForState <= FIRST_DUPLICATE) {
if (joinType == NaturalJoinType.ERROR_ON_DUPLICATE || joinType == NaturalJoinType.EXACTLY_ONE_MATCH) {
throw new IllegalStateException("Natural Join found duplicate right key for "
+ extractKeyStringFromSourceTable(leftRowKey));
}
final long location = duplicateLocationFromRowKey(rightRowKeyForState);
final WritableRowSet rightRowSet = rightSideDuplicateRowSets.getUnsafe(location);
return joinType == NaturalJoinType.FIRST_MATCH
? rightRowSet.firstRowKey()
: rightRowSet.lastRowKey();
}
return rightRowKeyForState;
}

public WritableRowRedirection buildIndexedRowRedirection(
QueryTable leftTable,
NaturalJoinType joinType,
Expand All @@ -499,9 +517,12 @@ public WritableRowRedirection buildIndexedRowRedirection(
final RowSet leftRowSetForKey = indexRowSets.get(leftRowSet.firstRowKey());
// Reset mainLeftRowSet to contain the indexed row set.
mainLeftRowSet.set(ii, leftRowSetForKey.copy());
final long leftRowKey = leftRowSetForKey.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
checkExactMatch(joinType, leftRowSet.firstRowKey(), rightRowKeyForState);
leftRowSetForKey.forAllRowKeys(pos -> innerIndex[(int) pos] = rightRowKeyForState);
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
checkExactMatch(joinType, leftRowKey, key);
// Set unconditionally, need to populate the entire array with NULL_ROW_KEY or the RHS key
leftRowSetForKey.forAllRowKeys(pos -> innerIndex[(int) pos] = key);
}
}

Expand All @@ -517,11 +538,13 @@ public WritableRowRedirection buildIndexedRowRedirection(
final RowSet leftRowSetForKey = indexRowSets.get(leftRowSet.firstRowKey());
// Reset mainLeftRowSet to contain the indexed row set.
mainLeftRowSet.set(ii, leftRowSetForKey.copy());
final long leftRowKey = leftRowSetForKey.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
if (rightRowKeyForState != RowSet.NULL_ROW_KEY) {
leftRowSetForKey.forAllRowKeys(pos -> sparseRedirections.set(pos, rightRowKeyForState));
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
if (key == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, key);
} else {
checkExactMatch(joinType, leftRowSet.firstRowKey(), rightRowKeyForState);
leftRowSetForKey.forAllRowKeys(pos -> sparseRedirections.set(pos, key));
}
}
}
Expand All @@ -538,11 +561,13 @@ public WritableRowRedirection buildIndexedRowRedirection(
final RowSet leftRowSetForKey = indexRowSets.get(leftRowSet.firstRowKey());
// Reset mainLeftRowSet to contain the indexed row set.
mainLeftRowSet.set(ii, leftRowSetForKey.copy());
final long leftRowKey = leftRowSetForKey.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
if (rightRowKeyForState != RowSet.NULL_ROW_KEY) {
leftRowSetForKey.forAllRowKeys(pos -> rowRedirection.put(pos, rightRowKeyForState));
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
if (key == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, key);
} else {
checkExactMatch(joinType, leftRowSet.firstRowKey(), rightRowKeyForState);
leftRowSetForKey.forAllRowKeys(pos -> rowRedirection.put(pos, key));
}
}
}
Expand All @@ -567,56 +592,28 @@ public WritableRowRedirection buildRowRedirectionFromRedirections(QueryTable lef
for (int ii = 0; ii < tableSize; ++ii) {
final WritableRowSet leftRowSet = this.mainLeftRowSet.getUnsafe(ii);
if (leftRowSet != null && !leftRowSet.isEmpty()) {
final long leftRowKey = leftRowSet.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
if (rightRowKeyForState == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowSet.firstRowKey(), rightRowKeyForState);
leftRowSet.forAllRowKeys(pos -> innerIndex[(int) pos] = rightRowKeyForState);
} else if (rightRowKeyForState <= FIRST_DUPLICATE) {
// Multiple RHS rows, we may have an error state
if (joinType == NaturalJoinType.FIRST_MATCH) {
final long location = duplicateLocationFromRowKey(rightRowKeyForState);
final long firstKey = rightSideDuplicateRowSets.getUnsafe(location).firstRowKey();
leftRowSet.forAllRowKeys(pos -> innerIndex[(int) pos] = firstKey);
} else if (joinType == NaturalJoinType.LAST_MATCH) {
final long location = duplicateLocationFromRowKey(rightRowKeyForState);
final long lastKey = rightSideDuplicateRowSets.getUnsafe(location).lastRowKey();
leftRowSet.forAllRowKeys(pos -> innerIndex[(int) pos] = lastKey);
} else {
throw new IllegalStateException("Natural Join found duplicate right key for "
+ extractKeyStringFromSourceTable(leftRowSet.firstRowKey()));
}
} else {
leftRowSet.forAllRowKeys(pos -> innerIndex[(int) pos] = rightRowKeyForState);
}
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
checkExactMatch(joinType, leftRowKey, key);
// Set unconditionally, need to populate the entire array with NULL_ROW_KEY or the RHS key
leftRowSet.forAllRowKeys(pos -> innerIndex[(int) pos] = key);
}
}

return new ContiguousWritableRowRedirection(innerIndex);
}
case Sparse: {
final LongSparseArraySource sparseRedirections = new LongSparseArraySource();
for (int ii = 0; ii < tableSize; ++ii) {
final WritableRowSet leftRowSet = this.mainLeftRowSet.getUnsafe(ii);
if (leftRowSet != null && !leftRowSet.isEmpty()) {
final long leftRowKey = leftRowSet.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
if (rightRowKeyForState == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowSet.firstRowKey(), rightRowKeyForState);
} else if (rightRowKeyForState <= FIRST_DUPLICATE) {
// Multiple RHS rows, we may have an error state
if (joinType == NaturalJoinType.FIRST_MATCH) {
final long location = duplicateLocationFromRowKey(rightRowKeyForState);
final long firstKey = rightSideDuplicateRowSets.getUnsafe(location).firstRowKey();
leftRowSet.forAllRowKeys(pos -> sparseRedirections.set(pos, firstKey));
} else if (joinType == NaturalJoinType.LAST_MATCH) {
final long location = duplicateLocationFromRowKey(rightRowKeyForState);
final long lastKey = rightSideDuplicateRowSets.getUnsafe(location).lastRowKey();
leftRowSet.forAllRowKeys(pos -> sparseRedirections.set(pos, lastKey));
} else {
throw new IllegalStateException("Natural Join found duplicate right key for "
+ extractKeyStringFromSourceTable(leftRowSet.firstRowKey()));
}
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
if (key == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, key);
} else {
leftRowSet.forAllRowKeys(pos -> sparseRedirections.set(pos, rightRowKeyForState));
leftRowSet.forAllRowKeys(pos -> sparseRedirections.set(pos, key));
}
}
}
Expand All @@ -628,25 +625,13 @@ public WritableRowRedirection buildRowRedirectionFromRedirections(QueryTable lef
for (int ii = 0; ii < tableSize; ++ii) {
final WritableRowSet leftRowSet = this.mainLeftRowSet.getUnsafe(ii);
if (leftRowSet != null && !leftRowSet.isEmpty()) {
final long leftRowKey = leftRowSet.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
if (rightRowKeyForState == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowSet.firstRowKey(), rightRowKeyForState);
} else if (rightRowKeyForState <= FIRST_DUPLICATE) {
// Multiple RHS rows, we may have an error state
if (joinType == NaturalJoinType.FIRST_MATCH) {
final long location = duplicateLocationFromRowKey(rightRowKeyForState);
final long firstKey = rightSideDuplicateRowSets.getUnsafe(location).firstRowKey();
leftRowSet.forAllRowKeys(pos -> rowRedirection.put(pos, firstKey));
} else if (joinType == NaturalJoinType.LAST_MATCH) {
final long location = duplicateLocationFromRowKey(rightRowKeyForState);
final long lastKey = rightSideDuplicateRowSets.getUnsafe(location).lastRowKey();
leftRowSet.forAllRowKeys(pos -> rowRedirection.put(pos, lastKey));
} else {
throw new IllegalStateException("Natural Join found duplicate right key for "
+ extractKeyStringFromSourceTable(leftRowSet.firstRowKey()));
}
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
if (key == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, key);
} else {
leftRowSet.forAllRowKeys(pos -> rowRedirection.put(pos, rightRowKeyForState));
leftRowSet.forAllRowKeys(pos -> rowRedirection.put(pos, key));
}
}
}
Expand Down
Loading

0 comments on commit 324d5cf

Please sign in to comment.