Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ jobs:
matrix:
include:
# Earliest-supported Elixir/Erlang pair.
- elixir: "1.12"
- elixir: "1.14"
otp: "24.3"
PLUG_CRYPTO_2_0: "false"

# Latest-supported Elixir/Erlang pair.
- elixir: "1.18"
otp: "27.2"
otp: "27.3"
lint: lint
PLUG_CRYPTO_2_0: "true"

Expand Down
2 changes: 1 addition & 1 deletion lib/plug/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Plug.Application do
Plug.Keys = :ets.new(Plug.Keys, [:named_table, :public, read_concurrency: true])

children = [
Plug.Upload
Plug.Upload.Supervisor
]

Supervisor.start_link(children, name: __MODULE__, strategy: :one_for_one)
Expand Down
28 changes: 7 additions & 21 deletions lib/plug/upload.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ defmodule Plug.Upload do
@dir_table __MODULE__.Dir
@path_table __MODULE__.Path
@max_attempts 10
@temp_env_vars ~w(PLUG_TMPDIR TMPDIR TMP TEMP)s

@doc """
Requests a random file to be created in the upload directory
Expand Down Expand Up @@ -85,7 +84,7 @@ defmodule Plug.Upload do
:ok

[] ->
server = plug_server()
server = plug_server(to_pid)
{:ok, tmp} = generate_tmp_dir()
:ok = GenServer.call(server, {:give_away, to_pid, tmp, path})
:ets.delete_object(@path_table, {from_pid, path})
Expand All @@ -105,7 +104,7 @@ defmodule Plug.Upload do
{:ok, tmp}

[] ->
server = plug_server()
server = plug_server(pid)
GenServer.cast(server, {:monitor, pid})

with {:ok, tmp} <- generate_tmp_dir() do
Expand Down Expand Up @@ -188,30 +187,23 @@ defmodule Plug.Upload do
end
end

defp plug_server do
Process.whereis(__MODULE__) ||
defp plug_server(pid) do
PartitionSupervisor.whereis_name({__MODULE__, pid})
rescue
ArgumentError ->
raise Plug.UploadError,
"could not find process Plug.Upload. Have you started the :plug application?"
end

@doc false
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
GenServer.start_link(__MODULE__, :ok)
end

## Callbacks

@impl true
def init(:ok) do
Process.flag(:trap_exit, true)
tmp = Enum.find_value(@temp_env_vars, "/tmp", &System.get_env/1) |> Path.expand()
cwd = Path.join(File.cwd!(), "tmp")
# Add a tiny random component to avoid clashes between nodes
suffix = :crypto.strong_rand_bytes(3) |> Base.url_encode64()
:persistent_term.put(__MODULE__, {[tmp, cwd], suffix})

:ets.new(@dir_table, [:named_table, :public, :set])
:ets.new(@path_table, [:named_table, :public, :duplicate_bag])
{:ok, %{}}
end

Expand Down Expand Up @@ -255,12 +247,6 @@ defmodule Plug.Upload do
{:noreply, state}
end

@impl true
def terminate(_reason, _state) do
folder = fn entry, :ok -> delete_path(entry) end
:ets.foldl(folder, :ok, @path_table)
end

defp delete_path({_pid, path}) do
:file.delete(path)
:ok
Expand Down
34 changes: 34 additions & 0 deletions lib/plug/upload/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Plug.Upload.Supervisor do
@moduledoc false
use Supervisor

@temp_env_vars ~w(PLUG_TMPDIR TMPDIR TMP TEMP)s
@dir_table Plug.Upload.Dir
@path_table Plug.Upload.Path
@otp_vsn System.otp_release() |> String.to_integer()
@write_mode if @otp_vsn >= 25, do: :auto, else: true
@ets_opts [:public, :named_table, read_concurrency: true, write_concurrency: @write_mode]

def start_link(args) do
Supervisor.start_link(__MODULE__, args, name: __MODULE__)
end

@impl true
def init(_args) do
# Initialize the upload system
tmp = Enum.find_value(@temp_env_vars, "/tmp", &System.get_env/1) |> Path.expand()
cwd = Path.join(File.cwd!(), "tmp")
# Add a tiny random component to avoid clashes between nodes
suffix = :crypto.strong_rand_bytes(3) |> Base.url_encode64()
:persistent_term.put(Plug.Upload, {[tmp, cwd], suffix})
:ets.new(@dir_table, [:set | @ets_opts])
:ets.new(@path_table, [:duplicate_bag | @ets_opts])

children = [
Plug.Upload.Terminator,
{PartitionSupervisor, child_spec: Plug.Upload, name: Plug.Upload}
]

Supervisor.init(children, strategy: :one_for_one)
end
end
27 changes: 27 additions & 0 deletions lib/plug/upload/terminator.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule Plug.Upload.Terminator do
@moduledoc false
use GenServer

@path_table Plug.Upload.Path

def start_link(_) do
GenServer.start_link(__MODULE__, :ok)
end

@impl true
def init(:ok) do
Process.flag(:trap_exit, true)
{:ok, %{}}
end

@impl true
def terminate(_reason, _state) do
folder = fn entry, :ok -> delete_path(entry) end
:ets.foldl(folder, :ok, @path_table)
end

defp delete_path({_pid, path}) do
:file.delete(path)
:ok
end
end
52 changes: 51 additions & 1 deletion test/plug/upload_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Plug.UploadTest do

test "terminate removes all files" do
{:ok, path} = Plug.Upload.random_file("sample")
:ok = Plug.Upload.terminate(:shutdown, [])
:ok = Plug.Upload.Terminator.terminate(:shutdown, [])
refute File.exists?(path)
end

Expand Down Expand Up @@ -224,4 +224,54 @@ defmodule Plug.UploadTest do
wait_until(fn -> not File.exists?(path1) end)
end
end

test "give_away with invalid path returns error" do
result = Plug.Upload.give_away("/invalid/path", spawn(fn -> :ok end))
assert result == {:error, :unknown_path}
end

test "give_away when target process dies during transfer" do
{:ok, path} = Plug.Upload.random_file("target_dies")

# Create a process that dies immediately
pid = spawn(fn -> :ok end)

# This should still work but file will be cleaned up when dead process is detected
result = Plug.Upload.give_away(path, pid)
assert result == :ok
wait_until(fn -> not File.exists?(path) end)
end

test "routes uploads to correct partition based on process" do
parent = self()
num_processes = 10

# Create uploads from different processes and verify they get different servers
tasks =
Enum.map(1..num_processes, fn i ->
Task.async(fn ->
{:ok, path} = Plug.Upload.random_file("partition_test_#{i}")
server = PartitionSupervisor.whereis_name({Plug.Upload, self()})
send(parent, {:result, i, path, server})
path
end)
end)

# Collect results
results =
Enum.map(1..num_processes, fn _ ->
receive do
{:result, i, path, server} -> {i, path, server}
after
1_000 -> flunk("didn't get result")
end
end)

# Verify different processes got different servers (partitioning working)
servers = Enum.map(results, fn {_, _, server} -> server end)
assert length(Enum.uniq(servers)) > 1

# Cleanup
Enum.each(tasks, &Task.await/1)
end
end