Skip to content

Commit

Permalink
Apply bulk actions to selected jobs beyond page
Browse files Browse the repository at this point in the history
This expands the select functionality to extend beyond the current page
and include all filtered jobs, up to a configurable limit. The limit
defaults to 1000 and may be overridden with a resolver callback.

Closes oban-bg/oban#1104
  • Loading branch information
sorentwo committed Dec 5, 2024
1 parent a175f4b commit 44c9ab9
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 22 deletions.
2 changes: 1 addition & 1 deletion lib/oban/web/components/core.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Oban.Web.Components.Core do

~H"""
<button
class={["flex items-center space-x-2 px-3 py-1.5 rounded text-sm", @class]}
class={["flex items-center space-x-2 px-3 py-1.5 rounded-md text-sm", @class]}
data-title={render_slot(@title)}
disabled={@disabled}
id={@click}
Expand Down
27 changes: 20 additions & 7 deletions lib/oban/web/pages/jobs_page.ex
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,15 @@ defmodule Oban.Web.JobsPage do
jobs = Query.all_jobs(params, conf, resolver: resolver)

selected =
jobs
|> MapSet.new(& &1.id)
|> MapSet.intersection(socket.assigns.selected)
if Enum.any?(socket.assigns.selected) do
all_job_ids = Query.all_job_ids(params, conf, resolver: resolver)

all_job_ids
|> MapSet.new()
|> MapSet.intersection(socket.assigns.selected)
else
MapSet.new()
end

assign(socket,
detailed: Query.refresh_job(conf, socket.assigns.detailed),
Expand Down Expand Up @@ -326,7 +332,14 @@ defmodule Oban.Web.JobsPage do
if Enum.any?(socket.assigns.selected) do
MapSet.new()
else
MapSet.new(socket.assigns.jobs, & &1.id)
# Always include the jobs we can see currently to compensate for slower refresh rates.
# Without this, visible jobs may not be selected and the interface looks broken.
local_set = MapSet.new(socket.assigns.jobs, & &1.id)

socket.assigns.params
|> Query.all_job_ids(socket.assigns.conf)
|> MapSet.new()
|> MapSet.union(local_set)
end

{:noreply, assign(socket, selected: selected)}
Expand Down Expand Up @@ -408,9 +421,9 @@ defmodule Oban.Web.JobsPage do

defp checked_mode(jobs, selected) do
cond do
Enum.any?(selected) and Enum.count(selected) == Enum.count(jobs) -> :all
Enum.any?(selected) -> :some
true -> :none
Enum.empty?(selected) -> :none
Enum.all?(jobs, &MapSet.member?(selected, &1.id)) -> :all
true -> :some
end
end

Expand Down
49 changes: 37 additions & 12 deletions lib/oban/web/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,7 @@ defmodule Oban.Web.Query do
# Queries

def all_jobs(params, conf, opts \\ []) do
params =
@defaults
|> Map.merge(params)
|> Map.update!(:sort_by, &maybe_atomize/1)
|> Map.update!(:sort_dir, &maybe_atomize/1)

params = params_with_defaults(params)
conditions = Enum.reduce(params, true, &filter/2)

query =
Expand All @@ -384,6 +379,29 @@ defmodule Oban.Web.Query do
Repo.all(conf, query)
end

def all_job_ids(params, conf, opts \\ []) do
params = params_with_defaults(params)
conditions = Enum.reduce(params, true, &filter/2)
limit = bulk_action_limit(params.state, opts)

query =
params.state
|> jobs_limit_query(opts)
|> select([j], j.id)
|> where(^conditions)
|> order(params.sort_by, params.state, params.sort_dir)
|> limit(^limit)

Repo.all(conf, query)
end

defp params_with_defaults(params) do
@defaults
|> Map.merge(params)
|> Map.update!(:sort_by, &maybe_atomize/1)
|> Map.update!(:sort_dir, &maybe_atomize/1)
end

defp jobs_limit_query(state, opts) do
@states
|> Map.fetch!(state)
Expand All @@ -394,13 +412,12 @@ defmodule Oban.Web.Query do
limit_query(qual, :hint_query_limit, opts)
end

defp bulk_action_limit(state, opts) do
resolver(:bulk_action_limit, opts).bulk_action_limit(state)
end

defp limit_query(value, fun, opts) do
resolver =
if function_exported?(opts[:resolver], fun, 1) do
opts[:resolver]
else
Resolver
end
resolver = resolver(fun, opts)

case apply(resolver, fun, [value]) do
:infinity ->
Expand All @@ -417,6 +434,14 @@ defmodule Oban.Web.Query do
end
end

defp resolver(fun, opts) do
if function_exported?(opts[:resolver], fun, 1) do
opts[:resolver]
else
Resolver
end
end

def refresh_job(%Config{} = conf, %Job{id: job_id} = job) do
query =
Job
Expand Down
32 changes: 30 additions & 2 deletions lib/oban/web/resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ defmodule Oban.Web.Resolver do
@impl true
def hint_query_limit(_qualifier), do: 10_000
@impl true
def bulk_action_limit(_state), do: 1_000
end
```
Expand Down Expand Up @@ -303,14 +306,14 @@ defmodule Oban.Web.Resolver do
The limit may be determined by state, e.g. `:completed` or `:cancelled`, to fine-tune query
performance for larger states. Limiting may be disabled with `:infinity`.
Without a callback impleted, the `:completed` state defaults to a conservative 100k jobs and all
Without a callback implemented, the `:completed` state defaults to a conservative 100k jobs and all
other states are `:infinite`.
## Example
Restrict the limit for all states:
def jobs_query_limit(_qualifier), do: 50_000
def jobs_query_limit(_state), do: 50_000
Use a conservative the limit for `:completed` without any limit for other states (this is the
default):
Expand Down Expand Up @@ -342,9 +345,31 @@ defmodule Oban.Web.Resolver do
"""
@callback hint_query_limit(qualifier()) :: :infinity | pos_integer()

