fix(r2dbc): Leak of in-flight offsets #1300

Merged · 2 commits · Jan 18, 2025
@@ -816,6 +816,54 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20L)
}

"cleanup inFlight when saving offset" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)

val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L))
val envelope1 = createEnvelope("p1", 3L, offset1.timestamp, "e1-3")
val offset2 = TimestampOffset(startTime.plusMillis(1), Map("p1" -> 4L))
val envelope2 = createEnvelope("p1", 4L, offset2.timestamp, "e1-4")
val offset3 = TimestampOffset(startTime.plusMillis(2), Map("p1" -> 5L))

// save same seqNr as inFlight should remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None

// clear
offsetStore.readOffset().futureValue

// save lower seqNr than inFlight should not remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.addInflight(envelope2)
offsetStore.getInflight().get("p1") shouldBe Some(4L)
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.getInflight().get("p1") shouldBe Some(4L)

// clear
offsetStore.readOffset().futureValue

// save higher seqNr than inFlight should remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 4L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None

// clear
offsetStore.readOffset().futureValue

// save higher seqNr than inFlight should remove from inFlight
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.addInflight(envelope2)
offsetStore.getInflight().get("p1") shouldBe Some(4L)
offsetStore.saveOffset(OffsetPidSeqNr(offset3, "p1", 5L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None
}

"evict old records from same slice" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100))
@@ -171,6 +171,20 @@ private[projection] object DynamoDBOffsetStore {
}
}

override def toString: String = {
val sb = new StringBuilder
sb.append("State(")
bySliceSorted.toVector.sortBy(_._1).foreach {
case (slice, records) =>
sb.append("slice ").append(slice).append(": ")
records.foreach { r =>
sb.append("[").append(r.pid).append("->").append(r.seqNr).append(" ").append(r.timestamp).append("] ")
}
}
sb.append(")")
sb.toString
}

}

final class RejectedEnvelope(message: String) extends IllegalStateException(message)
@@ -243,10 +257,10 @@ private[projection] class DynamoDBOffsetStore(
timestampQuery.timestampOf(persistenceId, sequenceNr).asScala.map(_.toScala)
case Some(_) =>
throw new IllegalArgumentException(
s"Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.")
s"$logPrefix Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.")
case None =>
throw new IllegalArgumentException(
s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.")
s"$logPrefix Expected BySlicesSourceProvider to be defined when TimestampOffset is used.")
}
}

@@ -262,6 +276,10 @@ private[projection] class DynamoDBOffsetStore(
Future.successful(Some(getState().latestOffset).map(_.asInstanceOf[Offset]))
}

private def dumpState(s: State, flight: Map[Pid, SeqNr]): String = {
s"$s inFlight [${flight.map { case (pid, seqNr) => s"$pid->$seqNr" }.mkString(",")}]"
}

def readOffset[Offset](): Future[Option[Offset]] = {
// look for TimestampOffset first since that is used by akka-persistence-dynamodb,
// and then fall back to the other more primitive offset types
@@ -273,7 +291,8 @@ private[projection] class DynamoDBOffsetStore(
}(ExecutionContext.parasitic)
case None =>
// FIXME primitive offsets not supported, maybe we can change the sourceProvider parameter
throw new IllegalStateException("BySlicesSourceProvider is required. Primitive offsets not supported.")
throw new IllegalStateException(
s"$logPrefix BySlicesSourceProvider is required. Primitive offsets not supported.")
}
}

@@ -299,7 +318,9 @@ private[projection] class DynamoDBOffsetStore(
val newState = State(offsetBySlice)

if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")
throw new IllegalStateException(
s"$logPrefix Unexpected concurrent modification of state from readOffset. " +
s"${dumpState(oldState, getInflight())}")
clearInflight()
if (offsetBySlice.isEmpty) {
logger.debug("{} readTimestampOffset no stored offset", logPrefix)
@@ -381,14 +402,15 @@ private[projection] class DynamoDBOffsetStore(
val slice = persistenceExt.sliceForPersistenceId(pid)
Record(slice, pid, seqNr, t.timestamp)
case OffsetPidSeqNr(_: TimestampOffset, None) =>
throw new IllegalArgumentException("Required EventEnvelope or DurableStateChange for TimestampOffset.")
throw new IllegalArgumentException(
s"$logPrefix Required EventEnvelope or DurableStateChange for TimestampOffset.")
case _ =>
throw new IllegalArgumentException(
"Mix of TimestampOffset and other offset type in same transaction is not supported")
s"$logPrefix Mix of TimestampOffset and other offset type in same transaction is not supported")
}
storeTimestampOffsets(records, storeSequenceNumbers, canBeConcurrent)
} else {
throw new IllegalStateException("TimestampOffset is required. Primitive offsets not supported.")
throw new IllegalStateException(s"$logPrefix TimestampOffset is required. Primitive offsets not supported.")
}
}

@@ -456,7 +478,9 @@ private[projection] class DynamoDBOffsetStore(
FutureDone
} else { // concurrent update
if (canBeConcurrent) storeTimestampOffsets(records, storeSequenceNumbers, canBeConcurrent) // CAS retry
else throw new IllegalStateException("Unexpected concurrent modification of state in save offsets.")
else
throw new IllegalStateException(
s"$logPrefix Unexpected concurrent modification of state in save offsets.")
}
}
}
@@ -477,7 +501,8 @@ private[projection] class DynamoDBOffsetStore(
if (newInflight.size >= 10000) {
throw new IllegalStateException(
s"Too many envelopes in-flight [${newInflight.size}]. " +
"Please report this issue at https://github.com/akka/akka-projection")
"Please report this issue at https://github.com/akka/akka-projection " +
s"${dumpState(newState, newInflight)}")
}
if (!inflight.compareAndSet(currentInflight, newInflight))
cleanupInflight(newState) // CAS retry, concurrent update of inflight
@@ -773,7 +798,7 @@ private[projection] class DynamoDBOffsetStore(
case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] =>
// in case additional types are added
throw new IllegalArgumentException(
s"DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/akka/akka-projection/issues")
s"$logPrefix DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/akka/akka-projection/issues")
case _ => None
}
}
@@ -736,6 +736,54 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20L)
}

"cleanup inFlight when saving offset" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)

val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L))
val envelope1 = createEnvelope("p1", 3L, offset1.timestamp, "e1-3")
val offset2 = TimestampOffset(startTime.plusMillis(1), Map("p1" -> 4L))
val envelope2 = createEnvelope("p1", 4L, offset2.timestamp, "e1-4")
val offset3 = TimestampOffset(startTime.plusMillis(2), Map("p1" -> 5L))

// save same seqNr as inFlight should remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None

// clear
offsetStore.readOffset().futureValue

// save lower seqNr than inFlight should not remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.addInflight(envelope2)
offsetStore.getInflight().get("p1") shouldBe Some(4L)
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.getInflight().get("p1") shouldBe Some(4L)

// clear
offsetStore.readOffset().futureValue

// save higher seqNr than inFlight should remove from inFlight
offsetStore.addInflight(envelope1)
offsetStore.getInflight().get("p1") shouldBe Some(3L)
offsetStore.saveOffset(OffsetPidSeqNr(offset2, "p1", 4L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None

// clear
offsetStore.readOffset().futureValue

// save higher seqNr than inFlight should remove from inFlight
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.addInflight(envelope2)
offsetStore.getInflight().get("p1") shouldBe Some(4L)
offsetStore.saveOffset(OffsetPidSeqNr(offset3, "p1", 5L)).futureValue
offsetStore.getInflight().get("p1") shouldBe None
}

"evict old records from same slice" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings.withTimeWindow(100.seconds)
@@ -187,6 +187,20 @@ private[projection] object R2dbcOffsetStore {
}
}

override def toString: String = {
val sb = new StringBuilder
sb.append("State(")
bySliceSorted.toVector.sortBy(_._1).foreach {
case (slice, records) =>
sb.append("slice ").append(slice).append(": ")
records.foreach { r =>
sb.append("[").append(r.pid).append("->").append(r.seqNr).append(" ").append(r.timestamp).append("] ")
}
}
sb.append(")")
sb.toString
}

}

final class RejectedEnvelope(message: String) extends IllegalStateException(message)
@@ -333,10 +347,10 @@ private[projection] class R2dbcOffsetStore(
timestampQuery.timestampOf(persistenceId, sequenceNr).asScala.map(_.toScala)
case Some(_) =>
throw new IllegalArgumentException(
s"Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.")
s"$logPrefix Expected BySlicesSourceProvider to implement EventTimestampQuery when TimestampOffset is used.")
case None =>
throw new IllegalArgumentException(
s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.")
s"$logPrefix Expected BySlicesSourceProvider to be defined when TimestampOffset is used.")
}
}

@@ -353,6 +367,10 @@ private[projection] class R2dbcOffsetStore(
}
}

private def dumpState(s: State, flight: Map[Pid, SeqNr]): String = {
s"$s inFlight [${flight.map { case (pid, seqNr) => s"$pid->$seqNr" }.mkString(",")}]"
}

def readOffset[Offset](): Future[Option[Offset]] = {
scheduleTasks()

@@ -441,7 +459,9 @@ private[projection] class R2dbcOffsetStore(
startOffset)

if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")
throw new IllegalStateException(
s"$logPrefix Unexpected concurrent modification of state from readOffset. " +
s"${dumpState(oldState, getInflight())}")

startOffset
}
@@ -540,7 +560,8 @@ private[projection] class R2dbcOffsetStore(
val record = Record(slice, pid, seqNr, t.timestamp)
saveTimestampOffsetInTx(conn, Vector(record), canBeConcurrent = true)
case OffsetPidSeqNr(_: TimestampOffset, None) =>
throw new IllegalArgumentException("Required EventEnvelope or DurableStateChange for TimestampOffset.")
throw new IllegalArgumentException(
s"$logPrefix Required EventEnvelope or DurableStateChange for TimestampOffset.")
case _ =>
savePrimitiveOffsetInTx(conn, offset.offset)
}
@@ -569,10 +590,11 @@ private[projection] class R2dbcOffsetStore(
val slice = persistenceExt.sliceForPersistenceId(pid)
Record(slice, pid, seqNr, t.timestamp)
case OffsetPidSeqNr(_: TimestampOffset, None) =>
throw new IllegalArgumentException("Required EventEnvelope or DurableStateChange for TimestampOffset.")
throw new IllegalArgumentException(
s"$logPrefix Required EventEnvelope or DurableStateChange for TimestampOffset.")
case _ =>
throw new IllegalArgumentException(
"Mix of TimestampOffset and other offset type in same transaction is not supported")
s"$logPrefix Mix of TimestampOffset and other offset type in same transaction is not supported")
}
saveTimestampOffsetInTx(conn, records, canBeConcurrent)
} else {
@@ -630,15 +652,19 @@ private[projection] class R2dbcOffsetStore(

val offsetInserts = dao.insertTimestampOffsetInTx(conn, filteredRecords)

offsetInserts.map { _ =>
offsetInserts.flatMap { _ =>
[Member Author] This was the bug. I think it was introduced in a recent version when the lazy loading of offsets was added.

[Contributor] 👍🏼 Not noticed in #1255

if (state.compareAndSet(oldState, evictedNewState)) {
slices.foreach(s => triggerDeletionPerSlice.put(s, TRUE))
cleanupInflight(evictedNewState)
FutureDone
} else { // concurrent update
if (canBeConcurrent) saveTimestampOffsetInTx(conn, records, canBeConcurrent) // CAS retry
else throw new IllegalStateException("Unexpected concurrent modification of state from saveOffset.")
else
throw new IllegalStateException(
s"$logPrefix Unexpected concurrent modification of state from saveOffset. " +
s"${dumpState(newState, currentInflight)}")
}
Done

}
}
}
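
To make the reviewer's comment above concrete: a simplified sketch of the save path after the fix, using only names that appear in this hunk (the slice-deletion bookkeeping is omitted). This is an illustration of the map → flatMap change, not the verbatim implementation.

    // flatMap (rather than map followed by a plain Done) returns the branch's Future,
    // so cleanupInflight and any CAS retry complete as part of the saved-offset Future
    // instead of being left behind, and acknowledged in-flight entries are pruned.
    offsetInserts.flatMap { _ =>
      if (state.compareAndSet(oldState, evictedNewState)) {
        cleanupInflight(evictedNewState) // drop in-flight entries covered by the stored offsets
        FutureDone
      } else if (canBeConcurrent) {
        saveTimestampOffsetInTx(conn, records, canBeConcurrent) // CAS retry
      } else {
        throw new IllegalStateException(
          s"$logPrefix Unexpected concurrent modification of state from saveOffset.")
      }
    }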
@@ -656,8 +682,9 @@ private[projection] class R2dbcOffsetStore(
}
if (newInflight.size >= 10000) {
[Member Author] I have also tested this in a real service where I logged the inFlight size and could see that it was increasing over time. After the fix it is 0 after each save offsets, as expected.

throw new IllegalStateException(
s"Too many envelopes in-flight [${newInflight.size}]. " +
"Please report this issue at https://github.com/akka/akka-projection")
s"$logPrefix Too many envelopes in-flight [${newInflight.size}]. " +
"Please report this issue at https://github.com/akka/akka-projection " +
s"${dumpState(newState, newInflight)}")
}
if (!inflight.compareAndSet(currentInflight, newInflight))
cleanupInflight(newState) // CAS retry, concurrent update of inflight
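
Relating to the reviewer's note above about the in-flight size: a minimal test-style check of that property, written only against the APIs used in the specs of this PR (illustrative, not part of the diff).

    // Hypothetical assertion mirroring the new spec: once the offset for the
    // in-flight envelope is saved, nothing should remain in the in-flight map.
    offsetStore.addInflight(envelope1)
    offsetStore.getInflight().get("p1") shouldBe Some(3L)
    offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
    offsetStore.getInflight() shouldBe Map.empty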
@@ -1197,7 +1224,7 @@ private[projection] class R2dbcOffsetStore(
case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] =>
// in case additional types are added
throw new IllegalArgumentException(
s"DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/akka/akka-projection/issues")
s"$logPrefix DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/akka/akka-projection/issues")
case _ => None
}
}