Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ thousand_island-*.tar
# Ignore dialyzer caches
/priv/plts/

/tmp/

# ElixirLS
/.elixir_ls
/.elixir_ls
39 changes: 29 additions & 10 deletions lib/thousand_island/transports/ssl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ defmodule ThousandIsland.Transports.SSL do

@hardcoded_options [mode: :binary, active: false]

# Default chunk size: 8MB - balances memory usage vs syscall overhead
@sendfile_chunk_size 8 * 1024 * 1024

@impl ThousandIsland.Transport
@spec listen(:inet.port_number(), [:ssl.tls_server_option()]) ::
ThousandIsland.Transport.on_listen()
Expand Down Expand Up @@ -126,18 +129,12 @@ defmodule ThousandIsland.Transports.SSL do
length :: non_neg_integer()
) :: ThousandIsland.Transport.on_sendfile()
def sendfile(socket, filename, offset, length) do
# We can't use :file.sendfile here since it works on clear sockets, not ssl
# sockets. Build our own (much slower and not optimized for large files) version.
case :file.open(filename, [:raw]) do
# We can't use :file.sendfile here since it works on clear sockets, not ssl sockets.
# Build our own version with chunking for large files.
case :file.open(filename, [:read, :raw, :binary]) do
{:ok, fd} ->
try do
with {:ok, data} <- :file.pread(fd, offset, length),
:ok <- :ssl.send(socket, data) do
{:ok, length}
else
:eof -> {:error, :eof}
{:error, reason} -> {:error, reason}
end
sendfile_loop(socket, fd, offset, length, 0)
after
:file.close(fd)
end
Expand All @@ -147,6 +144,28 @@ defmodule ThousandIsland.Transports.SSL do
end
end

defp sendfile_loop(_socket, _fd, _offset, sent, sent) when 0 != sent do
{:ok, sent}
end

defp sendfile_loop(socket, fd, offset, length, sent) do
with read_size <- chunk_size(length, sent, @sendfile_chunk_size),
{:ok, data} <- :file.pread(fd, offset, read_size),
:ok <- :ssl.send(socket, data) do
now_sent = byte_size(data)
sendfile_loop(socket, fd, offset + now_sent, length, sent + now_sent)
else
:eof ->
{:ok, sent}

{:error, reason} ->
{:error, reason}
end
end

defp chunk_size(0, _sent, chunk_size), do: chunk_size
defp chunk_size(length, sent, chunk), do: min(length - sent, chunk)

@impl ThousandIsland.Transport
@spec getopts(socket(), ThousandIsland.Transport.socket_get_options()) ::
ThousandIsland.Transport.on_getopts()
Expand Down
2 changes: 1 addition & 1 deletion lib/thousand_island/transports/tcp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ defmodule ThousandIsland.Transports.TCP do
length :: non_neg_integer()
) :: ThousandIsland.Transport.on_sendfile()
def sendfile(socket, filename, offset, length) do
case :file.open(filename, [:raw]) do
case :file.open(filename, [:read, :raw, :binary]) do
{:ok, fd} ->
try do
:file.sendfile(fd, socket, offset, length, [])
Expand Down
83 changes: 76 additions & 7 deletions test/thousand_island/socket_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ defmodule ThousandIsland.SocketTest do

use Machete

def gen_tcp_setup(_context) do
{:ok, %{client_mod: :gen_tcp, client_opts: [active: false], server_opts: []}}
@eight_mb_chunks 8 * 1024 * 1024
@large_file_size 256 * 1024 * 1024

def gen_tcp_setup(context) do
if context[:tmp_dir], do: maybe_create_big_file(context.tmp_dir)
{:ok, %{client_mod: :gen_tcp, client_opts: [:binary, active: false], server_opts: []}}
end

def ssl_setup(_context) do
def ssl_setup(context) do
if context[:tmp_dir], do: maybe_create_big_file(context.tmp_dir)

