From 499da0123764f72db591fd6eb2a025e8389c9bd6 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Wed, 27 Nov 2024 16:22:38 +0100 Subject: [PATCH 1/6] Use RTP demuxers instead of the SessionBin --- lib/membrane_rtsp_plugin/source.ex | 86 +++++++++++++------ .../source/connection_manager.ex | 2 +- 2 files changed, 59 insertions(+), 29 deletions(-) diff --git a/lib/membrane_rtsp_plugin/source.ex b/lib/membrane_rtsp_plugin/source.ex index 1508a99..9fbf3fb 100644 --- a/lib/membrane_rtsp_plugin/source.ex +++ b/lib/membrane_rtsp_plugin/source.ex @@ -103,7 +103,8 @@ defmodule Membrane.RTSP.Source do rtsp_session: Membrane.RTSP.t() | nil, keep_alive_timer: reference() | nil, on_connection_closed: :raise_error | :send_eos, - end_of_stream: boolean() + end_of_stream: boolean(), + play_request_sent: boolean() } @enforce_keys [ @@ -120,7 +121,8 @@ defmodule Membrane.RTSP.Source do ssrc_to_track: %{}, rtsp_session: nil, keep_alive_timer: nil, - end_of_stream: false + end_of_stream: false, + play_request_sent: false ] end @@ -140,7 +142,7 @@ defmodule Membrane.RTSP.Source do end @impl true - def handle_child_playing(:rtp_session, _ctx, state) do + def handle_child_playing(_child, _ctx, %State{play_request_sent: false} = state) do {[], ConnectionManager.play(state)} end @@ -152,11 +154,19 @@ defmodule Membrane.RTSP.Source do @impl true def handle_child_notification( {:new_rtp_stream, ssrc, pt, _extensions}, - :rtp_session, + rtp_demuxer_name, _ctx, state ) do - case Enum.find(state.tracks, fn track -> track.rtpmap.payload_type == pt end) do + raise "jajco" + + matching_function = + case rtp_demuxer_name do + {:rtp_demuxer, control_path} -> fn track -> track.control_path == control_path end + :rtp_demuxer -> fn track -> track.rtpmap.payload_type == pt end + end + + case Enum.find(state.tracks, matching_function) do nil -> raise "No track of payload type #{inspect(pt)} has been requested with SETUP" @@ -219,9 +229,16 @@ defmodule Membrane.RTSP.Source do def handle_pad_added(Pad.ref(:output, ssrc) = pad, _ctx, state) do track = Map.fetch!(state.ssrc_to_track, ssrc) + demuxer_name = + case state.transport do + :tcp -> :rtp_demuxer + {:udp, _port_range_start, _port_range_end} -> {:rtp_demuxer, track.control_path} + end + spec = - get_child(:rtp_session) - |> via_out(Pad.ref(:output, ssrc), options: [depayloader: get_rtp_depayloader(track)]) + get_child(demuxer_name) + |> via_out(Pad.ref(:output, ssrc)) + |> depayloader(track) |> parser(track) |> bin_output(pad) @@ -254,9 +271,8 @@ defmodule Membrane.RTSP.Source do local_socket: socket, on_connection_closed: state.on_connection_closed }) - |> child(:tcp_depayloader, %RTSP.TCP.Decapsulator{rtsp_session: state.rtsp_session}) - |> via_in(Pad.ref(:rtp_input, make_ref())) - |> child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping}) + |> child(:tcp_decapsulator, %RTSP.TCP.Decapsulator{rtsp_session: state.rtsp_session}) + |> child(:rtp_demuxer, Membrane.RTP.Demuxer) {:udp, _port_range_start, _port_range_end} -> [ @@ -265,35 +281,49 @@ defmodule Membrane.RTSP.Source do {:udp, rtp_port, rtcp_port} = track.transport [ - child({:udp_source, make_ref()}, %Membrane.UDP.Source{local_port_no: rtp_port}) - |> via_in(Pad.ref(:rtp_input, make_ref())) - |> get_child(:rtp_session), - child({:udp_source, make_ref()}, %Membrane.UDP.Source{local_port_no: rtcp_port}) - |> via_in(Pad.ref(:rtp_input, make_ref())) - |> get_child(:rtp_session) + child({:udp_source, rtp_port}, %Membrane.UDP.Source{local_port_no: rtp_port}) + |> child({:rtp_demuxer, track.control_path}, Membrane.RTP.Demuxer), + child({:udp_source, rtcp_port}, %Membrane.UDP.Source{local_port_no: rtcp_port}) + |> child({:rtcp_demuxer, track.control_path}, Membrane.RTP.Demuxer) ] end) ] end end - @spec get_rtp_depayloader(ConnectionManager.track()) :: module() | nil - defp get_rtp_depayloader(%{rtpmap: %{encoding: "H264"}}), do: Membrane.RTP.H264.Depayloader - defp get_rtp_depayloader(%{rtpmap: %{encoding: "H265"}}), do: Membrane.RTP.H265.Depayloader - defp get_rtp_depayloader(%{rtpmap: %{encoding: "opus"}}), do: Membrane.RTP.Opus.Depayloader + @spec depayloader(ChildrenSpec.builder(), ConnectionManager.track()) :: ChildrenSpec.builder() + defp depayloader(builder, track) do + depayloader_definition = + case track do + %{rtpmap: %{encoding: "H264"}} -> + Membrane.RTP.H264.Depayloader + + %{rtpmap: %{encoding: "H265"}} -> + Membrane.RTP.H265.Depayloader - defp get_rtp_depayloader(%{type: :audio, rtpmap: %{encoding: "mpeg4-generic"}} = track) do - mode = - case track.fmtp do - %{mode: :AAC_hbr} -> :hbr - %{mode: :AAC_lbr} -> :lbr + %{rtpmap: %{encoding: "opus"}} -> + Membrane.RTP.Opus.Depayloader + + %{type: :audio, rtpmap: %{encoding: "mpeg4-generic"}} -> + mode = + case track.fmtp do + %{mode: :AAC_hbr} -> :hbr + %{mode: :AAC_lbr} -> :lbr + end + + %Membrane.RTP.AAC.Depayloader{mode: mode} + + %{rtpmap: %{encoding: _other}} -> + nil end - %Membrane.RTP.AAC.Depayloader{mode: mode} + if depayloader_definition != nil do + child(builder, {:depayloader, make_ref()}, depayloader_definition) + else + builder + end end - defp get_rtp_depayloader(%{rtpmap: %{encoding: _other}}), do: nil - @spec parser(ChildrenSpec.builder(), ConnectionManager.track()) :: ChildrenSpec.builder() defp parser(link_builder, %{rtpmap: %{encoding: "H264"}} = track) do sps = track.fmtp.sprop_parameter_sets && track.fmtp.sprop_parameter_sets.sps diff --git a/lib/membrane_rtsp_plugin/source/connection_manager.ex b/lib/membrane_rtsp_plugin/source/connection_manager.ex index 64c9610..13d3151 100644 --- a/lib/membrane_rtsp_plugin/source/connection_manager.ex +++ b/lib/membrane_rtsp_plugin/source/connection_manager.ex @@ -52,7 +52,7 @@ defmodule Membrane.RTSP.Source.ConnectionManager do case RTSP.play(state.rtsp_session) do {:ok, %{status: 200}} -> - %{state | keep_alive_timer: start_keep_alive_timer(state)} + %{state | keep_alive_timer: start_keep_alive_timer(state), play_request_sent: true} _error -> handle_rtsp_error(:play_rtsp_failed, state) From d8cdf1bec6c111232e7a4d50a6ccd41829bdecd1 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Wed, 22 Jan 2025 16:17:57 +0100 Subject: [PATCH 2/6] Change pad linking --- lib/membrane_rtsp_plugin/source.ex | 63 +++++-------------- test/membrane_rtsp_plugin/source_test.exs | 76 +++++------------------ 2 files changed, 30 insertions(+), 109 deletions(-) diff --git a/lib/membrane_rtsp_plugin/source.ex b/lib/membrane_rtsp_plugin/source.ex index 9fbf3fb..a1570e9 100644 --- a/lib/membrane_rtsp_plugin/source.ex +++ b/lib/membrane_rtsp_plugin/source.ex @@ -99,7 +99,6 @@ defmodule Membrane.RTSP.Source do timeout: Time.t(), keep_alive_interval: Time.t(), tracks: [ConnectionManager.track()], - ssrc_to_track: %{non_neg_integer() => ConnectionManager.track()}, rtsp_session: Membrane.RTSP.t() | nil, keep_alive_timer: reference() | nil, on_connection_closed: :raise_error | :send_eos, @@ -151,33 +150,6 @@ defmodule Membrane.RTSP.Source do {[], state} end - @impl true - def handle_child_notification( - {:new_rtp_stream, ssrc, pt, _extensions}, - rtp_demuxer_name, - _ctx, - state - ) do - raise "jajco" - - matching_function = - case rtp_demuxer_name do - {:rtp_demuxer, control_path} -> fn track -> track.control_path == control_path end - :rtp_demuxer -> fn track -> track.rtpmap.payload_type == pt end - end - - case Enum.find(state.tracks, matching_function) do - nil -> - raise "No track of payload type #{inspect(pt)} has been requested with SETUP" - - track -> - ssrc_to_track = Map.put(state.ssrc_to_track, ssrc, track) - - {[notify_parent: {:new_track, ssrc, Map.delete(track, :transport)}], - %{state | ssrc_to_track: ssrc_to_track}} - end - end - @impl true def handle_child_notification({:request_socket_control, _socket, pid}, :tcp_source, _ctx, state) do RTSP.transfer_socket_control(state.rtsp_session, pid) @@ -185,7 +157,8 @@ defmodule Membrane.RTSP.Source do end @impl true - def handle_child_notification(_notification, _element, _ctx, state) do + def handle_child_notification(notification, _element, _ctx, state) do + Membrane.Logger.warning("Ignoring child notification: #{inspect(notification)}") {[], state} end @@ -226,8 +199,8 @@ defmodule Membrane.RTSP.Source do end @impl true - def handle_pad_added(Pad.ref(:output, ssrc) = pad, _ctx, state) do - track = Map.fetch!(state.ssrc_to_track, ssrc) + def handle_pad_added(Pad.ref(:output, control_path) = pad, _ctx, state) do + track = Enum.find(state.tracks, &(&1.control_path == control_path)) demuxer_name = case state.transport do @@ -237,7 +210,7 @@ defmodule Membrane.RTSP.Source do spec = get_child(demuxer_name) - |> via_out(Pad.ref(:output, ssrc)) + |> via_out(:output, options: [stream_id: {:payload_type, track.rtpmap.payload_type}]) |> depayloader(track) |> parser(track) |> bin_output(pad) @@ -257,11 +230,6 @@ defmodule Membrane.RTSP.Source do @spec create_sources_spec(State.t()) :: Membrane.ChildrenSpec.t() defp create_sources_spec(state) do - fmt_mapping = - Map.new(state.tracks, fn %{rtpmap: rtpmap} -> - {rtpmap.payload_type, {String.to_atom(rtpmap.encoding), rtpmap.clock_rate}} - end) - case state.transport do :tcp -> {:tcp, socket} = List.first(state.tracks).transport @@ -276,17 +244,16 @@ defmodule Membrane.RTSP.Source do {:udp, _port_range_start, _port_range_end} -> [ - child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping}) - | Enum.flat_map(state.tracks, fn track -> - {:udp, rtp_port, rtcp_port} = track.transport - - [ - child({:udp_source, rtp_port}, %Membrane.UDP.Source{local_port_no: rtp_port}) - |> child({:rtp_demuxer, track.control_path}, Membrane.RTP.Demuxer), - child({:udp_source, rtcp_port}, %Membrane.UDP.Source{local_port_no: rtcp_port}) - |> child({:rtcp_demuxer, track.control_path}, Membrane.RTP.Demuxer) - ] - end) + Enum.flat_map(state.tracks, fn track -> + {:udp, rtp_port, rtcp_port} = track.transport + + [ + child({:udp_source, rtp_port}, %Membrane.UDP.Source{local_port_no: rtp_port}) + |> child({:rtp_demuxer, track.control_path}, Membrane.RTP.Demuxer), + child({:udp_source, rtcp_port}, %Membrane.UDP.Source{local_port_no: rtcp_port}) + |> child({:rtcp_demuxer, track.control_path}, Membrane.RTP.Demuxer) + ] + end) ] end end diff --git a/test/membrane_rtsp_plugin/source_test.exs b/test/membrane_rtsp_plugin/source_test.exs index ec838d4..310f32b 100644 --- a/test/membrane_rtsp_plugin/source_test.exs +++ b/test/membrane_rtsp_plugin/source_test.exs @@ -34,20 +34,22 @@ defmodule Membrane.RTSP.SourceTest do end @impl true - def handle_child_notification({:new_track, ssrc, track}, _element, _ctx, state) do - file_name = - case track.rtpmap.encoding do - "H264" -> "out.h264" - "H265" -> "out.hevc" - "plain" -> "out.txt" - end - + def handle_child_notification({:set_up_tracks, tracks}, _element, _ctx, state) do spec = - get_child(:source) - |> via_out(Pad.ref(:output, ssrc)) - |> child({:sink, ssrc}, %Membrane.File.Sink{ - location: Path.join(state.dest_folder, file_name) - }) + Enum.map(tracks, fn track -> + file_name = + case track.rtpmap.encoding do + "H264" -> "out.h264" + "H265" -> "out.hevc" + "plain" -> "out.txt" + end + + get_child(:source) + |> via_out(Pad.ref(:output, track.control_path)) + |> child({:sink, track.control_path}, %Membrane.File.Sink{ + location: Path.join(state.dest_folder, file_name) + }) + end) {[spec: spec], state} end @@ -93,24 +95,6 @@ defmodule Membrane.RTSP.SourceTest do %{type: :application, rtpmap: %{encoding: "plain"}} ] = Enum.sort_by(tracks, fn %{rtpmap: %{encoding: encoding}} -> encoding end) - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H264"}}} - ) - - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H265"}}} - ) - - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :application, rtpmap: %{encoding: "plain"}}} - ) - assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) @@ -143,18 +127,6 @@ defmodule Membrane.RTSP.SourceTest do pid = Membrane.Testing.Pipeline.start_link_supervised!(options) - assert_pipeline_notified( - pid, - :source, - {:set_up_tracks, [%{type: :application, rtpmap: %{encoding: "plain"}}]} - ) - - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :application, rtpmap: %{encoding: "plain"}}} - ) - assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) :ok = Membrane.Testing.Pipeline.terminate(pid) @@ -192,24 +164,6 @@ defmodule Membrane.RTSP.SourceTest do %{type: :application, rtpmap: %{encoding: "plain"}} ] = Enum.sort_by(tracks, fn %{rtpmap: %{encoding: encoding}} -> encoding end) - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H264"}}} - ) - - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H265"}}} - ) - - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :application, rtpmap: %{encoding: "plain"}}} - ) - assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) From ed5bf9b13805d0aba825eb976b19b5e29da7e080 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Wed, 22 Jan 2025 17:14:00 +0100 Subject: [PATCH 3/6] Update moduledoc --- lib/membrane_rtsp_plugin/source.ex | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/membrane_rtsp_plugin/source.ex b/lib/membrane_rtsp_plugin/source.ex index a1570e9..82f0007 100644 --- a/lib/membrane_rtsp_plugin/source.ex +++ b/lib/membrane_rtsp_plugin/source.ex @@ -2,7 +2,7 @@ defmodule Membrane.RTSP.Source do @moduledoc """ Source bin responsible for connecting to an RTSP server. - This element connects to an RTSP server, depayload and parses the received media if possible. + This element connects to an RTSP server, depayloads and parses the received media if possible. If there's no suitable depayloader and parser, the raw payload is sent to the subsequent elements in the pipeline. @@ -15,9 +15,8 @@ defmodule Membrane.RTSP.Source do * `Opus` When the element finishes setting up all tracks it will send a `t:set_up_tracks/0` notification. - Each time a track is parsed and available for further processing the element will send a - `t:new_track/0` notification. An output pad `Pad.ref(:output, ssrc)` should be linked to receive - the data. + To receive a track a corresponding `Pad.ref(:output, control_path)` pad has to be connected, + where each track's control path is provided in the `t:set_up_tracks/0` notification. """ use Membrane.Bin From 3a1ca8c5bbf57d4af0b2a592a3dc290f00e2ca54 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Wed, 22 Jan 2025 17:21:09 +0100 Subject: [PATCH 4/6] Bump version, update readme --- README.md | 23 ++++++++--------------- mix.exs | 2 +- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index fcb2aca..895a851 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Add the following line to your deps in `mix.exs`: ```elixir def deps do [ - {:membrane_rtsp_plugin, "~> 0.6.1"} + {:membrane_rtsp_plugin, "~> 0.7.0"} ] end ``` @@ -45,15 +45,13 @@ defmodule RtspPipeline do end @impl true - def handle_child_notification({:new_track, ssrc, _track}, _element, _ctx, state) do - spec = [ - get_child(:source) - |> via_out(Pad.ref(:output, ssrc)) - |> child(:funnel, Membrane.Funnel) - |> child(:sink, , %Membrane.File.Source{ - location: "video.h264" - }) - ] + def handle_child_notification({:set_up_tracks, tracks}, _element, _ctx, state) do + spec = + Enum.map(track, fn track -> + get_child(:source) + |> via_out(Pad.ref(:output, track.control_path)) + |> child(:sink, %Membrane.File.Source{location: "video.h264"}) + end) {[spec: spec], state} end @@ -62,10 +60,5 @@ defmodule RtspPipeline do def handle_child_notification(_message, _element, _ctx, state) do {[], state} end - - @impl true - def handle_child_pad_removed(:source, _pad, _ctx, state) do - {[], state} - end end ``` diff --git a/mix.exs b/mix.exs index 982d732..de8aed8 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.RTSP.Plugin.Mixfile do use Mix.Project - @version "0.6.1" + @version "0.7.0" @github_url "https://github.com/gBillal/membrane_rtsp_plugin" def project do From cce846ff0805d16bed61ccaa1be0b0fb4c058202 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Wed, 22 Jan 2025 18:09:18 +0100 Subject: [PATCH 5/6] Remove new_track notification --- lib/membrane_rtsp_plugin/source.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/membrane_rtsp_plugin/source.ex b/lib/membrane_rtsp_plugin/source.ex index 82f0007..2b2d0e2 100644 --- a/lib/membrane_rtsp_plugin/source.ex +++ b/lib/membrane_rtsp_plugin/source.ex @@ -28,7 +28,6 @@ defmodule Membrane.RTSP.Source do alias Membrane.{RTSP, Time} @type set_up_tracks_notification :: {:set_up_tracks, [track()]} - @type new_track_notification :: {:new_track, ssrc :: pos_integer(), track :: track()} @type track :: %{ control_path: String.t(), type: :video | :audio | :application, From 2d6f78c74fdfff0fc39831278f2b35c0c7a670bb Mon Sep 17 00:00:00 2001 From: noarkhh Date: Mon, 27 Jan 2025 15:46:23 +0100 Subject: [PATCH 6/6] Add JitterBuffer --- lib/membrane_rtsp_plugin/source.ex | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/membrane_rtsp_plugin/source.ex b/lib/membrane_rtsp_plugin/source.ex index 2b2d0e2..a18523b 100644 --- a/lib/membrane_rtsp_plugin/source.ex +++ b/lib/membrane_rtsp_plugin/source.ex @@ -209,6 +209,9 @@ defmodule Membrane.RTSP.Source do spec = get_child(demuxer_name) |> via_out(:output, options: [stream_id: {:payload_type, track.rtpmap.payload_type}]) + |> child({:jitter_buffer, make_ref()}, %Membrane.RTP.JitterBuffer{ + clock_rate: track.rtpmap.clock_rate + }) |> depayloader(track) |> parser(track) |> bin_output(pad)