diff --git a/lib/sidekiq/active_record/manager_worker.rb b/lib/sidekiq/active_record/manager_worker.rb index 51b0165..a7d703b 100644 --- a/lib/sidekiq/active_record/manager_worker.rb +++ b/lib/sidekiq/active_record/manager_worker.rb @@ -1,154 +1,20 @@ module Sidekiq module ActiveRecord - class ManagerWorker - include Sidekiq::Worker - - DEFAULT_IDENTIFIER_KEY = :id - DEFAULT_BATCH_SIZE = 1000 - - def perform(options = {}) - default_query = self.class.get_default_models_query - self.class.perform_query_async(default_query, options) - end - + class ManagerWorker < Sidekiq::Orm::ManagerWorker class << self - # For a given model collection, it delegates each model to a sub-worker (e.g TaskWorker) - # Specify the TaskWorker with the `sidekiq_delegate_task_to` method. - # - # @param models_query ActiveRecord::Relation - # @param options Hash - # :worker_class - the worker class to delegate the task to. Alternative to the default `sidekiq_delegate_task_to` - # :identifier_key - the model identifier column. Default 'id' - # :additional_keys - additional model keys - # :batch_size - Specifies the size of the batch. Default to 1000. - # - # @example: - # class UserTaskWorker - # def perform(user_id) - # # user task logic - # end - # end - # - # class UserSyncer - # include Sidekiq::ActiveRecord::ManagerWorker - # - # sidekiq_delegate_task_to :user_task_worker # or UserTaskWorker - # sidekiq_manager_options :batch_size => 500, - # :identifier_key => :user_token, - # :additional_keys => [:status] - # end - # - # UserSyncer.perform_query_async(User.active, :batch_size => 300) - # - # - # is equivalent to doing: - # User.active.each {|user| UserTaskWorker.perform(user.id) } - # - def perform_query_async(models_query, options = {}) - set_runtime_options(options) - models = prepare_models_query(models_query) - models.find_in_batches(batch_size: batch_size) do |models_batch| - model_attributes = models_batch.map { |model| model_attributes(model) } - Sidekiq::Client.push_bulk(class: worker_class, args: model_attributes) + def find_in_batches(models) + models.find_in_batches(batch_size: batch_size) do |batch| + yield batch end end - # @required - # The task worker to delegate to. - # @param worker_klass (Sidekiq::Worker, Symbol) - UserTaskWorker or :user_task_worker - def sidekiq_delegate_task_to(worker_klass) - case worker_klass - when String, Symbol - worker_klass.to_s.split('_').map(&:capitalize).join.constantize - else - worker_klass - end - get_sidekiq_manager_options[:worker_class] = worker_klass - end - - # Allows customization for this type of ManagerWorker. - # Legal options: - # - # :worker_class - the worker class to delegate the task to. Alternative to `sidekiq_delegate_task_to` - # :identifier_key - the model identifier column. Default 'id' - # :additional_keys - additional model keys - # :batch_size - Specifies the size of the batch. Default to 1000. - def sidekiq_manager_options(opts = {}) - @sidekiq_manager_options_hash = get_sidekiq_manager_options.merge((opts || {})) - end - - # The default of query to run, when the workers runs perform - # example - # class UserManagerWorker < Sidekiq::ActiveRecord::ManagerWorker - # sidekiq_delegate_task_to UserTaskWorker - # default_models_query -> { User.active } - # end - # - # UserManagerWorker.perform_async(:batch_size => 300) - def default_models_query(query) - @query = query - end - - def get_default_models_query - @query.call() if @query.present? - end - - def default_worker_manager_options - { - identifier_key: DEFAULT_IDENTIFIER_KEY, - additional_keys: [], - batch_size: DEFAULT_BATCH_SIZE - } - end - - # returns the model attributes array: - # [model_id, attr1, attr2, ...] - def model_attributes(model) - additional_attributes = additional_keys.map { |key| model.send(key) } - id_attribute = model.send(identifier_key) - additional_attributes.unshift(id_attribute) - end - def prepare_models_query(models_query) selected_attributes = [models_query.primary_key.to_sym, identifier_key, additional_keys].uniq models_query.select(selected_attributes) end - def worker_class - fail NotImplementedError.new('`worker_class` was not specified') unless manager_options[:worker_class].present? - manager_options[:worker_class] - end - - def identifier_key - manager_options[:identifier_key] - end - - def additional_keys - manager_options[:additional_keys] - end - - def batch_size - manager_options[:batch_size] - end - - def manager_options - get_sidekiq_manager_options.merge(runtime_options) - end - - def get_sidekiq_manager_options - @sidekiq_manager_options_hash ||= default_worker_manager_options - end - - def runtime_options - @sidekiq_manager_runtime_options || {} - end - - def set_runtime_options(options={}) - @sidekiq_manager_runtime_options = options.delete_if { |_, v| v.to_s.strip == '' } - end - end end diff --git a/lib/sidekiq/active_record/task_worker.rb b/lib/sidekiq/active_record/task_worker.rb index 5384409..ca5fba7 100644 --- a/lib/sidekiq/active_record/task_worker.rb +++ b/lib/sidekiq/active_record/task_worker.rb @@ -1,158 +1,17 @@ module Sidekiq module ActiveRecord - class TaskWorker - include Sidekiq::Worker - - attr_reader :task_model - - # @example: - # class UserMailerTaskWorker < Sidekiq::ActiveRecord::TaskWorker - # - # sidekiq_task_model :user_model # or UserModel - # sidekiq_task_options :identifier_key => :token - # - # def perform_on_model - # UserMailer.deliver_registration_confirmation(user, email_type) - # end - # - # def not_found_model(token) - # Log.error "User not found for token:#{token}" - # end - # - # def should_perform_on_model? - # user.active? - # end - # - # def did_not_perform_on_model - # Log.error "User #{user.token} is inactive" - # end - # - # end - # - # - # UserMailerTaskWorker.perform(user.id, :new_email) - # - def perform(identifier, *args) - @task_model = fetch_model(identifier, *args) - return not_found_model(identifier, *args) unless @task_model.present? - - if should_perform_on_model? - perform_on_model(*args) - else - did_not_perform_on_model - end - end - - def perform_on_model(*args) - task_model - end - - # Hook that can block perform_on_model from being triggered, - # e.g in cases when the model is no longer valid - def should_perform_on_model? - true - end - - # Hook to handel a model that was not performed - def did_not_perform_on_model - task_model - end - - # Hook to handel not found model - def not_found_model(identifier, *args) - identifier - end + class TaskWorker < Sidekiq::Orm::TaskWorker def fetch_model(identifier, *args) self.class.model_class.find_by(self.class.identifier_key => identifier) end - class << self - def sidekiq_task_model(model_klass) - return if model_klass.blank? - - setup_task_model_alias(model_klass) - - get_sidekiq_task_options[:model_class] = active_record_class(model_klass) - end - - def model_class - klass = get_sidekiq_task_options[:model_class] - fail NotImplementedError.new('`sidekiq_task_model` was not specified') unless klass.present? - klass - end - - def identifier_key - get_sidekiq_task_options[:identifier_key] - end - - # - # Allows customization for this type of TaskWorker. - # Legal options: - # - # :identifier_key - the model identifier column. Default 'id' - def sidekiq_task_options(opts = {}) - @sidekiq_task_options_hash = get_sidekiq_task_options.merge((opts).symbolize_keys!) - end - - - private - - # aliases task_model with the name of the model - # - # example: - # sidekiq_task_model: AdminUser # or :admin_user - # - # then the worker will have access to `admin_user`, which is an alias to `task_model` - # - # def perform_on_admin_user - # admin_user == task_model - # end - # - # it will add the following method aliases to the hooks: - # - # def not_found_admin_user; end - # def should_perform_on_admin_user?; end - # def did_not_perform_on_admin_user; end - # - def setup_task_model_alias(model_klass_name) - if model_klass_name.is_a?(Class) - model_klass_name = model_klass_name.name.underscore - end - { - :task_model => model_klass_name, - :fetch_model => "fetch_#{model_klass_name}", - :not_found_model => "not_found_#{model_klass_name}", - :should_perform_on_model? => "should_perform_on_#{model_klass_name}?", - :did_not_perform_on_model => "did_not_perform_on_#{model_klass_name}", - :perform_on_model => "perform_on_#{model_klass_name}" - }.each do |old_name, new_name| - self.class_exec do - alias_method new_name.to_sym, old_name - end - end - end - - def get_sidekiq_task_options - @sidekiq_task_options_hash ||= default_worker_task_options - end - - def default_worker_task_options - { - identifier_key: :id - } - end + protected - def active_record_class(model_klass) - begin - model_klass = model_klass.to_s.classify.constantize - raise unless model_klass <= ::ActiveRecord::Base - rescue - fail ArgumentError.new '`sidekiq_task_model` must be an ActiveRecord model' - end - model_klass + def task_model_base_class + ::ActiveRecord::Base end end diff --git a/lib/sidekiq/activerecord.rb b/lib/sidekiq/activerecord.rb index 979aa2a..d59c3bd 100644 --- a/lib/sidekiq/activerecord.rb +++ b/lib/sidekiq/activerecord.rb @@ -6,6 +6,15 @@ require 'sidekiq/active_record/version' +module Sidekiq + module Orm + extend ActiveSupport::Autoload + + autoload :TaskWorker + autoload :ManagerWorker + end +end + module Sidekiq module ActiveRecord extend ActiveSupport::Autoload diff --git a/lib/sidekiq/orm/manager_worker.rb b/lib/sidekiq/orm/manager_worker.rb new file mode 100644 index 0000000..05ba017 --- /dev/null +++ b/lib/sidekiq/orm/manager_worker.rb @@ -0,0 +1,185 @@ +module Sidekiq + module Orm + class ManagerWorker + include Sidekiq::Worker + + DEFAULT_IDENTIFIER_KEY = :id + DEFAULT_BATCH_SIZE = 1000 + + def perform(options = {}) + default_query = self.class.get_default_models_query + self.class.perform_query_async(default_query, options) + end + + + class << self + + # For a given model collection, it delegates each model to a sub-worker (e.g TaskWorker) + # Specify the TaskWorker with the `sidekiq_delegate_task_to` method. + # + # @param models_query ActiveRecord::Relation + # @param options Hash + # :worker_class - the worker class to delegate the task to. Alternative to the default `sidekiq_delegate_task_to` + # :identifier_key - the model identifier column. Default 'id' + # :additional_keys - additional model keys + # :batch_size - Specifies the size of the batch. Default to 1000. + # + # @example: + # class UserTaskWorker + # def perform(user_id) + # # user task logic + # end + # end + # + # class UserSyncer + # include Sidekiq::ActiveRecord::ManagerWorker + # + # sidekiq_delegate_task_to :user_task_worker # or UserTaskWorker + # sidekiq_manager_options :batch_size => 500, + # :identifier_key => :user_token, + # :additional_keys => [:status] + # end + # + # UserSyncer.perform_query_async(User.active, :batch_size => 300) + # + # + # is equivalent to doing: + # User.active.each {|user| UserTaskWorker.perform(user.id) } + # + def perform_query_async(models_query, options = {}) + set_runtime_options(options) + models = prepare_models_query(models_query) + find_in_batches(models) do |models_batch| + model_attributes = models_batch.map { |model| model_attributes(model) } + Sidekiq::Client.push_bulk(class: worker_class, args: model_attributes) + end + end + + # @required + # The task worker to delegate to. + # @param worker_klass (Sidekiq::Worker, Symbol) - UserTaskWorker or :user_task_worker + def sidekiq_delegate_task_to(worker_klass) + case worker_klass + when String, Symbol + worker_klass.to_s.split('_').map(&:capitalize).join.constantize + else + worker_klass + end + get_sidekiq_manager_options[:worker_class] = worker_klass + end + + # Allows customization for this type of ManagerWorker. + # Legal options: + # + # :worker_class - the worker class to delegate the task to. Alternative to `sidekiq_delegate_task_to` + # :identifier_key - the model identifier column. Default 'id' + # :additional_keys - additional model keys + # :batch_size - Specifies the size of the batch. Default to 1000. + def sidekiq_manager_options(opts = {}) + @sidekiq_manager_options_hash = get_sidekiq_manager_options.merge((opts || {})) + end + + # The default of query to run, when the workers runs perform + # example + # class UserManagerWorker < Sidekiq::ActiveRecord::ManagerWorker + # sidekiq_delegate_task_to UserTaskWorker + # default_models_query -> { User.active } + # end + # + # UserManagerWorker.perform_async(:batch_size => 300) + def default_models_query(query) + @query = query + end + + def get_default_models_query + @query.call() if @query.present? + end + + def default_worker_manager_options + { + identifier_key: DEFAULT_IDENTIFIER_KEY, + additional_keys: [], + batch_size: DEFAULT_BATCH_SIZE + } + end + + # returns the model attributes array: + # [model_id, attr1, attr2, ...] + def model_attributes(model) + additional_attributes = additional_keys.map { |key| model.send(key) } + id_attribute = model.send(identifier_key) + additional_attributes.unshift(id_attribute) + end + + # @override in ORM specific class + # Prepares the models collection, before it is executes. + # For example, in the context of ActiveRecord - we won't to SELECT only the + # needed columns we need. e.g: + # + # def prepare_models_query(models_query) + # selected_attributes = [models_query.primary_key.to_sym, identifier_key, additional_keys].uniq + # models_query.select(selected_attributes) + # end + # + # Make sure you return the models query + # + # see: Sidekiq::ActiveRecord::ManagerWorker + def prepare_models_query(models_query) + models_query + end + + # @override in ORM specific class + # Goes over the models collection and yields each batch of records. + # Essentially, a wrapper for ActiveRecord::Batches.find_in_batches + # Example: + # + # def find_in_batches(models) + # models.find_in_batches(batch_size: batch_size) do |batch| + # yield batch + # end + # end + # + # see: Sidekiq::ActiveRecord::ManagerWorker + # see: http://api.rubyonrails.org/classes/ActiveRecord/Batches.html#method-i-find_in_batches + end + def find_in_batches(models) + end + + def worker_class + fail NotImplementedError.new('`worker_class` was not specified') unless manager_options[:worker_class].present? + manager_options[:worker_class] + end + + def identifier_key + manager_options[:identifier_key] + end + + def additional_keys + manager_options[:additional_keys] + end + + def batch_size + manager_options[:batch_size] + end + + def manager_options + get_sidekiq_manager_options.merge(runtime_options) + end + + def get_sidekiq_manager_options + @sidekiq_manager_options_hash ||= default_worker_manager_options + end + + def runtime_options + @sidekiq_manager_runtime_options || {} + end + + def set_runtime_options(options={}) + @sidekiq_manager_runtime_options = options.delete_if { |_, v| v.to_s.strip == '' } + end + + end + + end + end +end diff --git a/lib/sidekiq/orm/task_worker.rb b/lib/sidekiq/orm/task_worker.rb new file mode 100644 index 0000000..dc1c9fa --- /dev/null +++ b/lib/sidekiq/orm/task_worker.rb @@ -0,0 +1,169 @@ +module Sidekiq + module Orm + class TaskWorker + include Sidekiq::Worker + + attr_reader :task_model + + # @example: + # class UserMailerTaskWorker < Sidekiq::ActiveRecord::TaskWorker + # + # sidekiq_task_model :user_model # or UserModel + # sidekiq_task_options :identifier_key => :token + # + # def perform_on_model + # UserMailer.deliver_registration_confirmation(user, email_type) + # end + # + # def not_found_model(token) + # Log.error "User not found for token:#{token}" + # end + # + # def should_perform_on_model? + # user.active? + # end + # + # def did_not_perform_on_model + # Log.error "User #{user.token} is inactive" + # end + # + # end + # + # + # UserMailerTaskWorker.perform(user.id, :new_email) + # + def perform(identifier, *args) + @task_model = fetch_model(identifier, *args) + return not_found_model(identifier, *args) unless @task_model.present? + + if should_perform_on_model? + perform_on_model(*args) + else + did_not_perform_on_model + end + end + + def perform_on_model(*args) + task_model + end + + # Hook that can block perform_on_model from being triggered, + # e.g in cases when the model is no longer valid + def should_perform_on_model? + true + end + + # Hook to handel a model that was not performed + def did_not_perform_on_model + task_model + end + + # Hook to handel not found model + def not_found_model(identifier, *args) + identifier + end + + def fetch_model(identifier, *args) + # override in ORM specific class + end + + + class << self + + def sidekiq_task_model(model_klass) + return if model_klass.blank? + + setup_task_model_alias(model_klass) + + get_sidekiq_task_options[:model_class] = parse_task_model_class(model_klass) + end + + def model_class + klass = get_sidekiq_task_options[:model_class] + fail NotImplementedError.new('`sidekiq_task_model` was not specified') unless klass.present? + klass + end + + def identifier_key + get_sidekiq_task_options[:identifier_key] + end + + # + # Allows customization for this type of TaskWorker. + # Legal options: + # + # :identifier_key - the model identifier column. Default 'id' + def sidekiq_task_options(opts = {}) + @sidekiq_task_options_hash = get_sidekiq_task_options.merge((opts).symbolize_keys!) + end + + + private + + # aliases task_model with the name of the model + # + # example: + # sidekiq_task_model: AdminUser # or :admin_user + # + # then the worker will have access to `admin_user`, which is an alias to `task_model` + # + # def perform_on_admin_user + # admin_user == task_model + # end + # + # it will add the following method aliases to the hooks: + # + # def not_found_admin_user; end + # def should_perform_on_admin_user?; end + # def did_not_perform_on_admin_user; end + # + def setup_task_model_alias(model_klass_name) + if model_klass_name.is_a?(Class) + model_klass_name = model_klass_name.name.underscore + end + { + :task_model => model_klass_name, + :fetch_model => "fetch_#{model_klass_name}", + :not_found_model => "not_found_#{model_klass_name}", + :should_perform_on_model? => "should_perform_on_#{model_klass_name}?", + :did_not_perform_on_model => "did_not_perform_on_#{model_klass_name}", + :perform_on_model => "perform_on_#{model_klass_name}" + }.each do |old_name, new_name| + self.class_exec do + alias_method new_name.to_sym, old_name + end + end + end + + def get_sidekiq_task_options + @sidekiq_task_options_hash ||= default_worker_task_options + end + + def default_worker_task_options + { + identifier_key: :id + } + end + + def parse_task_model_class(model_klass) + begin + model_klass = model_klass.to_s.classify.constantize + raise unless model_klass <= task_model_base_class + rescue + fail ArgumentError.new "`sidekiq_task_model` must be an #{task_model_base_class.to_s} model" + end + model_klass + end + + + protected + + def task_model_base_class + ::ActiveRecord::Base + end + + end + + end + end +end