{:ok,
%{
client_mod: :ssl,
client_opts: [
:binary,
active: false,
verify: :verify_peer,
cacertfile: Path.join(__DIR__, "../support/ca.pem")
Expand Down Expand Up @@ -50,6 +57,18 @@ defmodule ThousandIsland.SocketTest do
end
end

defmodule LargeSendfile do
use ThousandIsland.Handler

@impl ThousandIsland.Handler
def handle_connection(socket, state) do
large_file_path = Path.join(state[:tmp_dir], "large_sendfile")
ThousandIsland.Socket.sendfile(socket, large_file_path, 0, 0)
send(state[:test_pid], Process.info(self(), :monitored_by))
{:close, state}
end
end

defmodule Closer do
use ThousandIsland.Handler

Expand Down Expand Up @@ -88,7 +107,7 @@ defmodule ThousandIsland.SocketTest do
{:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts)

assert context.client_mod.send(client, "HELLO") == :ok
assert context.client_mod.recv(client, 0) == {:ok, ~c"HELLO"}
assert context.client_mod.recv(client, 0) == {:ok, "HELLO"}
end

test "it should close connections", context do
Expand All @@ -105,7 +124,7 @@ defmodule ThousandIsland.SocketTest do
{:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts)

:ok = context.client_mod.send(client, "HELLO")
{:ok, ~c"HELLO"} = context.client_mod.recv(client, 0)
{:ok, "HELLO"} = context.client_mod.recv(client, 0)
context.client_mod.close(client)

assert_receive {:telemetry, [:thousand_island, :connection, :recv], measurements,
Expand Down Expand Up @@ -151,7 +170,18 @@ defmodule ThousandIsland.SocketTest do
server_opts = Keyword.put(context.server_opts, :handler_options, test_pid: self())
{:ok, port} = start_handler(Sendfile, server_opts)
{:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts)
assert context.client_mod.recv(client, 9) == {:ok, ~c"ABCDEFBCD"}
assert context.client_mod.recv(client, 9) == {:ok, "ABCDEFBCD"}
assert_receive {:monitored_by, []}
end

@tag :tmp_dir
test "it should send large files", %{tmp_dir: tmp_dir} = context do
opts = [test_pid: self(), tmp_dir: tmp_dir]
server_opts = Keyword.put(context.server_opts, :handler_options, opts)
{:ok, port} = start_handler(LargeSendfile, server_opts)
{:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts)
total_received = receive_all_data(context.client_mod, client, @large_file_size, "")
assert byte_size(total_received) == @large_file_size
assert_receive {:monitored_by, []}
end
end
Expand Down Expand Up @@ -193,7 +223,18 @@ defmodule ThousandIsland.SocketTest do
server_opts = Keyword.put(context.server_opts, :handler_options, test_pid: self())
{:ok, port} = start_handler(Sendfile, server_opts)
{:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts)
assert context.client_mod.recv(client, 9) == {:ok, ~c"ABCDEFBCD"}
assert context.client_mod.recv(client, 9) == {:ok, "ABCDEFBCD"}
assert_receive {:monitored_by, [_pid]}
end

@tag :tmp_dir
test "it should send large files", %{tmp_dir: tmp_dir} = context do
opts = [test_pid: self(), tmp_dir: tmp_dir]
server_opts = Keyword.put(context.server_opts, :handler_options, opts)
{:ok, port} = start_handler(LargeSendfile, server_opts)
{:ok, client} = context.client_mod.connect(~c"localhost", port, context.client_opts)
total_received = receive_all_data(context.client_mod, client, @large_file_size, "")
assert byte_size(total_received) == @large_file_size
assert_receive {:monitored_by, [_pid]}
end
end
Expand All @@ -204,4 +245,32 @@ defmodule ThousandIsland.SocketTest do
{:ok, {_ip, port}} = ThousandIsland.listener_info(server_pid)
{:ok, port}
end

defp maybe_create_big_file(tmp_dir) do
path = Path.join(tmp_dir, "large_sendfile")

unless File.exists?(path) and File.stat!(path).size == @large_file_size do
# Create a large file by writing 8MB chunks to avoid memory issues
chunks_needed = div(@large_file_size, @eight_mb_chunks)
chunk_data = :binary.copy(<<0>>, @eight_mb_chunks)
{:ok, file} = File.open(path, [:write, :binary])
for _i <- 1..chunks_needed, do: IO.binwrite(file, chunk_data)
File.close(file)
end
end

defp receive_all_data(_, _, total_size, acc) when total_size <= 0, do: acc

defp receive_all_data(client_mod, client, total_size, acc) do
case client_mod.recv(client, @eight_mb_chunks) do
{:ok, data} ->
receive_all_data(client_mod, client, total_size - byte_size(data), acc <> data)

{:error, :closed} when byte_size(acc) == total_size ->
acc

{:error, reason} ->
raise "Failed to receive data: #{inspect(reason)}"
end
end
end