Skip to content

Commit

Permalink
Implement ResqueConnector and enqueuing/running jobs async
Browse files Browse the repository at this point in the history
  • Loading branch information
azimux committed Feb 6, 2024
1 parent a4f970b commit 074412b
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
GIT
remote: foobara
revision: 8c83cf4d1c9096e800991953d4f73ff90af7d0af
revision: 1c4162461275d971b2d0a5c3f27a01ec3456a630
branch: main
specs:
foobara (0.0.1)
Expand Down
58 changes: 57 additions & 1 deletion spec/resque_connector_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,61 @@
RSpec.describe Foobara::ResqueConnector do
RSpec.describe Foobara::CommandConnectors::ResqueConnector do
after do
Foobara::ResqueConnector.reset_all
end

let(:command_connector) { described_class.new }

let(:command_class) do
stub_module "SomeOrg" do
foobara_organization!
end
stub_module "SomeOrg::SomeDomain" do
foobara_domain!
end
stub_class "SomeOrg::SomeDomain::SomeCommand", Foobara::Command do
inputs do
foo :integer
bar :string
end

def execute
"success! #{foo} #{bar}"
end
end
end

it "has a version number" do
expect(Foobara::ResqueConnector::VERSION).to_not be_nil
end

describe ".connect" do
before do
command_connector.connect(command_class)
end

it "gives a working ::Async method" do
command = command_class::Async.new(foo: 1, bar: "bar")

expect {
command.run!
}.to change { Resque.size(:general) }.from(0).to(1)

job = Resque.peek(:general, 0, 1)

expect(job["class"]).to eq("Foobara::CommandConnectors::ResqueConnector::CommandJob")

args = job["args"].first
command_name = args["command_name"]
inputs = args["inputs"]

expect(command_name).to eq("SomeOrg::SomeDomain::SomeCommand")
expect(inputs).to eq("foo" => 1, "bar" => "bar")

worker = Resque::Worker.new(:general)

expect(worker.work_one_job).to be(true)
expect(Resque::Failure.count).to be(0)
expect(Resque.size(:general)).to be(0)
end
end
end
33 changes: 33 additions & 0 deletions src/command_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module Foobara
module CommandConnectors
class ResqueConnector < CommandConnector
class CommandJob
class << self
def perform(job_data)
job_data = job_data.transform_keys(&:to_sym)

allowed_keys = %i[command_name inputs connector_name]

invalid_keys = job_data.keys - allowed_keys

if invalid_keys.any?
# :nocov:
raise ArgumentError, "Invalid keys: #{invalid_keys.join(", ")}"
# :nocov:
end

command_name = job_data[:command_name]
inputs = job_data[:inputs]
connector_name = job_data[:connector_name]

connector = ResqueConnector[connector_name]

command_class = connector.transformed_command_from_name(command_name)
command = command_class.new(inputs)
command.run!
end
end
end
end
end
end
110 changes: 110 additions & 0 deletions src/resque_connector.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
module Foobara
module CommandConnectors
class ResqueConnector < CommandConnector
class NoCommandFoundError < StandardError
attr_accessor :command_class

def initialize(command_class)
# :nocov:
self.command_class = command_class

super("No command found for #{command_class}")
# :nocov:
end
end

class << self
def all
@all ||= {}
end

def new(...)
instance = super

name = instance.name

if all.key?(name)
# :nocov:
raise "#{name} already registered"
# :nocov:
end

all[name] = instance
end

def [](name)
name = name.to_sym if name

unless all.key?(name)
# :nocov:
raise "#{name} not registered"
# :nocov:
end

all[name]
end
end

attr_accessor :name

def initialize(*, name: nil, **, &)
self.name = name.to_sym if name

super(*, **, &)
end

# NOTE: inputs transformer in this context is not clear. Is it how we transform for writing the job to redis?
# Or are we transforming what comes out of redis? It seems like redis serialize/redis deserialize would make
# more sense here. It feels like these types of inputs_transformer helpers from connectors like http are not
# universally meaningful.
# It also feels like CommandClass.run_async would be a more intuitive interface.
# This makes run_async feel like an "action" like "run" and "help". So maybe "actions" should be viewed
# as methods on Org/Domain/Commands/Connector.
# However, if it made a class, like SomeCommandAsync, then it could be exposed through other connectors
# and be declared in depends_on calls and have proper possible errors for that operation.
# But on the downside, it would appear in the domain's list of commands unless coming up with a clear way
# to express that. A way could be found, though. So probably creating a command class is better.
# And in this context maybe that should be the transformed command?
# So TransformedCommand is connector specific? And some connectors might have no TransformedCommand?
def connect(command_class, *, queue: nil, **, &)
transformed_command_class = super(command_class, *, **, &)

queue ||= Resque.queue_from_class(transformed_command_class) || :general
command_name_to_queue[transformed_command_class.full_command_name] = queue

if command_class.is_a?(Class) && command_class < Command
klass = Util.make_class("#{command_class.name}::Async", RunCommandAsync)

klass.inputs transformed_command_class.inputs_type.declaration_data

klass.resque_connector = self
klass.target_command_class = transformed_command_class
end

transformed_command_class
end

def enqueue(command_name, inputs = nil)
transformed_command_class = transformed_command_from_name(command_name)

unless transformed_command_class
# :nocov:
raise NoCommandFoundError, command_name
# :nocov:
end

job = { command_name: }
job[:inputs] = inputs unless inputs.empty?
job[:connector_name] = name unless name.nil?

queue = command_name_to_queue[command_name]

Resque.enqueue_to(queue, CommandJob, job)
end

def command_name_to_queue
@command_name_to_queue ||= {}
end
end
end
end
37 changes: 37 additions & 0 deletions src/resque_connector/run_command_async.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
module Foobara
module CommandConnectors
class ResqueConnector < CommandConnector
class RunCommandAsync < Command
class << self
attr_accessor :target_command_class, :resque_connector
end

# TODO: set result type to job

def execute
enqueue_command

job
end

attr_accessor :job

def enqueue_command
self.job = resque_connector.enqueue(full_command_name, raw_inputs)
end

def full_command_name
target_command_class.full_command_name
end

def resque_connector
self.class.resque_connector
end

def target_command_class
self.class.target_command_class
end
end
end
end
end

0 comments on commit 074412b

Please sign in to comment.