Skip to content

Commit

Permalink
CAS retry can't store offsets again, unique constraint violation
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Jan 17, 2025
1 parent 70c6efd commit d467894
Showing 1 changed file with 40 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -630,39 +630,61 @@ private[projection] class R2dbcOffsetStore(
if (filteredRecords.isEmpty) {
FutureDone
} else {
val newState = oldState.add(filteredRecords)
val offsetInserts = dao.insertTimestampOffsetInTx(conn, filteredRecords)

val slices =
if (filteredRecords.size == 1) Set(filteredRecords.head.slice)
else filteredRecords.iterator.map(_.slice).toSet

val currentInflight = getInflight()
val evictedNewState = slices.foldLeft(newState) {
case (s, slice) =>
s.evict(
slice,
settings.timeWindow,
// Only persistence IDs that aren't inflight are evictable,
// if only so that those persistence IDs can be removed from
// inflight... in the absence of further records from that
// persistence ID, the next store will evict (further records
// would make that persistence ID recent enough to not be evicted)
record => !currentInflight.contains(record.pid))
def addRecordsAndEvict(old: State): State = {
val newState = old.add(filteredRecords)

val currentInflight = getInflight()
slices.foldLeft(newState) {
case (s, slice) =>
s.evict(
slice,
settings.timeWindow,
// Only persistence IDs that aren't inflight are evictable,
// if only so that those persistence IDs can be removed from
// inflight... in the absence of further records from that
// persistence ID, the next store will evict (further records
// would make that persistence ID recent enough to not be evicted)
record => !currentInflight.contains(record.pid))
}
}

val offsetInserts = dao.insertTimestampOffsetInTx(conn, filteredRecords)
def compareAndSwapRetry(): Future[Done] = {
load(records.map(_.pid)).flatMap { old =>
val evictedNewState = addRecordsAndEvict(old)
if (state.compareAndSet(old, evictedNewState)) {
slices.foreach(s => triggerDeletionPerSlice.put(s, TRUE))
cleanupInflight(evictedNewState)
FutureDone
} else {
// concurrent update
compareAndSwapRetry()
}
}
}

val evictedNewState = addRecordsAndEvict(oldState)

offsetInserts.flatMap { _ =>
if (state.compareAndSet(oldState, evictedNewState)) {
slices.foreach(s => triggerDeletionPerSlice.put(s, TRUE))
cleanupInflight(evictedNewState)
FutureDone
} else { // concurrent update
if (canBeConcurrent) saveTimestampOffsetInTx(conn, records, canBeConcurrent) // CAS retry
else
} else {
// concurrent update
// the offsets have already been successfully stored within this transaction so we can't
// retry entire saveTimestampOffsetInTx because that would result in unique constraint violation
if (canBeConcurrent) {
compareAndSwapRetry()
} else
throw new IllegalStateException(
s"$logPrefix Unexpected concurrent modification of state from saveOffset. " +
s"${dumpState(newState, currentInflight)}")
s"${dumpState(evictedNewState, getInflight())}")
}

}
Expand Down

0 comments on commit d467894

Please sign in to comment.