diff --git a/surf-redis-api/src/main/kotlin/dev/slne/surf/redis/sync/set/SyncSet.kt b/surf-redis-api/src/main/kotlin/dev/slne/surf/redis/sync/set/SyncSet.kt index 46b304d..88b4971 100644 --- a/surf-redis-api/src/main/kotlin/dev/slne/surf/redis/sync/set/SyncSet.kt +++ b/surf-redis-api/src/main/kotlin/dev/slne/surf/redis/sync/set/SyncSet.kt @@ -142,37 +142,35 @@ class SyncSet internal constructor( /** * Removes all elements from the set that match the given [predicate] and replicates the changes. * - * Each matching element is removed under a write lock, and each removal is replicated as a - * separate delta to ensure consistency across nodes. Deltas are published sequentially - * to maintain order. + * This operation uses atomic replication: the entire set is replaced in a single delta. + * This ensures consistency across distributed nodes since the operation cannot interleave + * with other deltas, avoiding race conditions and ensuring all nodes see the same final state. * * @param predicate the predicate to test each element against * @return `true` if any elements were removed, `false` if no elements matched the predicate */ fun removeIf(predicate: (T) -> Boolean): Boolean { - val removedElements = mutableListOf() - - lock.write { + // Perform filtering and capture the results under write lock + val (remaining, hadRemovals, removedElements) = lock.write { + val removed = mutableListOf() val iterator = set.iterator() while (iterator.hasNext()) { val element = iterator.next() if (predicate(element)) { iterator.remove() - removedElements.add(element) + removed.add(element) } } + Triple(ObjectOpenHashSet(set), removed.isNotEmpty(), removed) } - if (removedElements.isEmpty()) return false + if (!hadRemovals) return false - // Publish deltas sequentially to maintain order - Flux.fromIterable(removedElements) - .concatMap { element -> - val elementJson = api.json.encodeToString(elementSerializer, element) - publishLocalDelta(Delta.Remove(elementJson)).onErrorResume { Mono.empty() } - } - .subscribe() + // Replicate using a single replace-all delta for atomic consistency + val snapshotJson = api.json.encodeToString(snapshotSerializer, remaining) + publishLocalDelta(Delta.ReplaceAll(snapshotJson)).subscribe() + // Notify listeners for each removed element removedElements.forEach { element -> notifyListeners(listeners, SyncSetChange.Removed(element)) } @@ -342,6 +340,17 @@ class SyncSet internal constructor( } if (hadElements) notifyListeners(listeners, SyncSetChange.Cleared) } + + is Delta.ReplaceAll -> { + val snapshot = api.json.decodeFromString(snapshotSerializer, delta.snapshotJson) + lock.write { + set.clear() + set.addAll(snapshot) + } + // Note: We notify Cleared for simplicity, similar to SyncList + // Individual Removed notifications are only for the local node in removeIf() + notifyListeners(listeners, SyncSetChange.Cleared) + } } } @@ -367,5 +376,8 @@ class SyncSet internal constructor( @Serializable data object Clear : Delta + + @Serializable + data class ReplaceAll(val snapshotJson: String) : Delta } }