From a1d1004617abf99d52677bbc0230a8e9e552911c Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 16 May 2025 01:23:47 +0900 Subject: [PATCH 1/3] Add support for `stop(cause:)`. --- lib/async/task.rb | 58 ++++++++++++++++++++++++++++++++++++++-------- test/async/task.rb | 22 +++++++++++++++++- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/lib/async/task.rb b/lib/async/task.rb index e366f972..d57eca64 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -19,13 +19,45 @@ module Async # Raised when a task is explicitly stopped. class Stop < Exception + # Represents the source of the stop operation. + class Cause < Exception + if RUBY_VERSION >= "3.4" + # @returns [Array(Thread::Backtrace::Location)] The backtrace of the caller. + def self.backtrace + caller_locations(2..-1) + end + else + # @returns [Array(String)] The backtrace of the caller. + def self.backtrace + caller(2..-1) + end + end + + # Create a new cause of the stop operation, with the given message. + # + # @parameter message [String] The error message. + # @returns [Cause] The cause of the stop operation. + def self.for(message = "Task was stopped") + instance = self.new(message) + instance.set_backtrace(self.backtrace) + return instance + end + end + + # Create a new stop operation. + def initialize(message = "Task was stopped") + super(message) + end + # Used to defer stopping the current task until later. class Later # Create a new stop later operation. # # @parameter task [Task] The task to stop later. - def initialize(task) + # @parameter cause [Exception] The cause of the stop operation. + def initialize(task, cause = nil) @task = task + @cause = cause end # @returns [Boolean] Whether the task is alive. @@ -35,7 +67,7 @@ def alive? # Transfer control to the operation - this will stop the task. def transfer - @task.stop + @task.stop(false, cause: @cause) end end end @@ -271,7 +303,13 @@ def wait # If `later` is false, it means that `stop` has been invoked directly. When `later` is true, it means that `stop` is invoked by `stop_children` or some other indirect mechanism. In that case, if we encounter the "current" fiber, we can't stop it right away, as it's currently performing `#stop`. Stopping it immediately would interrupt the current stop traversal, so we need to schedule the stop to occur later. # # @parameter later [Boolean] Whether to stop the task later, or immediately. - def stop(later = false) + # @parameter cause [Exception] The cause of the stop operation. + def stop(later = false, cause: $!) + # If no cause is given, we generate one from the current call stack: + unless cause + cause = Stop::Cause.for("Stopping task!") + end + if self.stopped? # If the task is already stopped, a `stop` state transition re-enters the same state which is a no-op. However, we will also attempt to stop any running children too. This can happen if the children did not stop correctly the first time around. Doing this should probably be considered a bug, but it's better to be safe than sorry. return stopped! @@ -285,7 +323,7 @@ def stop(later = false) # If we are deferring stop... if @defer_stop == false # Don't stop now... but update the state so we know we need to stop later. - @defer_stop = true + @defer_stop = cause return false end @@ -293,19 +331,19 @@ def stop(later = false) # If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`: if later # If the fiber is the current fiber and we want to stop it later, schedule it: - Fiber.scheduler.push(Stop::Later.new(self)) + Fiber.scheduler.push(Stop::Later.new(self, cause)) else # Otherwise, raise the exception directly: - raise Stop, "Stopping current task!" + raise Stop, "Stopping current task!", cause: cause end else # If the fiber is not curent, we can raise the exception directly: begin # There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later. - Fiber.scheduler.raise(@fiber, Stop) + Fiber.scheduler.raise(@fiber, Stop, cause: cause) rescue FiberError # In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later: - Fiber.scheduler.push(Stop::Later.new(self)) + Fiber.scheduler.push(Stop::Later.new(self, cause)) end end else @@ -345,7 +383,7 @@ def defer_stop # If we were asked to stop, we should do so now: if defer_stop - raise Stop, "Stopping current task (was deferred)!" + raise Stop, "Stopping current task (was deferred)!", cause: defer_stop end end else @@ -356,7 +394,7 @@ def defer_stop # @returns [Boolean] Whether stop has been deferred. def stop_deferred? - @defer_stop + !!@defer_stop end # Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available. diff --git a/test/async/task.rb b/test/async/task.rb index ca1bcc91..cb928e12 100644 --- a/test/async/task.rb +++ b/test/async/task.rb @@ -552,6 +552,26 @@ expect(transient).to be(:running?) end.wait end + + it "can stop a task and provide a cause" do + error = nil + + cause = Async::Stop::Cause.for("boom") + + task = reactor.async do |task| + begin + task.stop(cause: cause) + rescue Async::Stop => error + raise + end + end + + reactor.run + + expect(task).to be(:stopped?) + expect(error).to be_a(Async::Stop) + expect(error.cause).to be == cause + end end with "#sleep" do @@ -923,7 +943,7 @@ def sleep_forever reactor.run_once(0) - expect(child_task.stop_deferred?).to be == nil + expect(child_task.stop_deferred?).to be == false end end From 3789e0902078f23ac3248cb2c88ad320c2045a9f Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 23 Jul 2025 00:22:17 +1200 Subject: [PATCH 2/3] Add test for 3.5+. --- test/async/task.rb | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/test/async/task.rb b/test/async/task.rb index cb928e12..0f5cd62c 100644 --- a/test/async/task.rb +++ b/test/async/task.rb @@ -553,7 +553,7 @@ end.wait end - it "can stop a task and provide a cause" do + it "can stop a task from within with a cause" do error = nil cause = Async::Stop::Cause.for("boom") @@ -572,6 +572,30 @@ expect(error).to be_a(Async::Stop) expect(error.cause).to be == cause end + + it "can stop a task from outside with a cause" do + skip_unless_minimum_ruby_version("3.5") + + error = nil + + cause = RuntimeError.new("boom") + + task = reactor.async do |task| + begin + task.yield + rescue Async::Stop => error + raise + end + end + + task.stop(cause: cause) + + reactor.run + + expect(task).to be(:stopped?) + expect(error).to be_a(Async::Stop) + expect(error.cause).to be == cause + end end with "#sleep" do From 33811a581a4ce98790e2e9b236a25553d4a64bc7 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 24 Jul 2025 17:06:27 +1200 Subject: [PATCH 3/3] Move `Async::Stop` to `lib/async/stop.rb` and implement compatibility for older Ruby versions. --- lib/async/stop.rb | 82 ++++++++++++++++++++++++++++++++++++++++++++++ lib/async/task.rb | 56 +------------------------------ test/async/task.rb | 2 -- 3 files changed, 83 insertions(+), 57 deletions(-) create mode 100644 lib/async/stop.rb diff --git a/lib/async/stop.rb b/lib/async/stop.rb new file mode 100644 index 00000000..f83f2e5a --- /dev/null +++ b/lib/async/stop.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "fiber" +require "console" + +module Async + # Raised when a task is explicitly stopped. + class Stop < Exception + # Represents the source of the stop operation. + class Cause < Exception + if RUBY_VERSION >= "3.4" + # @returns [Array(Thread::Backtrace::Location)] The backtrace of the caller. + def self.backtrace + caller_locations(2..-1) + end + else + # @returns [Array(String)] The backtrace of the caller. + def self.backtrace + caller(2..-1) + end + end + + # Create a new cause of the stop operation, with the given message. + # + # @parameter message [String] The error message. + # @returns [Cause] The cause of the stop operation. + def self.for(message = "Task was stopped") + instance = self.new(message) + instance.set_backtrace(self.backtrace) + return instance + end + end + + if RUBY_VERSION < "3.5" + # Create a new stop operation. + # + # This is a compatibility method for Ruby versions before 3.5 where cause is not propagated correctly when using {Fiber#raise} + # + # @parameter message [String | Hash] The error message or a hash containing the cause. + def initialize(message = "Task was stopped") + if message.is_a?(Hash) + @cause = message[:cause] + message = "Task was stopped" + end + + super(message) + end + + # @returns [Exception] The cause of the stop operation. + # + # This is a compatibility method for Ruby versions before 3.5 where cause is not propagated correctly when using {Fiber#raise}, we explicitly capture the cause here. + def cause + super || @cause + end + end + + # Used to defer stopping the current task until later. + class Later + # Create a new stop later operation. + # + # @parameter task [Task] The task to stop later. + # @parameter cause [Exception] The cause of the stop operation. + def initialize(task, cause = nil) + @task = task + @cause = cause + end + + # @returns [Boolean] Whether the task is alive. + def alive? + true + end + + # Transfer control to the operation - this will stop the task. + def transfer + @task.stop(false, cause: @cause) + end + end + end +end diff --git a/lib/async/task.rb b/lib/async/task.rb index d57eca64..852b8927 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -13,65 +13,11 @@ require_relative "node" require_relative "condition" +require_relative "stop" Fiber.attr_accessor :async_task module Async - # Raised when a task is explicitly stopped. - class Stop < Exception - # Represents the source of the stop operation. - class Cause < Exception - if RUBY_VERSION >= "3.4" - # @returns [Array(Thread::Backtrace::Location)] The backtrace of the caller. - def self.backtrace - caller_locations(2..-1) - end - else - # @returns [Array(String)] The backtrace of the caller. - def self.backtrace - caller(2..-1) - end - end - - # Create a new cause of the stop operation, with the given message. - # - # @parameter message [String] The error message. - # @returns [Cause] The cause of the stop operation. - def self.for(message = "Task was stopped") - instance = self.new(message) - instance.set_backtrace(self.backtrace) - return instance - end - end - - # Create a new stop operation. - def initialize(message = "Task was stopped") - super(message) - end - - # Used to defer stopping the current task until later. - class Later - # Create a new stop later operation. - # - # @parameter task [Task] The task to stop later. - # @parameter cause [Exception] The cause of the stop operation. - def initialize(task, cause = nil) - @task = task - @cause = cause - end - - # @returns [Boolean] Whether the task is alive. - def alive? - true - end - - # Transfer control to the operation - this will stop the task. - def transfer - @task.stop(false, cause: @cause) - end - end - end - # Raised if a timeout occurs on a specific Fiber. Handled gracefully by `Task`. # @public Since *Async v1*. class TimeoutError < StandardError diff --git a/test/async/task.rb b/test/async/task.rb index 0f5cd62c..5d5f5615 100644 --- a/test/async/task.rb +++ b/test/async/task.rb @@ -574,8 +574,6 @@ end it "can stop a task from outside with a cause" do - skip_unless_minimum_ruby_version("3.5") - error = nil cause = RuntimeError.new("boom")