diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c055b547..e8725671 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,13 +17,13 @@ jobs: matrix: include: # Earliest-supported Elixir/Erlang pair. - - elixir: "1.12" + - elixir: "1.14" otp: "24.3" PLUG_CRYPTO_2_0: "false" # Latest-supported Elixir/Erlang pair. - elixir: "1.18" - otp: "27.2" + otp: "27.3" lint: lint PLUG_CRYPTO_2_0: "true" diff --git a/lib/plug/application.ex b/lib/plug/application.ex index de3a8c0f..46090c59 100644 --- a/lib/plug/application.ex +++ b/lib/plug/application.ex @@ -8,7 +8,7 @@ defmodule Plug.Application do Plug.Keys = :ets.new(Plug.Keys, [:named_table, :public, read_concurrency: true]) children = [ - Plug.Upload + Plug.Upload.Supervisor ] Supervisor.start_link(children, name: __MODULE__, strategy: :one_for_one) diff --git a/lib/plug/upload.ex b/lib/plug/upload.ex index b5b83037..376c9c6d 100644 --- a/lib/plug/upload.ex +++ b/lib/plug/upload.ex @@ -40,7 +40,6 @@ defmodule Plug.Upload do @dir_table __MODULE__.Dir @path_table __MODULE__.Path @max_attempts 10 - @temp_env_vars ~w(PLUG_TMPDIR TMPDIR TMP TEMP)s @doc """ Requests a random file to be created in the upload directory @@ -85,7 +84,7 @@ defmodule Plug.Upload do :ok [] -> - server = plug_server() + server = plug_server(to_pid) {:ok, tmp} = generate_tmp_dir() :ok = GenServer.call(server, {:give_away, to_pid, tmp, path}) :ets.delete_object(@path_table, {from_pid, path}) @@ -105,7 +104,7 @@ defmodule Plug.Upload do {:ok, tmp} [] -> - server = plug_server() + server = plug_server(pid) GenServer.cast(server, {:monitor, pid}) with {:ok, tmp} <- generate_tmp_dir() do @@ -188,30 +187,23 @@ defmodule Plug.Upload do end end - defp plug_server do - Process.whereis(__MODULE__) || + defp plug_server(pid) do + PartitionSupervisor.whereis_name({__MODULE__, pid}) + rescue + ArgumentError -> raise Plug.UploadError, "could not find process Plug.Upload. Have you started the :plug application?" end @doc false def start_link(_) do - GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + GenServer.start_link(__MODULE__, :ok) end ## Callbacks @impl true def init(:ok) do - Process.flag(:trap_exit, true) - tmp = Enum.find_value(@temp_env_vars, "/tmp", &System.get_env/1) |> Path.expand() - cwd = Path.join(File.cwd!(), "tmp") - # Add a tiny random component to avoid clashes between nodes - suffix = :crypto.strong_rand_bytes(3) |> Base.url_encode64() - :persistent_term.put(__MODULE__, {[tmp, cwd], suffix}) - - :ets.new(@dir_table, [:named_table, :public, :set]) - :ets.new(@path_table, [:named_table, :public, :duplicate_bag]) {:ok, %{}} end @@ -255,12 +247,6 @@ defmodule Plug.Upload do {:noreply, state} end - @impl true - def terminate(_reason, _state) do - folder = fn entry, :ok -> delete_path(entry) end - :ets.foldl(folder, :ok, @path_table) - end - defp delete_path({_pid, path}) do :file.delete(path) :ok diff --git a/lib/plug/upload/supervisor.ex b/lib/plug/upload/supervisor.ex new file mode 100644 index 00000000..a1374517 --- /dev/null +++ b/lib/plug/upload/supervisor.ex @@ -0,0 +1,34 @@ +defmodule Plug.Upload.Supervisor do + @moduledoc false + use Supervisor + + @temp_env_vars ~w(PLUG_TMPDIR TMPDIR TMP TEMP)s + @dir_table Plug.Upload.Dir + @path_table Plug.Upload.Path + @otp_vsn System.otp_release() |> String.to_integer() + @write_mode if @otp_vsn >= 25, do: :auto, else: true + @ets_opts [:public, :named_table, read_concurrency: true, write_concurrency: @write_mode] + + def start_link(args) do + Supervisor.start_link(__MODULE__, args, name: __MODULE__) + end + + @impl true + def init(_args) do + # Initialize the upload system + tmp = Enum.find_value(@temp_env_vars, "/tmp", &System.get_env/1) |> Path.expand() + cwd = Path.join(File.cwd!(), "tmp") + # Add a tiny random component to avoid clashes between nodes + suffix = :crypto.strong_rand_bytes(3) |> Base.url_encode64() + :persistent_term.put(Plug.Upload, {[tmp, cwd], suffix}) + :ets.new(@dir_table, [:set | @ets_opts]) + :ets.new(@path_table, [:duplicate_bag | @ets_opts]) + + children = [ + Plug.Upload.Terminator, + {PartitionSupervisor, child_spec: Plug.Upload, name: Plug.Upload} + ] + + Supervisor.init(children, strategy: :one_for_one) + end +end diff --git a/lib/plug/upload/terminator.ex b/lib/plug/upload/terminator.ex new file mode 100644 index 00000000..343f573c --- /dev/null +++ b/lib/plug/upload/terminator.ex @@ -0,0 +1,27 @@ +defmodule Plug.Upload.Terminator do + @moduledoc false + use GenServer + + @path_table Plug.Upload.Path + + def start_link(_) do + GenServer.start_link(__MODULE__, :ok) + end + + @impl true + def init(:ok) do + Process.flag(:trap_exit, true) + {:ok, %{}} + end + + @impl true + def terminate(_reason, _state) do + folder = fn entry, :ok -> delete_path(entry) end + :ets.foldl(folder, :ok, @path_table) + end + + defp delete_path({_pid, path}) do + :file.delete(path) + :ok + end +end diff --git a/test/plug/upload_test.exs b/test/plug/upload_test.exs index c9ae1b96..89a218b9 100644 --- a/test/plug/upload_test.exs +++ b/test/plug/upload_test.exs @@ -35,7 +35,7 @@ defmodule Plug.UploadTest do test "terminate removes all files" do {:ok, path} = Plug.Upload.random_file("sample") - :ok = Plug.Upload.terminate(:shutdown, []) + :ok = Plug.Upload.Terminator.terminate(:shutdown, []) refute File.exists?(path) end @@ -224,4 +224,54 @@ defmodule Plug.UploadTest do wait_until(fn -> not File.exists?(path1) end) end end + + test "give_away with invalid path returns error" do + result = Plug.Upload.give_away("/invalid/path", spawn(fn -> :ok end)) + assert result == {:error, :unknown_path} + end + + test "give_away when target process dies during transfer" do + {:ok, path} = Plug.Upload.random_file("target_dies") + + # Create a process that dies immediately + pid = spawn(fn -> :ok end) + + # This should still work but file will be cleaned up when dead process is detected + result = Plug.Upload.give_away(path, pid) + assert result == :ok + wait_until(fn -> not File.exists?(path) end) + end + + test "routes uploads to correct partition based on process" do + parent = self() + num_processes = 10 + + # Create uploads from different processes and verify they get different servers + tasks = + Enum.map(1..num_processes, fn i -> + Task.async(fn -> + {:ok, path} = Plug.Upload.random_file("partition_test_#{i}") + server = PartitionSupervisor.whereis_name({Plug.Upload, self()}) + send(parent, {:result, i, path, server}) + path + end) + end) + + # Collect results + results = + Enum.map(1..num_processes, fn _ -> + receive do + {:result, i, path, server} -> {i, path, server} + after + 1_000 -> flunk("didn't get result") + end + end) + + # Verify different processes got different servers (partitioning working) + servers = Enum.map(results, fn {_, _, server} -> server end) + assert length(Enum.uniq(servers)) > 1 + + # Cleanup + Enum.each(tasks, &Task.await/1) + end end