@doc """
The maximum number of jobs that can be selected and operated on in bulk.
The limit may be determined by state, e.g. `:completed` or `:cancelled`, to fine-tune query
performance for larger states. Limiting may be disabled with `:infinity`.
Without a callback implemented all states default to 1000 jobs.
## Example
Bump the limit for all states:
def bulk_action_limit(_state), do: 5_000
Use a lower limit for `:completed` without any limit for other states:
def bulk_action_limit(:completed), do: 10_000
def bulk_action_limit(_state), do: :infinity
"""
@callback bulk_action_limit(Job.unique_state()) :: :infinity | pos_integer()

@optional_callbacks format_job_args: 1,
format_job_meta: 1,
format_recorded: 2,
bulk_action_limit: 1,
hint_query_limit: 1,
jobs_query_limit: 1,
resolve_user: 1,
Expand Down Expand Up @@ -412,4 +437,7 @@ defmodule Oban.Web.Resolver do

@doc false
def hint_query_limit(_qualifier), do: 10_000

@doc false
def bulk_action_limit(_state), do: 1_000
end
17 changes: 17 additions & 0 deletions test/oban/web/query_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,23 @@ defmodule Oban.Web.QueryTest do
end
end

describe "all_job_ids/3" do
test "returning all ids within the current filters" do
job_1 = insert_job!(%{ref: 1}, worker: MyApp.VideoA)
job_2 = insert_job!(%{ref: 2}, worker: MyApp.VideoB)
job_3 = insert_job!(%{ref: 3}, worker: MyApp.VideoB)

all_job_ids = fn params, opts ->
params
|> Map.put_new(:state, "available")
|> Query.all_job_ids(@conf, opts)
end

assert [job_1.id, job_2.id, job_3.id] == all_job_ids.(%{}, [])
assert [job_2.id, job_3.id] == all_job_ids.(%{workers: ~w(MyApp.VideoB)}, [])
end
end

defp filter_refs(params, opts \\ []) do
params =
params
Expand Down

0 comments on commit 44c9ab9

Please sign in to comment.