Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,37 +142,35 @@ class SyncSet<T : Any> internal constructor(
/**
* Removes all elements from the set that match the given [predicate] and replicates the changes.
*
* Each matching element is removed under a write lock, and each removal is replicated as a
* separate delta to ensure consistency across nodes. Deltas are published sequentially
* to maintain order.
* This operation uses atomic replication: the entire set is replaced in a single delta.
* This ensures consistency across distributed nodes since the operation cannot interleave
* with other deltas, avoiding race conditions and ensuring all nodes see the same final state.
*
* @param predicate the predicate to test each element against
* @return `true` if any elements were removed, `false` if no elements matched the predicate
*/
fun removeIf(predicate: (T) -> Boolean): Boolean {
val removedElements = mutableListOf<T>()

lock.write {
// Perform filtering and capture the results under write lock
val (remaining, hadRemovals, removedElements) = lock.write {
val removed = mutableListOf<T>()
val iterator = set.iterator()
while (iterator.hasNext()) {
val element = iterator.next()
if (predicate(element)) {
iterator.remove()
removedElements.add(element)
removed.add(element)
}
}
Triple(ObjectOpenHashSet(set), removed.isNotEmpty(), removed)
}

if (removedElements.isEmpty()) return false
if (!hadRemovals) return false

// Publish deltas sequentially to maintain order
Flux.fromIterable(removedElements)
.concatMap { element ->
val elementJson = api.json.encodeToString(elementSerializer, element)
publishLocalDelta(Delta.Remove(elementJson)).onErrorResume { Mono.empty() }
}
.subscribe()
// Replicate using a single replace-all delta for atomic consistency
val snapshotJson = api.json.encodeToString(snapshotSerializer, remaining)
publishLocalDelta(Delta.ReplaceAll(snapshotJson)).subscribe()

// Notify listeners for each removed element
removedElements.forEach { element ->
notifyListeners(listeners, SyncSetChange.Removed(element))
}
Expand Down Expand Up @@ -342,6 +340,17 @@ class SyncSet<T : Any> internal constructor(
}
if (hadElements) notifyListeners(listeners, SyncSetChange.Cleared)
}

is Delta.ReplaceAll -> {
val snapshot = api.json.decodeFromString(snapshotSerializer, delta.snapshotJson)
lock.write {
set.clear()
set.addAll(snapshot)
}
// Note: We notify Cleared for simplicity, similar to SyncList
// Individual Removed notifications are only for the local node in removeIf()
notifyListeners(listeners, SyncSetChange.Cleared)
}
}
}

Expand All @@ -367,5 +376,8 @@ class SyncSet<T : Any> internal constructor(

@Serializable
data object Clear : Delta

@Serializable
data class ReplaceAll(val snapshotJson: String) : Delta
}
}