From 65196d0cca7bc666db907200112dcee5dbda758d Mon Sep 17 00:00:00 2001 From: noarkhh Date: Thu, 23 Jan 2025 17:21:50 +0100 Subject: [PATCH] Use RTP muxer and improve termination --- lib/membrane/simple_rtsp_server/pipeline.ex | 58 ++++++++++++--------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/lib/membrane/simple_rtsp_server/pipeline.ex b/lib/membrane/simple_rtsp_server/pipeline.ex index 3b56c6a..e21852f 100644 --- a/lib/membrane/simple_rtsp_server/pipeline.ex +++ b/lib/membrane/simple_rtsp_server/pipeline.ex @@ -17,26 +17,34 @@ defmodule Membrane.SimpleRTSPServer.Pipeline do }) |> child(:mp4_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}) - {[spec: spec], opts} + state = Map.merge(opts, %{tracks_playing: nil}) + + {[spec: spec], state} end @impl true def handle_child_notification({:new_tracks, tracks}, :mp4_demuxer, _ctx, state) do - spec = - [child(:rtp_session_bin, Membrane.RTP.SessionBin)] ++ - Enum.map(tracks, fn - {id, %Membrane.AAC{}} -> + {spec, media_types} = + Enum.map(tracks, fn + {id, %Membrane.AAC{}} -> + { get_child(:mp4_demuxer) |> via_out(Pad.ref(:output, id)) - |> build_track(:audio, state.media_config) + |> build_track(:audio, state.media_config), + :audio + } - {id, %Membrane.H264{}} -> + {id, %Membrane.H264{}} -> + { get_child(:mp4_demuxer) |> via_out(Pad.ref(:output, id)) - |> build_track(:video, state.media_config) - end) + |> build_track(:video, state.media_config), + :video + } + end) + |> Enum.unzip() - {[spec: spec], state} + {[spec: spec], %{state | tracks_playing: media_types}} end @impl true @@ -45,9 +53,16 @@ defmodule Membrane.SimpleRTSPServer.Pipeline do end @impl true - def handle_element_end_of_stream({:udp_sink, :video}, :input, _ctx, state) do - :gen_tcp.close(state.socket) - {[terminate: :normal], state} + def handle_element_end_of_stream({:udp_sink, media_type}, :input, _ctx, state) do + tracks_playing = List.delete(state.tracks_playing, media_type) + + if tracks_playing == [] do + Process.sleep(50) + :gen_tcp.close(state.socket) + {[terminate: :normal], %{state | tracks_playing: tracks_playing}} + else + {[], %{state | tracks_playing: tracks_playing}} + end end @impl true @@ -61,9 +76,7 @@ defmodule Membrane.SimpleRTSPServer.Pipeline do out_encapsulation: :none, output_config: :audio_specific_config }) - |> via_in(Pad.ref(:input, config.ssrc), - options: [payloader: %Membrane.RTP.AAC.Payloader{frames_per_packet: 1, mode: :hbr}] - ) + |> child(:aac_payloader, %Membrane.RTP.AAC.Payloader{frames_per_packet: 1, mode: :hbr}) |> build_tail(:audio, config) end @@ -75,9 +88,7 @@ defmodule Membrane.SimpleRTSPServer.Pipeline do skip_until_keyframe: true, output_stream_structure: :annexb }) - |> via_in(Pad.ref(:input, config.ssrc), - options: [payloader: Membrane.RTP.H264.Payloader] - ) + |> child(:h264_payloader, Membrane.RTP.H264.Payloader) |> build_tail(:video, config) end @@ -88,13 +99,10 @@ defmodule Membrane.SimpleRTSPServer.Pipeline do defp build_tail(builder, type, config) do builder - |> get_child(:rtp_session_bin) - |> via_out(Pad.ref(:rtp_output, config.ssrc), - options: [ - payload_type: config.pt, - clock_rate: config.clock_rate - ] + |> via_in(:input, + options: [ssrc: config.ssrc, payload_type: config.pt, clock_rate: config.clock_rate] ) + |> child({:rtp_muxer, type}, Membrane.RTP.Muxer) |> child({:realtimer, type}, Membrane.Realtimer) |> child({:udp_sink, type}, %Membrane.UDP.Sink{ destination_address: config.client_address,