feat(sse): wire @defer incremental delivery into Absinthe.Plug #2

base: gigmart/defer-stream-incremental

Changes from all commits
```diff
@@ -305,6 +305,10 @@ defmodule Absinthe.Plug do
         conn
         |> subscribe(topic, config)

+      {:ok, :streaming} ->
+        # Already sent via SSE chunked transport
+        conn
+
       {:ok, %{data: _} = result} ->
         conn
         |> encode(200, result, config)
```
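The new `{:ok, :streaming}` clause is a sentinel: by the time it matches, the SSE path has already started a chunked response, so the caller must pass the conn through untouched. A minimal sketch of why (the wrapper function here is hypothetical; the `Plug.Conn` calls and exception are real):

```elixir
# Illustration: once a chunked response has started, any further
# send on the same conn raises.
def stream_then_misuse(conn) do
  conn = Plug.Conn.send_chunked(conn, 200)
  {:ok, conn} = Plug.Conn.chunk(conn, "event: next\ndata: {}\n\n")

  # Plug.Conn.send_resp(conn, 200, "{}") would now raise
  # Plug.Conn.AlreadySentError, so the SSE path instead returns
  # {:ok, :streaming} and the caller returns the conn as-is.
  conn
end
```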
```diff
@@ -390,7 +394,7 @@ defmodule Absinthe.Plug do
         conn
     after
       30_000 ->
-        case chunk(conn, ": keep-alive\n\n") do
+        case chunk(conn, ":ping\n\n") do
           {:ok, conn} ->
             subscribe_loop(conn, topic, config)
```
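Both the old `: keep-alive` payload and the new `:ping` are SSE comment lines: in the `text/event-stream` format, any line whose first character is `:` is ignored by clients, so either spelling is a protocol no-op that only keeps proxies and load balancers from closing an idle connection. A sketch of how a client-side parser treats such lines (`SSELineSketch` is a hypothetical module for illustration, not part of this PR):

```elixir
defmodule SSELineSketch do
  # Lines starting with ":" are comments and carry no event data.
  def parse(":" <> _comment), do: :ignore
  def parse("event: " <> name), do: {:event_name, name}
  def parse("data: " <> json), do: {:data, json}
  def parse(""), do: :dispatch_event
  def parse(_other), do: :ignore
end

SSELineSketch.parse(":ping")        #=> :ignore
SSELineSketch.parse(": keep-alive") #=> :ignore
```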
```diff
@@ -542,6 +546,16 @@ defmodule Absinthe.Plug do
     Request.Query.add_pipeline(query, conn_info, config)

     case Absinthe.Pipeline.run(document, pipeline) do
+      {:ok, %{execution: %{incremental_delivery: true}} = bp, _} ->
+        conn = apply_before_send(conn, [bp], config)
+
+        if accepts_sse?(conn) do
+          conn = deliver_incremental_sse(conn, bp)
+          {conn, {:ok, :streaming}}
+        else
+          {conn, {:ok, bp.result}}
+        end
+
       {:ok, %{result: result} = bp, _} ->
         conn = apply_before_send(conn, [bp], config)
         {conn, {:ok, result}}
```
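A client opts into this branch by advertising `text/event-stream` in its Accept header. A hedged sketch of exercising it in a test (the router module, query, and `@defer` placement are assumptions for illustration; the `Plug.Test` and `Plug.Conn` helpers are real):

```elixir
import Plug.Test
import Plug.Conn

conn =
  conn(:post, "/graphql", ~s({"query": "{ user { name ... @defer { bio } } }"}))
  |> put_req_header("content-type", "application/json")
  |> put_req_header("accept", "text/event-stream")
  |> MyAppWeb.Router.call(MyAppWeb.Router.init([]))

# When the pipeline reports incremental_delivery: true, the response
# arrives as chunked text/event-stream rather than a single JSON body.
```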
```diff
@@ -551,6 +565,115 @@ defmodule Absinthe.Plug do
     end
   end

+  defp accepts_sse?(conn) do
+    conn
+    |> Plug.Conn.get_req_header("accept")
+    |> Enum.any?(&String.contains?(&1, "text/event-stream"))
+  end
+
+  defp deliver_incremental_sse(conn, blueprint) do
+    alias Absinthe.Plug.Incremental.SSE.EventFormatter
+
+    streaming_ctx = get_in(blueprint.execution.context, [:__streaming__])
+    defer_info = streaming_ctx[:defer_info] || []
+    full_data = blueprint.result[:data] || %{}
+
+    # Split the resolved result: strip deferred fields from initial, collect for incremental
+    {initial_data, deferred_tasks} =
+      defer_info
+      |> Enum.with_index()
+      |> Enum.reduce({full_data, []}, fn {info, idx}, {data, tasks} ->
+        parent_obj = get_nested(data, info.parent_path) || %{}
+
+        # Extract deferred fields from parent
+        fragment_data =
+          Enum.reduce(info.field_names, %{}, fn name, acc ->
+            case Map.fetch(parent_obj, name) do
+              {:ok, value} -> Map.put(acc, name, value)
+              :error -> acc
+            end
+          end)
+
+        # Strip deferred fields from initial
+        stripped = Enum.reduce(info.field_names, parent_obj, &Map.delete(&2, &1))
+        updated_data = put_nested(data, info.parent_path, stripped)
+
+        task = %{id: to_string(idx), label: info.label, path: info.parent_path, data: fragment_data}
+        {updated_data, tasks ++ [task]}
+      end)
+
+    has_next = not Enum.empty?(deferred_tasks)
+
+    # Build initial payload with deferred fields stripped
+    initial_response =
+      %{data: initial_data}
+      |> Map.put(:hasNext, has_next)
+      |> then(fn r ->
+        if has_next do
+          pending =
+            Enum.map(deferred_tasks, fn t ->
+              entry = %{id: t.id, path: t.path}
+              if t.label, do: Map.put(entry, :label, t.label), else: entry
+            end)
+
+          Map.put(r, :pending, pending)
+        else
+          r
+        end
+      end)
+
+    # Setup SSE connection — clear before_send callbacks (ETag etc.)
+    conn =
+      conn
+      |> Plug.Conn.put_resp_header("content-type", "text/event-stream")
+      |> Plug.Conn.put_resp_header("cache-control", "no-cache")
+      |> Plug.Conn.put_resp_header("x-accel-buffering", "no")
+      |> Map.update(:private, %{}, &Map.put(&1, :before_send, []))
```
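Concretely, for a query with one deferred fragment, the function above produces payloads shaped like the following, mirroring the `pending`/`incremental`/`hasNext` keys it builds (the field names and values here are made up):

```elixir
# Initial "next" event: deferred fields stripped, pending announced.
%{
  data: %{"user" => %{"name" => "Ada"}},
  hasNext: true,
  pending: [%{id: "0", path: ["user"], label: "bioDefer"}]
}

# Follow-up "next" event: the deferred fragment data, keyed by id.
%{
  incremental: [%{id: "0", data: %{"bio" => "..."}, path: ["user"]}],
  hasNext: false
}
```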
[Review comment] Clearing before_send on wrong conn field (High Severity)

The code clears the `before_send` callbacks on the wrong `conn` field.

Reviewed by Cursor Bugbot for commit dd6f8ff.
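Whether that claim holds depends on the Plug version in use; a quick IEx check shows where `Plug.Conn.register_before_send/2` actually stores its callbacks, which is the location any clearing has to target (on the Plug versions we are aware of, that is `conn.private[:before_send]`):

```elixir
iex> conn = Plug.Test.conn(:get, "/")
iex> conn = Plug.Conn.register_before_send(conn, &Function.identity/1)
iex> conn.private[:before_send]
[&Function.identity/1]
```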
```diff
+      |> Plug.Conn.send_chunked(200)
+
+    # Send initial event
+    initial_event = EventFormatter.format_event("next", initial_response, 0)
+
+    conn =
+      case Plug.Conn.chunk(conn, initial_event) do
+        {:ok, conn} -> conn
+        {:error, _} -> conn
+      end
+
+    # Send incremental events — data already resolved, just deliver
+    {conn, _event_id} =
+      deferred_tasks
+      |> Enum.with_index()
+      |> Enum.reduce({conn, 1}, fn {task, idx}, {conn, event_id} ->
+        incremental_payload = %{
+          incremental: [%{id: task.id, data: task.data, path: task.path}],
+          hasNext: idx < length(deferred_tasks) - 1
+        }
+
+        event = EventFormatter.format_event("next", incremental_payload, event_id)
+
+        case Plug.Conn.chunk(conn, event) do
+          {:ok, conn} -> {conn, event_id + 1}
+          {:error, _} -> {conn, event_id + 1}
+        end
+      end)
+
+    # Send complete event
+    complete_event = EventFormatter.format_event("complete", %{}, 999)
+    case Plug.Conn.chunk(conn, complete_event) do
+      {:ok, conn} -> conn
+      {:error, _} -> conn
+    end
+  end
+
+  defp get_nested(data, []), do: data
+  defp get_nested(data, [key | rest]) when is_map(data), do: get_nested(Map.get(data, key), rest)
+  defp get_nested(_, _), do: nil
+
+  defp put_nested(_data, [], value), do: value
+  defp put_nested(data, [key | rest], value) when is_map(data) do
+    Map.put(data, key, put_nested(Map.get(data, key, %{}), rest, value))
+  end
+  defp put_nested(data, _, _), do: data
+
   #
   # PIPELINE
   #
```
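The two path helpers walk string-keyed result maps, with a path expressed as a list of keys. A quick illustration of their behavior inside the module (the data values are made up):

```elixir
data = %{"user" => %{"name" => "Ada", "bio" => "..."}}

get_nested(data, ["user", "bio"])
#=> "..."

put_nested(data, ["user"], %{"name" => "Ada"})
#=> %{"user" => %{"name" => "Ada"}}
```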


[Review comment] SSE triggers for wildcard Accept, breaking standard clients (High Severity)

`accepts_sse?` returns `true` for `Accept: */*`, which is the default header for most HTTP clients (curl, fetch, Postman, etc.). This means nearly all `@defer` queries through `Absinthe.Plug` will get SSE chunked responses instead of standard JSON, contradicting the PR description, which states SSE is for `Accept: text/event-stream` specifically. Standard GraphQL clients expecting JSON will receive an unexpected `text/event-stream` response.

Reviewed by Cursor Bugbot for commit dd6f8ff.
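One way to address this, sketched under the assumption that SSE should require an explicit `text/event-stream` entry in the Accept header: tokenize the header instead of substring-matching, so wildcards like `*/*` never opt in.

```elixir
defp accepts_sse?(conn) do
  conn
  |> Plug.Conn.get_req_header("accept")
  |> Enum.flat_map(&String.split(&1, ","))
  |> Enum.map(fn entry ->
    # Drop any ;q= parameters and surrounding whitespace.
    entry |> String.split(";") |> hd() |> String.trim()
  end)
  |> Enum.member?("text/event-stream")
end
```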