Skip to content

Conversation

Copy link
Contributor

Copilot AI commented Jun 2, 2025

This PR addresses the issue where users cannot safely cancel Rx streams from within operator functions without encountering concurrent modification exceptions.

Problem

The original issue was that calling c.cancel from within a map function creates a concurrent modification problem:

var c = Cancelable.empty
c = rx.map(x => if cond then c.cancel)  // This throws concurrent modification exception
c.run()

Solution

Added two new methods that allow operators to safely signal completion by returning None:

  • mapWithCompletion[B](f: A => Option[B]): Rx[B]
  • flatMapWithCompletion[B](f: A => Option[RxOps[B]]): Rx[B]

These methods follow the same pattern as the existing TakeOp implementation, where returning None triggers an OnCompletion event and stops further processing with RxResult.Stop.

Usage Example

val rx = Rx.sequence(1, 2, 3, 4, 5)

// Safe completion signaling
rx.mapWithCompletion { x =>
  if (x == 3) {
    None  // Complete the stream safely
  } else {
    Some(x * 2)  // Continue processing
  }
}.run()

// Works with flatMap too
rx.flatMapWithCompletion { x =>
  if (x == 3) {
    None  // Complete the stream safely
  } else {
    Some(Rx.single(x * 10))  // Continue with new stream
  }
}.run()

Implementation Details

  • Added new case classes MapWithCompletionOp and FlatMapWithCompletionOp to represent these operators
  • Implemented handling in RxRunner that properly emits OnCompletion events when None is returned
  • Maintains full backwards compatibility - existing map and flatMap methods are unchanged
  • Follows existing code patterns and conventions in the codebase

Testing

  • Added comprehensive test suite covering normal operation, early completion, empty sequences, and error propagation
  • All existing tests continue to pass, ensuring no regressions
  • Code formatted with scalafmtAll to follow project conventions

Fixes #3689.


💡 You can make Copilot smarter by setting up custom instructions, customizing its development environment and configuring Model Context Protocol (MCP) servers. Learn more Copilot coding agent tips in the docs.

Copilot AI changed the title [WIP] rx: Provide a way to safely cancel subscription inside the Rx operators feat: add mapWithCompletion and flatMapWithCompletion for safe stream cancellation Jun 2, 2025
Copilot AI requested a review from xerial June 2, 2025 21:15
Copilot finished work on behalf of xerial June 2, 2025 21:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

rx: Provide a way to safely cancel subscription inside the Rx operators

2 participants