diff --git a/.gitignore b/.gitignore index 8c6f556..92f05d3 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ thousand_island-*.tar # Ignore dialyzer caches /priv/plts/ +/tmp/ # ElixirLS -/.elixir_ls \ No newline at end of file +/.elixir_ls diff --git a/lib/thousand_island/transports/ssl.ex b/lib/thousand_island/transports/ssl.ex index db15bb4..35c81d4 100644 --- a/lib/thousand_island/transports/ssl.ex +++ b/lib/thousand_island/transports/ssl.ex @@ -45,6 +45,9 @@ defmodule ThousandIsland.Transports.SSL do @hardcoded_options [mode: :binary, active: false] + # Default chunk size: 8MB - balances memory usage vs syscall overhead + @sendfile_chunk_size 8 * 1024 * 1024 + @impl ThousandIsland.Transport @spec listen(:inet.port_number(), [:ssl.tls_server_option()]) :: ThousandIsland.Transport.on_listen() @@ -126,18 +129,12 @@ defmodule ThousandIsland.Transports.SSL do length :: non_neg_integer() ) :: ThousandIsland.Transport.on_sendfile() def sendfile(socket, filename, offset, length) do - # We can't use :file.sendfile here since it works on clear sockets, not ssl - # sockets. Build our own (much slower and not optimized for large files) version. - case :file.open(filename, [:raw]) do + # We can't use :file.sendfile here since it works on clear sockets, not ssl sockets. + # Build our own version with chunking for large files. + case :file.open(filename, [:read, :raw, :binary]) do {:ok, fd} -> try do - with {:ok, data} <- :file.pread(fd, offset, length), - :ok <- :ssl.send(socket, data) do - {:ok, length} - else - :eof -> {:error, :eof} - {:error, reason} -> {:error, reason} - end + sendfile_loop(socket, fd, offset, length, 0) after :file.close(fd) end @@ -147,6 +144,28 @@ defmodule ThousandIsland.Transports.SSL do end end + defp sendfile_loop(_socket, _fd, _offset, sent, sent) when 0 != sent do + {:ok, sent} + end + + defp sendfile_loop(socket, fd, offset, length, sent) do + with read_size <- chunk_size(length, sent, @sendfile_chunk_size), + {:ok, data} <- :file.pread(fd, offset, read_size), + :ok <- :ssl.send(socket, data) do + now_sent = byte_size(data) + sendfile_loop(socket, fd, offset + now_sent, length, sent + now_sent) + else + :eof -> + {:ok, sent} + + {:error, reason} -> + {:error, reason} + end + end + + defp chunk_size(0, _sent, chunk_size), do: chunk_size + defp chunk_size(length, sent, chunk), do: min(length - sent, chunk) + @impl ThousandIsland.Transport @spec getopts(socket(), ThousandIsland.Transport.socket_get_options()) :: ThousandIsland.Transport.on_getopts() diff --git a/lib/thousand_island/transports/tcp.ex b/lib/thousand_island/transports/tcp.ex index a7d7941..f0e9e2d 100644 --- a/lib/thousand_island/transports/tcp.ex +++ b/lib/thousand_island/transports/tcp.ex @@ -107,7 +107,7 @@ defmodule ThousandIsland.Transports.TCP do length :: non_neg_integer() ) :: ThousandIsland.Transport.on_sendfile() def sendfile(socket, filename, offset, length) do - case :file.open(filename, [:raw]) do + case :file.open(filename, [:read, :raw, :binary]) do {:ok, fd} -> try do :file.sendfile(fd, socket, offset, length, []) diff --git a/test/thousand_island/socket_test.exs b/test/thousand_island/socket_test.exs index 4cb2313..5427162 100644 --- a/test/thousand_island/socket_test.exs +++ b/test/thousand_island/socket_test.exs @@ -3,15 +3,22 @@ defmodule ThousandIsland.SocketTest do use Machete - def gen_tcp_setup(_context) do - {:ok, %{client_mod: :gen_tcp, client_opts: [active: false], server_opts: []}} + @eight_mb_chunks 8 * 1024 * 1024 + @large_file_size 256 * 1024 * 1024 + + def gen_tcp_setup(context) do + if context[:tmp_dir], do: maybe_create_big_file(context.tmp_dir) + {:ok, %{client_mod: :gen_tcp, client_opts: [:binary, active: false], server_opts: []}} end - def ssl_setup(_context) do + def ssl_setup(context) do + if context[:tmp_dir], do: maybe_create_big_file(context.tmp_dir) + {:ok, %{ client_mod: :ssl, client_opts: [ + :binary, active: false, verify: :verify_peer, cacertfile: Path.join(__DIR__, "../support/ca.pem") @@ -50,6 +57,18 @@ defmodule ThousandIsland.SocketTest do end end + defmodule LargeSendfile do + use ThousandIsland.Handler + + @impl ThousandIsland.Handler + def handle_connection(socket, state) do + large_file_path = Path.join(state[:tmp_dir], "large_sendfile") + ThousandIsland.Socket.sendfile(socket, large_file_path, 0, 0) + send(state[:test_pid], Process.info(self(), :monitored_by)) + {:close, state} + end + end + defmodule Closer do use ThousandIsland.Handler @@ -88,7 +107,7 @@ defmodule ThousandIsland.SocketTest do {:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts) assert context.client_mod.send(client, "HELLO") == :ok - assert context.client_mod.recv(client, 0) == {:ok, ~c"HELLO"} + assert context.client_mod.recv(client, 0) == {:ok, "HELLO"} end test "it should close connections", context do @@ -105,7 +124,7 @@ defmodule ThousandIsland.SocketTest do {:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts) :ok = context.client_mod.send(client, "HELLO") - {:ok, ~c"HELLO"} = context.client_mod.recv(client, 0) + {:ok, "HELLO"} = context.client_mod.recv(client, 0) context.client_mod.close(client) assert_receive {:telemetry, [:thousand_island, :connection, :recv], measurements, @@ -151,7 +170,18 @@ defmodule ThousandIsland.SocketTest do server_opts = Keyword.put(context.server_opts, :handler_options, test_pid: self()) {:ok, port} = start_handler(Sendfile, server_opts) {:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts) - assert context.client_mod.recv(client, 9) == {:ok, ~c"ABCDEFBCD"} + assert context.client_mod.recv(client, 9) == {:ok, "ABCDEFBCD"} + assert_receive {:monitored_by, []} + end + + @tag :tmp_dir + test "it should send large files", %{tmp_dir: tmp_dir} = context do + opts = [test_pid: self(), tmp_dir: tmp_dir] + server_opts = Keyword.put(context.server_opts, :handler_options, opts) + {:ok, port} = start_handler(LargeSendfile, server_opts) + {:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts) + total_received = receive_all_data(context.client_mod, client, @large_file_size, "") + assert byte_size(total_received) == @large_file_size assert_receive {:monitored_by, []} end end @@ -193,7 +223,18 @@ defmodule ThousandIsland.SocketTest do server_opts = Keyword.put(context.server_opts, :handler_options, test_pid: self()) {:ok, port} = start_handler(Sendfile, server_opts) {:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts) - assert context.client_mod.recv(client, 9) == {:ok, ~c"ABCDEFBCD"} + assert context.client_mod.recv(client, 9) == {:ok, "ABCDEFBCD"} + assert_receive {:monitored_by, [_pid]} + end + + @tag :tmp_dir + test "it should send large files", %{tmp_dir: tmp_dir} = context do + opts = [test_pid: self(), tmp_dir: tmp_dir] + server_opts = Keyword.put(context.server_opts, :handler_options, opts) + {:ok, port} = start_handler(LargeSendfile, server_opts) + {:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts) + total_received = receive_all_data(context.client_mod, client, @large_file_size, "") + assert byte_size(total_received) == @large_file_size assert_receive {:monitored_by, [_pid]} end end @@ -204,4 +245,32 @@ defmodule ThousandIsland.SocketTest do {:ok, {_ip, port}} = ThousandIsland.listener_info(server_pid) {:ok, port} end + + defp maybe_create_big_file(tmp_dir) do + path = Path.join(tmp_dir, "large_sendfile") + + unless File.exists?(path) and File.stat!(path).size == @large_file_size do + # Create a large file by writing 8MB chunks to avoid memory issues + chunks_needed = div(@large_file_size, @eight_mb_chunks) + chunk_data = :binary.copy(<<0>>, @eight_mb_chunks) + {:ok, file} = File.open(path, [:write, :binary]) + for _i <- 1..chunks_needed, do: IO.binwrite(file, chunk_data) + File.close(file) + end + end + + defp receive_all_data(_, _, total_size, acc) when total_size <= 0, do: acc + + defp receive_all_data(client_mod, client, total_size, acc) do + case client_mod.recv(client, @eight_mb_chunks) do + {:ok, data} -> + receive_all_data(client_mod, client, total_size - byte_size(data), acc <> data) + + {:error, :closed} when byte_size(acc) == total_size -> + acc + + {:error, reason} -> + raise "Failed to receive data: #{inspect(reason)}" + end + end end