-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement ResqueConnector and enqueuing/running jobs async
- Loading branch information
Showing
4 changed files
with
237 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,61 @@ | ||
RSpec.describe Foobara::ResqueConnector do | ||
RSpec.describe Foobara::CommandConnectors::ResqueConnector do | ||
after do | ||
Foobara::ResqueConnector.reset_all | ||
end | ||
|
||
let(:command_connector) { described_class.new } | ||
|
||
let(:command_class) do | ||
stub_module "SomeOrg" do | ||
foobara_organization! | ||
end | ||
stub_module "SomeOrg::SomeDomain" do | ||
foobara_domain! | ||
end | ||
stub_class "SomeOrg::SomeDomain::SomeCommand", Foobara::Command do | ||
inputs do | ||
foo :integer | ||
bar :string | ||
end | ||
|
||
def execute | ||
"success! #{foo} #{bar}" | ||
end | ||
end | ||
end | ||
|
||
it "has a version number" do | ||
expect(Foobara::ResqueConnector::VERSION).to_not be_nil | ||
end | ||
|
||
describe ".connect" do | ||
before do | ||
command_connector.connect(command_class) | ||
end | ||
|
||
it "gives a working ::Async method" do | ||
command = command_class::Async.new(foo: 1, bar: "bar") | ||
|
||
expect { | ||
command.run! | ||
}.to change { Resque.size(:general) }.from(0).to(1) | ||
|
||
job = Resque.peek(:general, 0, 1) | ||
|
||
expect(job["class"]).to eq("Foobara::CommandConnectors::ResqueConnector::CommandJob") | ||
|
||
args = job["args"].first | ||
command_name = args["command_name"] | ||
inputs = args["inputs"] | ||
|
||
expect(command_name).to eq("SomeOrg::SomeDomain::SomeCommand") | ||
expect(inputs).to eq("foo" => 1, "bar" => "bar") | ||
|
||
worker = Resque::Worker.new(:general) | ||
|
||
expect(worker.work_one_job).to be(true) | ||
expect(Resque::Failure.count).to be(0) | ||
expect(Resque.size(:general)).to be(0) | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
module Foobara | ||
module CommandConnectors | ||
class ResqueConnector < CommandConnector | ||
class CommandJob | ||
class << self | ||
def perform(job_data) | ||
job_data = job_data.transform_keys(&:to_sym) | ||
|
||
allowed_keys = %i[command_name inputs connector_name] | ||
|
||
invalid_keys = job_data.keys - allowed_keys | ||
|
||
if invalid_keys.any? | ||
# :nocov: | ||
raise ArgumentError, "Invalid keys: #{invalid_keys.join(", ")}" | ||
# :nocov: | ||
end | ||
|
||
command_name = job_data[:command_name] | ||
inputs = job_data[:inputs] | ||
connector_name = job_data[:connector_name] | ||
|
||
connector = ResqueConnector[connector_name] | ||
|
||
command_class = connector.transformed_command_from_name(command_name) | ||
command = command_class.new(inputs) | ||
command.run! | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
module Foobara | ||
module CommandConnectors | ||
class ResqueConnector < CommandConnector | ||
class NoCommandFoundError < StandardError | ||
attr_accessor :command_class | ||
|
||
def initialize(command_class) | ||
# :nocov: | ||
self.command_class = command_class | ||
|
||
super("No command found for #{command_class}") | ||
# :nocov: | ||
end | ||
end | ||
|
||
class << self | ||
def all | ||
@all ||= {} | ||
end | ||
|
||
def new(...) | ||
instance = super | ||
|
||
name = instance.name | ||
|
||
if all.key?(name) | ||
# :nocov: | ||
raise "#{name} already registered" | ||
# :nocov: | ||
end | ||
|
||
all[name] = instance | ||
end | ||
|
||
def [](name) | ||
name = name.to_sym if name | ||
|
||
unless all.key?(name) | ||
# :nocov: | ||
raise "#{name} not registered" | ||
# :nocov: | ||
end | ||
|
||
all[name] | ||
end | ||
end | ||
|
||
attr_accessor :name | ||
|
||
def initialize(*, name: nil, **, &) | ||
self.name = name.to_sym if name | ||
|
||
super(*, **, &) | ||
end | ||
|
||
# NOTE: inputs transformer in this context is not clear. Is it how we transform for writing the job to redis? | ||
# Or are we transforming what comes out of redis? It seems like redis serialize/redis deserialize would make | ||
# more sense here. It feels like these types of inputs_transformer helpers from connectors like http are not | ||
# universally meaningful. | ||
# It also feels like CommandClass.run_async would be a more intuitive interface. | ||
# This makes run_async feel like an "action" like "run" and "help". So maybe "actions" should be viewed | ||
# as methods on Org/Domain/Commands/Connector. | ||
# However, if it made a class, like SomeCommandAsync, then it could be exposed through other connectors | ||
# and be declared in depends_on calls and have proper possible errors for that operation. | ||
# But on the downside, it would appear in the domain's list of commands unless coming up with a clear way | ||
# to express that. A way could be found, though. So probably creating a command class is better. | ||
# And in this context maybe that should be the transformed command? | ||
# So TransformedCommand is connector specific? And some connectors might have no TransformedCommand? | ||
def connect(command_class, *, queue: nil, **, &) | ||
transformed_command_class = super(command_class, *, **, &) | ||
|
||
queue ||= Resque.queue_from_class(transformed_command_class) || :general | ||
command_name_to_queue[transformed_command_class.full_command_name] = queue | ||
|
||
if command_class.is_a?(Class) && command_class < Command | ||
klass = Util.make_class("#{command_class.name}::Async", RunCommandAsync) | ||
|
||
klass.inputs transformed_command_class.inputs_type.declaration_data | ||
|
||
klass.resque_connector = self | ||
klass.target_command_class = transformed_command_class | ||
end | ||
|
||
transformed_command_class | ||
end | ||
|
||
def enqueue(command_name, inputs = nil) | ||
transformed_command_class = transformed_command_from_name(command_name) | ||
|
||
unless transformed_command_class | ||
# :nocov: | ||
raise NoCommandFoundError, command_name | ||
# :nocov: | ||
end | ||
|
||
job = { command_name: } | ||
job[:inputs] = inputs unless inputs.empty? | ||
job[:connector_name] = name unless name.nil? | ||
|
||
queue = command_name_to_queue[command_name] | ||
|
||
Resque.enqueue_to(queue, CommandJob, job) | ||
end | ||
|
||
def command_name_to_queue | ||
@command_name_to_queue ||= {} | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
module Foobara | ||
module CommandConnectors | ||
class ResqueConnector < CommandConnector | ||
class RunCommandAsync < Command | ||
class << self | ||
attr_accessor :target_command_class, :resque_connector | ||
end | ||
|
||
# TODO: set result type to job | ||
|
||
def execute | ||
enqueue_command | ||
|
||
job | ||
end | ||
|
||
attr_accessor :job | ||
|
||
def enqueue_command | ||
self.job = resque_connector.enqueue(full_command_name, raw_inputs) | ||
end | ||
|
||
def full_command_name | ||
target_command_class.full_command_name | ||
end | ||
|
||
def resque_connector | ||
self.class.resque_connector | ||
end | ||
|
||
def target_command_class | ||
self.class.target_command_class | ||
end | ||
end | ||
end | ||
end | ||
end |