Skip to content

Commit

Permalink
Merge branch 'packager-server'
Browse files Browse the repository at this point in the history
  • Loading branch information
philipgiuliani committed Oct 9, 2024
2 parents 62d617c + 3d37901 commit 55aa6bf
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 204 deletions.
106 changes: 21 additions & 85 deletions lib/membrane/hls/cmaf_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.HLS.CMAFSink do
)

def_options(
packager_pid: [
packager: [
spec: pid(),
description: "PID of the packager."
],
Expand All @@ -17,8 +17,7 @@ defmodule Membrane.HLS.CMAFSink do
description: "ID of the track."
],
build_stream: [
spec:
(URI.t(), Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()),
spec: (Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()),
description: "Build the stream with the given stream format"
],
target_segment_duration: [
Expand All @@ -28,7 +27,7 @@ defmodule Membrane.HLS.CMAFSink do

@impl true
def handle_init(_context, opts) do
{[], %{opts: opts, upload_tasks: %{}}}
{[], %{opts: opts}}
end

def handle_stream_format(:input, format, _ctx, state) do
Expand All @@ -38,94 +37,31 @@ defmodule Membrane.HLS.CMAFSink do
Membrane.Time.as_seconds(state.opts.target_segment_duration, :exact)
|> Ratio.ceil()

Agent.update(
state.opts.packager_pid,
fn packager ->
packager =
if Packager.has_track?(packager, track_id) do
# Packager.discontinue_track(packager, track_id)
packager
else
uri = Packager.new_variant_uri(packager, track_id)

Packager.add_track(
packager,
track_id,
codecs: Membrane.HLS.serialize_codecs(format.codecs),
stream: state.opts.build_stream.(uri, format),
segment_extension: ".m4s",
target_segment_duration: target_segment_duration
)
end

Packager.put_init_section(packager, track_id, format.header)
end,
:infinity
)
if Packager.has_track?(state.opts.packager, track_id) do
# TODO: Render this configurable
# Packager.discontinue_track(packager, track_id)
else
Packager.add_track(
state.opts.packager,
track_id,
codecs: Membrane.HLS.serialize_codecs(format.codecs),
stream: state.opts.build_stream.(format),
segment_extension: ".m4s",
target_segment_duration: target_segment_duration
)
end

{[], state}
end

def handle_buffer(:input, buffer, _ctx, state) do
{job_ref, upload_fun} =
Agent.get_and_update(
state.opts.packager_pid,
fn packager ->
{packager, {ref, upload_fun}} =
Packager.put_segment_async(
packager,
state.opts.track_id,
buffer.payload,
Membrane.Time.as_seconds(buffer.metadata.duration) |> Ratio.to_float()
)

{{ref, upload_fun}, packager}
end,
:infinity
)

task = Task.async(upload_fun)

{[], put_in(state, [:upload_tasks, task.ref], %{job_ref: job_ref, task: task})}
end

def handle_info({task_ref, :ok}, _ctx, state) do
Process.demonitor(task_ref, [:flush])

{data, state} = pop_in(state, [:upload_tasks, task_ref])

Agent.update(
state.opts.packager_pid,
fn packager ->
Packager.ack_segment(packager, state.opts.track_id, data.job_ref)
end,
:infinity
Packager.put_segment(
state.opts.packager,
state.opts.track_id,
buffer.payload,
Membrane.Time.as_seconds(buffer.metadata.duration) |> Ratio.to_float()
)

{[], state}
end

def handle_info({:DOWN, _ref, _, _, reason}, _ctx, state) do
raise "Cannot write segment of track #{state.track_id} with reason: #{inspect(reason)}."
{[], state}
end

def handle_end_of_stream(:input, _ctx, state) do
state.upload_tasks
|> Map.values()
|> Enum.map(& &1.task)
|> Task.await_many(:infinity)

Agent.update(
state.opts.packager_pid,
fn packager ->
Enum.reduce(state.upload_tasks, packager, fn {_task_ref, data}, packager ->
Packager.ack_segment(packager, state.opts.track_id, data.job_ref)
end)
end,
:infinity
)

{[], %{state | upload_tasks: %{}}}
end
end
34 changes: 12 additions & 22 deletions lib/membrane/hls/sink_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ defmodule Membrane.HLS.SinkBin do
require Membrane.Logger

def_options(
packager_pid: [
packager: [
spec: pid(),
description: """
PID of a `HLS.Packager` which must be wrapped in an Agent (for now).
PID of a `HLS.Packager`.
"""
],
target_segment_duration: [
Expand Down Expand Up @@ -52,7 +52,7 @@ defmodule Membrane.HLS.SinkBin do
],
build_stream: [
spec:
(URI.t(), Membrane.CMAF.Track.t() ->
(Membrane.CMAF.Track.t() ->
HLS.VariantStream.t() | HLS.AlternativeRendition.t()),
description: "Build either a `HLS.VariantStream` or a `HLS.AlternativeRendition`."
],
Expand Down Expand Up @@ -112,7 +112,7 @@ defmodule Membrane.HLS.SinkBin do
})
|> via_out(Pad.ref(:output), options: [tracks: [track_id]])
|> child({:sink, track_id}, %Membrane.HLS.CMAFSink{
packager_pid: state.opts.packager_pid,
packager: state.opts.packager,
track_id: track_id,
target_segment_duration: state.opts.target_segment_duration,
build_stream: pad_opts.build_stream
Expand All @@ -133,7 +133,7 @@ defmodule Membrane.HLS.SinkBin do
segment_min_duration: pad_opts.segment_duration
})
|> child({:sink, track_id}, %Membrane.HLS.CMAFSink{
packager_pid: state.opts.packager_pid,
packager: state.opts.packager,
track_id: track_id,
target_segment_duration: state.opts.target_segment_duration,
build_stream: pad_opts.build_stream
Expand All @@ -159,7 +159,7 @@ defmodule Membrane.HLS.SinkBin do
]
})
|> child({:sink, track_id}, %Membrane.HLS.WebVTTSink{
packager_pid: state.opts.packager_pid,
packager: state.opts.packager,
track_id: track_id,
target_segment_duration: state.opts.target_segment_duration,
build_stream: pad_opts.build_stream
Expand All @@ -178,7 +178,7 @@ defmodule Membrane.HLS.SinkBin do
|> put_in([:live_state], %{stop: true})
|> put_in([:ended_sinks], ended_sinks)

if state.flush, do: Agent.update(state.opts.packager_pid, &Packager.flush/1, :infinity)
if state.flush, do: Packager.flush(state.opts.packager)

{[notify_parent: :end_of_stream], state}
else
Expand All @@ -193,7 +193,7 @@ defmodule Membrane.HLS.SinkBin do
@impl true
def handle_parent_notification(:flush, ctx, state) do
if not state.flush and all_streams_ended?(ctx, state.ended_sinks) do
Agent.update(state.opts.packager_pid, &Packager.flush/1, :infinity)
Packager.flush(state.opts.packager)
{[notify_parent: :end_of_stream], %{state | flush: true}}
else
{[], %{state | flush: true}}
Expand All @@ -210,13 +210,7 @@ defmodule Membrane.HLS.SinkBin do
"Packager: syncing playlists up to #{state.live_state.next_sync_point}s"
)

Agent.update(
state.opts.packager_pid,
fn p ->
Packager.sync(p, state.live_state.next_sync_point)
end,
:infinity
)
Packager.sync(state.opts.packager, state.live_state.next_sync_point)

{[], live_schedule_next_sync(state)}
end
Expand Down Expand Up @@ -246,13 +240,9 @@ defmodule Membrane.HLS.SinkBin do
defp live_init_state(state) do
# Tells where in the playlist we should start issuing segments.
next_sync_point =
Agent.get(
state.opts.packager_pid,
&Packager.next_sync_point(
&1,
Membrane.Time.as_seconds(state.opts.target_segment_duration, :round)
),
:infinity
Packager.next_sync_point(
state.opts.packager,
Membrane.Time.as_seconds(state.opts.target_segment_duration, :round)
)

{:live, safety_delay} = state.opts.mode
Expand Down
99 changes: 19 additions & 80 deletions lib/membrane/hls/webvtt_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.HLS.WebVTTSink do
)

def_options(
packager_pid: [
packager: [
spec: pid(),
description: "PID of the packager."
],
Expand All @@ -17,8 +17,7 @@ defmodule Membrane.HLS.WebVTTSink do
description: "ID of the track."
],
build_stream: [
spec:
(URI.t(), Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()),
spec: (Membrane.CMAF.Track.t() -> HLS.VariantStream.t() | HLS.AlternativeRendition.t()),
description: "Build the stream with the given stream format"
],
target_segment_duration: [
Expand All @@ -38,90 +37,30 @@ defmodule Membrane.HLS.WebVTTSink do
Membrane.Time.as_seconds(state.opts.target_segment_duration, :exact)
|> Ratio.ceil()

Agent.update(
state.opts.packager_pid,
fn packager ->
if Packager.has_track?(packager, track_id) do
# Packager.discontinue_track(packager, track_id)
packager
else
uri = Packager.new_variant_uri(packager, track_id)

Packager.add_track(
packager,
track_id,
stream: state.opts.build_stream.(uri, format),
segment_extension: ".vtt",
target_segment_duration: target_segment_duration
)
end
end,
:infinity
)
if Packager.has_track?(state.opts.packager, track_id) do
# TODO: Render this configurable
# Packager.discontinue_track(packager, track_id)
else
Packager.add_track(
state.opts.packager,
track_id,
stream: state.opts.build_stream.(format),
segment_extension: ".vtt",
target_segment_duration: target_segment_duration
)
end

{[], state}
end

def handle_buffer(:input, buffer, _ctx, state) do
{job_ref, upload_fun} =
Agent.get_and_update(
state.opts.packager_pid,
fn packager ->
{packager, {ref, upload_fun}} =
Packager.put_segment_async(
packager,
state.opts.track_id,
buffer.payload,
Membrane.Time.as_seconds(buffer.metadata.duration) |> Ratio.to_float()
)

{{ref, upload_fun}, packager}
end,
:infinity
)

task = Task.async(upload_fun)

{[], put_in(state, [:upload_tasks, task.ref], %{job_ref: job_ref, task: task})}
end

def handle_info({task_ref, :ok}, _ctx, state) do
Process.demonitor(task_ref, [:flush])

{data, state} = pop_in(state, [:upload_tasks, task_ref])

Agent.update(
state.opts.packager_pid,
fn packager ->
Packager.ack_segment(packager, state.opts.track_id, data.job_ref)
end,
:infinity
Packager.put_segment(
state.opts.packager,
state.opts.track_id,
buffer.payload,
Membrane.Time.as_seconds(buffer.metadata.duration) |> Ratio.to_float()
)

{[], state}
end

def handle_info({:DOWN, _ref, _, _, reason}, _ctx, state) do
raise "Cannot write segment of track #{state.track_id} with reason: #{inspect(reason)}."
{[], state}
end

def handle_end_of_stream(:input, _ctx, state) do
state.upload_tasks
|> Map.values()
|> Enum.map(& &1.task)
|> Task.await_many(:infinity)

Agent.update(
state.opts.packager_pid,
fn packager ->
Enum.reduce(state.upload_tasks, packager, fn {_task_ref, data}, packager ->
Packager.ack_segment(packager, state.opts.track_id, data.job_ref)
end)
end,
:infinity
)

{[], %{state | upload_tasks: %{}}}
end
end
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"},
"coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"},
"heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"},
"kim_hls": {:git, "https://github.com/kim-company/kim_hls.git", "3afc81306caaebeda30294bd14456abfffcd954c", []},
"kim_hls": {:git, "https://github.com/kim-company/kim_hls.git", "c06f4de542da94fc2079dbefb7e13ae584ebe13c", []},
"kim_q": {:hex, :kim_q, "1.0.0", "17cfc45e9f7e65485f0f31bbf09893d6ff35cc2fbefc39aed146a3c29740584e", [:mix], [{:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7a8ee76a2c2e774c34345df3c7a234a8effeedc3f3aea845feb7c09030097278"},
"kim_subtitle": {:git, "https://github.com/kim-company/kim_subtitle.git", "8239e1bcea938167829a6b8bd2a9678c63c7bdd4", []},
"logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"},
Expand Down
Loading

0 comments on commit 55aa6bf

Please sign in to comment.