diff --git a/lib/async/stop.rb b/lib/async/stop.rb new file mode 100644 index 00000000..f83f2e5a --- /dev/null +++ b/lib/async/stop.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "fiber" +require "console" + +module Async + # Raised when a task is explicitly stopped. + class Stop < Exception + # Represents the source of the stop operation. + class Cause < Exception + if RUBY_VERSION >= "3.4" + # @returns [Array(Thread::Backtrace::Location)] The backtrace of the caller. + def self.backtrace + caller_locations(2..-1) + end + else + # @returns [Array(String)] The backtrace of the caller. + def self.backtrace + caller(2..-1) + end + end + + # Create a new cause of the stop operation, with the given message. + # + # @parameter message [String] The error message. + # @returns [Cause] The cause of the stop operation. + def self.for(message = "Task was stopped") + instance = self.new(message) + instance.set_backtrace(self.backtrace) + return instance + end + end + + if RUBY_VERSION < "3.5" + # Create a new stop operation. + # + # This is a compatibility method for Ruby versions before 3.5 where cause is not propagated correctly when using {Fiber#raise} + # + # @parameter message [String | Hash] The error message or a hash containing the cause. + def initialize(message = "Task was stopped") + if message.is_a?(Hash) + @cause = message[:cause] + message = "Task was stopped" + end + + super(message) + end + + # @returns [Exception] The cause of the stop operation. + # + # This is a compatibility method for Ruby versions before 3.5 where cause is not propagated correctly when using {Fiber#raise}, we explicitly capture the cause here. + def cause + super || @cause + end + end + + # Used to defer stopping the current task until later. + class Later + # Create a new stop later operation. + # + # @parameter task [Task] The task to stop later. + # @parameter cause [Exception] The cause of the stop operation. + def initialize(task, cause = nil) + @task = task + @cause = cause + end + + # @returns [Boolean] Whether the task is alive. + def alive? + true + end + + # Transfer control to the operation - this will stop the task. + def transfer + @task.stop(false, cause: @cause) + end + end + end +end diff --git a/lib/async/task.rb b/lib/async/task.rb index e366f972..852b8927 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -13,33 +13,11 @@ require_relative "node" require_relative "condition" +require_relative "stop" Fiber.attr_accessor :async_task module Async - # Raised when a task is explicitly stopped. - class Stop < Exception - # Used to defer stopping the current task until later. - class Later - # Create a new stop later operation. - # - # @parameter task [Task] The task to stop later. - def initialize(task) - @task = task - end - - # @returns [Boolean] Whether the task is alive. - def alive? - true - end - - # Transfer control to the operation - this will stop the task. - def transfer - @task.stop - end - end - end - # Raised if a timeout occurs on a specific Fiber. Handled gracefully by `Task`. # @public Since *Async v1*. class TimeoutError < StandardError @@ -271,7 +249,13 @@ def wait # If `later` is false, it means that `stop` has been invoked directly. When `later` is true, it means that `stop` is invoked by `stop_children` or some other indirect mechanism. In that case, if we encounter the "current" fiber, we can't stop it right away, as it's currently performing `#stop`. Stopping it immediately would interrupt the current stop traversal, so we need to schedule the stop to occur later. # # @parameter later [Boolean] Whether to stop the task later, or immediately. - def stop(later = false) + # @parameter cause [Exception] The cause of the stop operation. + def stop(later = false, cause: $!) + # If no cause is given, we generate one from the current call stack: + unless cause + cause = Stop::Cause.for("Stopping task!") + end + if self.stopped? # If the task is already stopped, a `stop` state transition re-enters the same state which is a no-op. However, we will also attempt to stop any running children too. This can happen if the children did not stop correctly the first time around. Doing this should probably be considered a bug, but it's better to be safe than sorry. return stopped! @@ -285,7 +269,7 @@ def stop(later = false) # If we are deferring stop... if @defer_stop == false # Don't stop now... but update the state so we know we need to stop later. - @defer_stop = true + @defer_stop = cause return false end @@ -293,19 +277,19 @@ def stop(later = false) # If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`: if later # If the fiber is the current fiber and we want to stop it later, schedule it: - Fiber.scheduler.push(Stop::Later.new(self)) + Fiber.scheduler.push(Stop::Later.new(self, cause)) else # Otherwise, raise the exception directly: - raise Stop, "Stopping current task!" + raise Stop, "Stopping current task!", cause: cause end else # If the fiber is not curent, we can raise the exception directly: begin # There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later. - Fiber.scheduler.raise(@fiber, Stop) + Fiber.scheduler.raise(@fiber, Stop, cause: cause) rescue FiberError # In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later: - Fiber.scheduler.push(Stop::Later.new(self)) + Fiber.scheduler.push(Stop::Later.new(self, cause)) end end else @@ -345,7 +329,7 @@ def defer_stop # If we were asked to stop, we should do so now: if defer_stop - raise Stop, "Stopping current task (was deferred)!" + raise Stop, "Stopping current task (was deferred)!", cause: defer_stop end end else @@ -356,7 +340,7 @@ def defer_stop # @returns [Boolean] Whether stop has been deferred. def stop_deferred? - @defer_stop + !!@defer_stop end # Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available. diff --git a/test/async/task.rb b/test/async/task.rb index ca1bcc91..5d5f5615 100644 --- a/test/async/task.rb +++ b/test/async/task.rb @@ -552,6 +552,48 @@ expect(transient).to be(:running?) end.wait end + + it "can stop a task from within with a cause" do + error = nil + + cause = Async::Stop::Cause.for("boom") + + task = reactor.async do |task| + begin + task.stop(cause: cause) + rescue Async::Stop => error + raise + end + end + + reactor.run + + expect(task).to be(:stopped?) + expect(error).to be_a(Async::Stop) + expect(error.cause).to be == cause + end + + it "can stop a task from outside with a cause" do + error = nil + + cause = RuntimeError.new("boom") + + task = reactor.async do |task| + begin + task.yield + rescue Async::Stop => error + raise + end + end + + task.stop(cause: cause) + + reactor.run + + expect(task).to be(:stopped?) + expect(error).to be_a(Async::Stop) + expect(error.cause).to be == cause + end end with "#sleep" do @@ -923,7 +965,7 @@ def sleep_forever reactor.run_once(0) - expect(child_task.stop_deferred?).to be == nil + expect(child_task.stop_deferred?).to be == false end end