diff --git a/README.md b/README.md index 40cea400..584302cc 100644 --- a/README.md +++ b/README.md @@ -251,6 +251,17 @@ It is recommended to set this value less than or equal to the queue database's c - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. +### Scheduler polling interval + +The scheduler process checks for due recurring tasks and reloads dynamic tasks at a configurable interval. You can set this interval using the `polling_interval` key under the `scheduler` section in your `config/queue.yml`: + +```yaml +scheduler: + polling_interval: 5 # seconds +``` + +This controls how frequently the scheduler wakes up to enqueue due recurring jobs and reload dynamic tasks. + ### Queue order and priorities As mentioned above, if you specify a list of queues for a worker, these will be polled in the order given, such as for the list `real_time,background`, no jobs will be taken from `background` unless there aren't any more jobs waiting in `real_time`. @@ -657,6 +668,47 @@ my_periodic_resque_job: and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time. + +### Creating and Deleting Recurring Tasks Dynamically + +You can create and delete recurring tasks at runtime, without editing the configuration file. Use the following methods: + +#### Creating a recurring task + +```ruby +SolidQueue.schedule_recurring_task( + "my_dynamic_task", + command: "puts 'Hello from a dynamic task!'", + schedule: "every 10 minutes" +) +``` + +This will create a dynamic recurring task with the given key, command, and schedule. You can also use the `class` and `args` options as in the configuration file. + +#### Deleting a recurring task + +```ruby +SolidQueue.delete_recurring_task(task_id) +``` + +This will delete a dynamically scheduled recurring task by its ID. If you attempt to delete a static (configuration-defined) recurring task, an error will be raised. + +> **Note:** Static recurring tasks (those defined in `config/recurring.yml`) cannot be deleted at runtime. Attempting to do so will raise an error. + +#### Example: Creating and deleting a recurring task + +```ruby +# Create a new dynamic recurring task +recurring_task = SolidQueue.schedule_recurring_task( + "cleanup_temp_files", + command: "TempFileCleaner.clean!", + schedule: "every day at 2am" +) + +# Delete the task later by ID +SolidQueue.delete_recurring_task(recurring_task.id) +``` + ## Inspiration Solid Queue has been inspired by [resque](https://github.com/resque/resque) and [GoodJob](https://github.com/bensheldon/good_job). We recommend checking out these projects as they're great examples from which we've learnt a lot. diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index 5363f0a7..55906b88 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -11,6 +11,7 @@ class RecurringTask < Record validate :existing_job_class scope :static, -> { where(static: true) } + scope :dynamic, -> { where(static: false) } has_many :recurring_executions, foreign_key: :task_key, primary_key: :key diff --git a/lib/generators/solid_queue/install/templates/config/queue.yml b/lib/generators/solid_queue/install/templates/config/queue.yml index 15691e9d..d7b0e6b9 100644 --- a/lib/generators/solid_queue/install/templates/config/queue.yml +++ b/lib/generators/solid_queue/install/templates/config/queue.yml @@ -7,6 +7,8 @@ default: &default threads: 3 processes: <%%= ENV.fetch("JOB_CONCURRENCY", 1) %> polling_interval: 0.1 + scheduler: + polling_interval: 1 development: <<: *default diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e0d51c8c..0b8ea4e3 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -43,6 +43,15 @@ module SolidQueue delegate :on_start, :on_stop, :on_exit, to: Supervisor + + def create_recurring_task(key, **attributes) + RecurringTask.create!(**attributes, key:, static: false) + end + + def destroy_recurring_task(id) + RecurringTask.dynamic.find(id).destroy! + end + [ Dispatcher, Scheduler, Worker ].each do |process| define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block| process.on_start(&block) diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index ba13f0f4..af3a7267 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -28,6 +28,10 @@ def instantiate concurrency_maintenance_interval: 600 } + SCHEDULER_DEFAULTS = { + polling_interval: 1 + } + DEFAULT_CONFIG_FILE_PATH = "config/queue.yml" DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml" @@ -122,11 +126,9 @@ def dispatchers end def schedulers - if !skip_recurring_tasks? && recurring_tasks.any? - [ Process.new(:scheduler, recurring_tasks: recurring_tasks) ] - else - [] - end + return [] if skip_recurring_tasks? + + [ Process.new(:scheduler, { recurring_tasks:, **scheduler_options.with_defaults(SCHEDULER_DEFAULTS) }) ] end def workers_options @@ -139,6 +141,10 @@ def dispatchers_options .map { |options| options.dup.symbolize_keys } end + def scheduler_options + @scheduler_options ||= processes_config.fetch(:scheduler, {}).dup.symbolize_keys + end + def recurring_tasks @recurring_tasks ||= recurring_tasks_config.map do |id, options| RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule) @@ -147,9 +153,13 @@ def recurring_tasks def processes_config @processes_config ||= config_from \ - options.slice(:workers, :dispatchers).presence || options[:config_file], - keys: [ :workers, :dispatchers ], - fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] } + options.slice(:workers, :dispatchers, :scheduler).presence || options[:config_file], + keys: [ :workers, :dispatchers, :scheduler ], + fallback: { + workers: [ WORKER_DEFAULTS ], + dispatchers: [ DISPATCHER_DEFAULTS ], + scheduler: SCHEDULER_DEFAULTS + } end def recurring_tasks_config diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index 58cabfa8..2d74b5ec 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -58,5 +58,9 @@ def heartbeat self.process = nil wake_up end + + def refresh_registered_process + process.update_columns(metadata: metadata.compact) + end end end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index 3cec90fa..f0464f2b 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -5,7 +5,7 @@ class Scheduler < Processes::Base include Processes::Runnable include LifecycleHooks - attr_reader :recurring_schedule + attr_reader :recurring_schedule, :polling_interval after_boot :run_start_hooks after_boot :schedule_recurring_tasks @@ -15,6 +15,8 @@ class Scheduler < Processes::Base def initialize(recurring_tasks:, **options) @recurring_schedule = RecurringSchedule.new(recurring_tasks) + options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS) + @polling_interval = options[:polling_interval] super(**options) end @@ -24,13 +26,14 @@ def metadata end private - SLEEP_INTERVAL = 60 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks - def run loop do break if shutting_down? - interruptible_sleep(SLEEP_INTERVAL) + recurring_schedule.reload! + refresh_registered_process if recurring_schedule.changed? + + interruptible_sleep(polling_interval) end ensure SolidQueue.instrument(:shutdown_process, process: self) do diff --git a/lib/solid_queue/scheduler/recurring_schedule.rb b/lib/solid_queue/scheduler/recurring_schedule.rb index 4070a0ec..d9b6883c 100644 --- a/lib/solid_queue/scheduler/recurring_schedule.rb +++ b/lib/solid_queue/scheduler/recurring_schedule.rb @@ -4,11 +4,13 @@ module SolidQueue class Scheduler::RecurringSchedule include AppExecutor - attr_reader :configured_tasks, :scheduled_tasks + attr_reader :static_tasks, :configured_tasks, :scheduled_tasks, :changes def initialize(tasks) - @configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?) + @static_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?) + @configured_tasks = @static_tasks + dynamic_tasks @scheduled_tasks = Concurrent::Hash.new + @changes = Concurrent::Hash.new end def empty? @@ -17,8 +19,8 @@ def empty? def schedule_tasks wrap_in_app_executor do - persist_tasks - reload_tasks + persist_static_tasks + reload_static_tasks end configured_tasks.each do |task| @@ -26,6 +28,23 @@ def schedule_tasks end end + def dynamic_tasks + SolidQueue::RecurringTask.dynamic + end + + def schedule_new_dynamic_tasks + dynamic_tasks.where.not(key: scheduled_tasks.keys).each do |task| + schedule_task(task) + end + end + + def unschedule_old_dynamic_tasks + (scheduled_tasks.keys - SolidQueue::RecurringTask.pluck(:key)).each do |key| + scheduled_tasks[key].cancel + scheduled_tasks.delete(key) + end + end + def schedule_task(task) scheduled_tasks[task.key] = schedule(task) end @@ -35,18 +54,37 @@ def unschedule_tasks scheduled_tasks.clear end + def static_task_keys + static_tasks.map(&:key) + end + def task_keys - configured_tasks.map(&:key) + static_task_keys + dynamic_tasks.map(&:key) + end + + def reload! + { added_tasks: schedule_new_dynamic_tasks, + removed_tasks: unschedule_old_dynamic_tasks }.each do |key, values| + if values.any? + changes[key] = values + else + changes.delete(key) + end + end + end + + def changed? + @changes.any? end private - def persist_tasks - SolidQueue::RecurringTask.static.where.not(key: task_keys).delete_all - SolidQueue::RecurringTask.create_or_update_all configured_tasks + def persist_static_tasks + SolidQueue::RecurringTask.static.where.not(key: static_task_keys).delete_all + SolidQueue::RecurringTask.create_or_update_all static_tasks end - def reload_tasks - @configured_tasks = SolidQueue::RecurringTask.where(key: task_keys) + def reload_static_tasks + @static_tasks = SolidQueue::RecurringTask.static.where(key: static_task_keys) end def schedule(task) diff --git a/test/solid_queue_test.rb b/test/solid_queue_test.rb index d6d61b57..2c7bd00b 100644 --- a/test/solid_queue_test.rb +++ b/test/solid_queue_test.rb @@ -4,4 +4,31 @@ class SolidQueueTest < ActiveSupport::TestCase test "it has a version number" do assert SolidQueue::VERSION end + + test "creates recurring tasks" do + SolidQueue.create_recurring_task("test 1", command: "puts 1", schedule: "every hour") + SolidQueue.create_recurring_task("test 2", command: "puts 2", schedule: "every minute", static: true) + + assert SolidQueue::RecurringTask.exists?(key: "test 1", command: "puts 1", schedule: "every hour", static: false) + assert SolidQueue::RecurringTask.exists?(key: "test 2", command: "puts 2", schedule: "every minute", static: false) + end + + test "destroys recurring tasks" do + dynamic_task = SolidQueue::RecurringTask.create!( + key: "dynamic", command: "puts 'd'", schedule: "every day", static: false + ) + + static_task = SolidQueue::RecurringTask.create!( + key: "static", command: "puts 's'", schedule: "every week", static: true + ) + + SolidQueue.destroy_recurring_task(dynamic_task.id) + + assert_raises(ActiveRecord::RecordNotFound) do + SolidQueue.destroy_recurring_task(static_task.id) + end + + assert_not SolidQueue::RecurringTask.exists?(key: "dynamic", static: false) + assert SolidQueue::RecurringTask.exists?(key: "static", static: true) + end end diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 68a693e3..5f4e4909 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -21,7 +21,7 @@ class ConfigurationTest < ActiveSupport::TestCase test "default configuration when config given is empty" do configuration = SolidQueue::Configuration.new(config_file: config_file_path(:empty_configuration), recurring_schedule_file: config_file_path(:empty_configuration)) - assert_equal 2, configuration.configured_processes.count + assert_equal 3, configuration.configured_processes.count # includes scheduler for dynamic tasks assert_processes configuration, :worker, 1, queues: "*" assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size] end @@ -101,11 +101,11 @@ class ConfigurationTest < ActiveSupport::TestCase configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_production_only)) assert configuration.valid? - assert_processes configuration, :scheduler, 0 + assert_processes configuration, :scheduler, 1 # Starts in case of dynamic tasks configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_empty)) assert configuration.valid? - assert_processes configuration, :scheduler, 0 + assert_processes configuration, :scheduler, 1 # Starts in case of dynamic tasks # No processes configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: []) diff --git a/test/unit/scheduler_test.rb b/test/unit/scheduler_test.rb index 9478b9f1..e2b88312 100644 --- a/test/unit/scheduler_test.rb +++ b/test/unit/scheduler_test.rb @@ -3,7 +3,7 @@ class SchedulerTest < ActiveSupport::TestCase self.use_transactional_tests = false - test "recurring schedule" do + test "recurring schedule (only static)" do recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } } scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks).tap(&:start) @@ -17,6 +17,41 @@ class SchedulerTest < ActiveSupport::TestCase scheduler.stop end + test "recurring schedule (only dynamic)" do + SolidQueue::RecurringTask.create( + key: "dynamic_task", static: false, class_name: "AddToBufferJob", schedule: "every second", arguments: [ 42 ] + ) + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + assert_equal "Scheduler", process.kind + + assert_metadata process, recurring_schedule: [ "dynamic_task" ] + ensure + scheduler.stop + end + + test "recurring schedule (static + dynamic)" do + SolidQueue::RecurringTask.create( + key: "dynamic_task", static: false, class_name: "AddToBufferJob", schedule: "every second", arguments: [ 42 ] + ) + + recurring_tasks = { static_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } } + + scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + assert_equal "Scheduler", process.kind + + assert_metadata process, recurring_schedule: [ "static_task", "dynamic_task" ] + ensure + scheduler.stop + end + test "run more than one instance of the scheduler with recurring tasks" do recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } } schedulers = 2.times.collect do @@ -33,4 +68,61 @@ class SchedulerTest < ActiveSupport::TestCase assert_equal 1, run_at_times[i + 1] - run_at_times[i] end end + + test "updates metadata after adding dynamic task post-start" do + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, polling_interval: 0.1).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + # initially there are no recurring_schedule keys + assert process.metadata, {} + + # now create a dynamic task after the scheduler has booted + SolidQueue::RecurringTask.create( + key: "new_dynamic_task", + static: false, + class_name: "AddToBufferJob", + schedule: "every second", + arguments: [ 42 ] + ) + + sleep 1 + + process.reload + + # metadata should now include the new key + assert_metadata process, recurring_schedule: [ "new_dynamic_task" ] + ensure + scheduler&.stop + end + + test "updates metadata after removing dynamic task post-start" do + old_dynamic_task = SolidQueue::RecurringTask.create( + key: "old_dynamic_task", + static: false, + class_name: "AddToBufferJob", + schedule: "every second", + arguments: [ 42 ] + ) + + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, polling_interval: 0.1).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + # initially there is one recurring_schedule key + assert_metadata process, recurring_schedule: [ "old_dynamic_task" ] + + old_dynamic_task.destroy + + sleep 1 + + process.reload + + # The task is unschedule after it's being removed, and it's reflected in the metadata + assert process.metadata, {} + ensure + scheduler&.stop + end end