diff --git a/lib/absinthe/plug.ex b/lib/absinthe/plug.ex index 0461abe..4258c0d 100644 --- a/lib/absinthe/plug.ex +++ b/lib/absinthe/plug.ex @@ -305,6 +305,10 @@ defmodule Absinthe.Plug do conn |> subscribe(topic, config) + {:ok, :streaming} -> + # Already sent via SSE chunked transport + conn + {:ok, %{data: _} = result} -> conn |> encode(200, result, config) @@ -390,7 +394,7 @@ defmodule Absinthe.Plug do conn after 30_000 -> - case chunk(conn, ": keep-alive\n\n") do + case chunk(conn, ":ping\n\n") do {:ok, conn} -> subscribe_loop(conn, topic, config) @@ -542,6 +546,16 @@ defmodule Absinthe.Plug do Request.Query.add_pipeline(query, conn_info, config) case Absinthe.Pipeline.run(document, pipeline) do + {:ok, %{execution: %{incremental_delivery: true}} = bp, _} -> + conn = apply_before_send(conn, [bp], config) + + if accepts_sse?(conn) do + conn = deliver_incremental_sse(conn, bp) + {conn, {:ok, :streaming}} + else + {conn, {:ok, bp.result}} + end + {:ok, %{result: result} = bp, _} -> conn = apply_before_send(conn, [bp], config) {conn, {:ok, result}} @@ -551,6 +565,115 @@ defmodule Absinthe.Plug do end end + defp accepts_sse?(conn) do + conn + |> Plug.Conn.get_req_header("accept") + |> Enum.any?(&String.contains?(&1, "text/event-stream")) + end + + defp deliver_incremental_sse(conn, blueprint) do + alias Absinthe.Plug.Incremental.SSE.EventFormatter + + streaming_ctx = get_in(blueprint.execution.context, [:__streaming__]) + defer_info = streaming_ctx[:defer_info] || [] + full_data = blueprint.result[:data] || %{} + + # Split the resolved result: strip deferred fields from initial, collect for incremental + {initial_data, deferred_tasks} = + defer_info + |> Enum.with_index() + |> Enum.reduce({full_data, []}, fn {info, idx}, {data, tasks} -> + parent_obj = get_nested(data, info.parent_path) || %{} + + # Extract deferred fields from parent + fragment_data = + Enum.reduce(info.field_names, %{}, fn name, acc -> + case Map.fetch(parent_obj, name) do + {:ok, value} -> Map.put(acc, name, value) + :error -> acc + end + end) + + # Strip deferred fields from initial + stripped = Enum.reduce(info.field_names, parent_obj, &Map.delete(&2, &1)) + updated_data = put_nested(data, info.parent_path, stripped) + + task = %{id: to_string(idx), label: info.label, path: info.parent_path, data: fragment_data} + {updated_data, tasks ++ [task]} + end) + + has_next = not Enum.empty?(deferred_tasks) + + # Build initial payload with deferred fields stripped + initial_response = + %{data: initial_data} + |> Map.put(:hasNext, has_next) + |> then(fn r -> + if has_next do + pending = Enum.map(deferred_tasks, fn t -> + entry = %{id: t.id, path: t.path} + if t.label, do: Map.put(entry, :label, t.label), else: entry + end) + Map.put(r, :pending, pending) + else + r + end + end) + + # Setup SSE connection — clear before_send callbacks (ETag etc.) + conn = + conn + |> Plug.Conn.put_resp_header("content-type", "text/event-stream") + |> Plug.Conn.put_resp_header("cache-control", "no-cache") + |> Plug.Conn.put_resp_header("x-accel-buffering", "no") + |> Map.update(:private, %{}, &Map.put(&1, :before_send, [])) + |> Plug.Conn.send_chunked(200) + + # Send initial event + initial_event = EventFormatter.format_event("next", initial_response, 0) + + conn = + case Plug.Conn.chunk(conn, initial_event) do + {:ok, conn} -> conn + {:error, _} -> conn + end + + # Send incremental events — data already resolved, just deliver + {conn, _event_id} = + deferred_tasks + |> Enum.with_index() + |> Enum.reduce({conn, 1}, fn {task, idx}, {conn, event_id} -> + incremental_payload = %{ + incremental: [%{id: task.id, data: task.data, path: task.path}], + hasNext: idx < length(deferred_tasks) - 1 + } + + event = EventFormatter.format_event("next", incremental_payload, event_id) + + case Plug.Conn.chunk(conn, event) do + {:ok, conn} -> {conn, event_id + 1} + {:error, _} -> {conn, event_id + 1} + end + end) + + # Send complete event + complete_event = EventFormatter.format_event("complete", %{}, 999) + case Plug.Conn.chunk(conn, complete_event) do + {:ok, conn} -> conn + {:error, _} -> conn + end + end + + defp get_nested(data, []), do: data + defp get_nested(data, [key | rest]) when is_map(data), do: get_nested(Map.get(data, key), rest) + defp get_nested(_, _), do: nil + + defp put_nested(_data, [], value), do: value + defp put_nested(data, [key | rest], value) when is_map(data) do + Map.put(data, key, put_nested(Map.get(data, key, %{}), rest, value)) + end + defp put_nested(data, _, _), do: data + # # PIPELINE #