Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RTP demuxer, change notification API #18

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Add the following line to your deps in `mix.exs`:
```elixir
def deps do
[
{:membrane_rtsp_plugin, "~> 0.6.1"}
{:membrane_rtsp_plugin, "~> 0.7.0"}
]
end
```
Expand Down Expand Up @@ -45,15 +45,13 @@ defmodule RtspPipeline do
end

@impl true
def handle_child_notification({:new_track, ssrc, _track}, _element, _ctx, state) do
spec = [
get_child(:source)
|> via_out(Pad.ref(:output, ssrc))
|> child(:funnel, Membrane.Funnel)
|> child(:sink, , %Membrane.File.Source{
location: "video.h264"
})
]
def handle_child_notification({:set_up_tracks, tracks}, _element, _ctx, state) do
spec =
Enum.map(track, fn track ->
get_child(:source)
|> via_out(Pad.ref(:output, track.control_path))
|> child(:sink, %Membrane.File.Source{location: "video.h264"})
end)

{[spec: spec], state}
end
Expand All @@ -62,10 +60,5 @@ defmodule RtspPipeline do
def handle_child_notification(_message, _element, _ctx, state) do
{[], state}
end

@impl true
def handle_child_pad_removed(:source, _pad, _ctx, state) do
{[], state}
end
end
```
130 changes: 64 additions & 66 deletions lib/membrane_rtsp_plugin/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Membrane.RTSP.Source do
@moduledoc """
Source bin responsible for connecting to an RTSP server.

This element connects to an RTSP server, depayload and parses the received media if possible.
This element connects to an RTSP server, depayloads and parses the received media if possible.
If there's no suitable depayloader and parser, the raw payload is sent to the subsequent elements in
the pipeline.

Expand All @@ -15,9 +15,8 @@ defmodule Membrane.RTSP.Source do
* `Opus`

When the element finishes setting up all tracks it will send a `t:set_up_tracks/0` notification.
Each time a track is parsed and available for further processing the element will send a
`t:new_track/0` notification. An output pad `Pad.ref(:output, ssrc)` should be linked to receive
the data.
To receive a track a corresponding `Pad.ref(:output, control_path)` pad has to be connected,
where each track's control path is provided in the `t:set_up_tracks/0` notification.
"""

use Membrane.Bin
Expand All @@ -29,7 +28,6 @@ defmodule Membrane.RTSP.Source do
alias Membrane.{RTSP, Time}

@type set_up_tracks_notification :: {:set_up_tracks, [track()]}
@type new_track_notification :: {:new_track, ssrc :: pos_integer(), track :: track()}
@type track :: %{
control_path: String.t(),
type: :video | :audio | :application,
Expand Down Expand Up @@ -99,11 +97,11 @@ defmodule Membrane.RTSP.Source do
timeout: Time.t(),
keep_alive_interval: Time.t(),
tracks: [ConnectionManager.track()],
ssrc_to_track: %{non_neg_integer() => ConnectionManager.track()},
rtsp_session: Membrane.RTSP.t() | nil,
keep_alive_timer: reference() | nil,
on_connection_closed: :raise_error | :send_eos,
end_of_stream: boolean()
end_of_stream: boolean(),
play_request_sent: boolean()
}

@enforce_keys [
Expand All @@ -120,7 +118,8 @@ defmodule Membrane.RTSP.Source do
ssrc_to_track: %{},
rtsp_session: nil,
keep_alive_timer: nil,
end_of_stream: false
end_of_stream: false,
play_request_sent: false
]
end

Expand All @@ -140,7 +139,7 @@ defmodule Membrane.RTSP.Source do
end

@impl true
def handle_child_playing(:rtp_session, _ctx, state) do
def handle_child_playing(_child, _ctx, %State{play_request_sent: false} = state) do
{[], ConnectionManager.play(state)}
end

Expand All @@ -149,33 +148,15 @@ defmodule Membrane.RTSP.Source do
{[], state}
end

@impl true
def handle_child_notification(
{:new_rtp_stream, ssrc, pt, _extensions},
:rtp_session,
_ctx,
state
) do
case Enum.find(state.tracks, fn track -> track.rtpmap.payload_type == pt end) do
nil ->
raise "No track of payload type #{inspect(pt)} has been requested with SETUP"

track ->
ssrc_to_track = Map.put(state.ssrc_to_track, ssrc, track)

{[notify_parent: {:new_track, ssrc, Map.delete(track, :transport)}],
%{state | ssrc_to_track: ssrc_to_track}}
end
end

@impl true
def handle_child_notification({:request_socket_control, _socket, pid}, :tcp_source, _ctx, state) do
RTSP.transfer_socket_control(state.rtsp_session, pid)
{[], state}
end

@impl true
def handle_child_notification(_notification, _element, _ctx, state) do
def handle_child_notification(notification, _element, _ctx, state) do
Membrane.Logger.warning("Ignoring child notification: #{inspect(notification)}")
{[], state}
end

Expand Down Expand Up @@ -216,12 +197,22 @@ defmodule Membrane.RTSP.Source do
end

@impl true
def handle_pad_added(Pad.ref(:output, ssrc) = pad, _ctx, state) do
track = Map.fetch!(state.ssrc_to_track, ssrc)
def handle_pad_added(Pad.ref(:output, control_path) = pad, _ctx, state) do
track = Enum.find(state.tracks, &(&1.control_path == control_path))

demuxer_name =
case state.transport do
:tcp -> :rtp_demuxer
{:udp, _port_range_start, _port_range_end} -> {:rtp_demuxer, track.control_path}
end

spec =
get_child(:rtp_session)
|> via_out(Pad.ref(:output, ssrc), options: [depayloader: get_rtp_depayloader(track)])
get_child(demuxer_name)
|> via_out(:output, options: [stream_id: {:payload_type, track.rtpmap.payload_type}])
|> child({:jitter_buffer, make_ref()}, %Membrane.RTP.JitterBuffer{
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you think we should have two separate pipelines for tcp and udp ? the jitter buffer will only introduce latency for tcp.

clock_rate: track.rtpmap.clock_rate
})
|> depayloader(track)
|> parser(track)
|> bin_output(pad)

Expand All @@ -240,11 +231,6 @@ defmodule Membrane.RTSP.Source do

@spec create_sources_spec(State.t()) :: Membrane.ChildrenSpec.t()
defp create_sources_spec(state) do
fmt_mapping =
Map.new(state.tracks, fn %{rtpmap: rtpmap} ->
{rtpmap.payload_type, {String.to_atom(rtpmap.encoding), rtpmap.clock_rate}}
end)

case state.transport do
:tcp ->
{:tcp, socket} = List.first(state.tracks).transport
Expand All @@ -254,46 +240,58 @@ defmodule Membrane.RTSP.Source do
local_socket: socket,
on_connection_closed: state.on_connection_closed
})
|> child(:tcp_depayloader, %RTSP.TCP.Decapsulator{rtsp_session: state.rtsp_session})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping})
|> child(:tcp_decapsulator, %RTSP.TCP.Decapsulator{rtsp_session: state.rtsp_session})
|> child(:rtp_demuxer, Membrane.RTP.Demuxer)

{:udp, _port_range_start, _port_range_end} ->
[
child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping})
| Enum.flat_map(state.tracks, fn track ->
{:udp, rtp_port, rtcp_port} = track.transport

[
child({:udp_source, make_ref()}, %Membrane.UDP.Source{local_port_no: rtp_port})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> get_child(:rtp_session),
child({:udp_source, make_ref()}, %Membrane.UDP.Source{local_port_no: rtcp_port})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> get_child(:rtp_session)
]
end)
Enum.flat_map(state.tracks, fn track ->
{:udp, rtp_port, rtcp_port} = track.transport

[
child({:udp_source, rtp_port}, %Membrane.UDP.Source{local_port_no: rtp_port})
|> child({:rtp_demuxer, track.control_path}, Membrane.RTP.Demuxer),
child({:udp_source, rtcp_port}, %Membrane.UDP.Source{local_port_no: rtcp_port})
|> child({:rtcp_demuxer, track.control_path}, Membrane.RTP.Demuxer)
]
end)
]
end
end

@spec get_rtp_depayloader(ConnectionManager.track()) :: module() | nil
defp get_rtp_depayloader(%{rtpmap: %{encoding: "H264"}}), do: Membrane.RTP.H264.Depayloader
defp get_rtp_depayloader(%{rtpmap: %{encoding: "H265"}}), do: Membrane.RTP.H265.Depayloader
defp get_rtp_depayloader(%{rtpmap: %{encoding: "opus"}}), do: Membrane.RTP.Opus.Depayloader
@spec depayloader(ChildrenSpec.builder(), ConnectionManager.track()) :: ChildrenSpec.builder()
defp depayloader(builder, track) do
depayloader_definition =
case track do
%{rtpmap: %{encoding: "H264"}} ->
Membrane.RTP.H264.Depayloader

%{rtpmap: %{encoding: "H265"}} ->
Membrane.RTP.H265.Depayloader

%{rtpmap: %{encoding: "opus"}} ->
Membrane.RTP.Opus.Depayloader

%{type: :audio, rtpmap: %{encoding: "mpeg4-generic"}} ->
mode =
case track.fmtp do
%{mode: :AAC_hbr} -> :hbr
%{mode: :AAC_lbr} -> :lbr
end

defp get_rtp_depayloader(%{type: :audio, rtpmap: %{encoding: "mpeg4-generic"}} = track) do
mode =
case track.fmtp do
%{mode: :AAC_hbr} -> :hbr
%{mode: :AAC_lbr} -> :lbr
%Membrane.RTP.AAC.Depayloader{mode: mode}

%{rtpmap: %{encoding: _other}} ->
nil
end

%Membrane.RTP.AAC.Depayloader{mode: mode}
if depayloader_definition != nil do
child(builder, {:depayloader, make_ref()}, depayloader_definition)
else
builder
end
end

defp get_rtp_depayloader(%{rtpmap: %{encoding: _other}}), do: nil

@spec parser(ChildrenSpec.builder(), ConnectionManager.track()) :: ChildrenSpec.builder()
defp parser(link_builder, %{rtpmap: %{encoding: "H264"}} = track) do
sps = track.fmtp.sprop_parameter_sets && track.fmtp.sprop_parameter_sets.sps
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane_rtsp_plugin/source/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Membrane.RTSP.Source.ConnectionManager do

case RTSP.play(state.rtsp_session) do
{:ok, %{status: 200}} ->
%{state | keep_alive_timer: start_keep_alive_timer(state)}
%{state | keep_alive_timer: start_keep_alive_timer(state), play_request_sent: true}

_error ->
handle_rtsp_error(:play_rtsp_failed, state)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.RTSP.Plugin.Mixfile do
use Mix.Project

@version "0.6.1"
@version "0.7.0"
@github_url "https://github.com/gBillal/membrane_rtsp_plugin"

def project do
Expand Down
76 changes: 15 additions & 61 deletions test/membrane_rtsp_plugin/source_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,22 @@ defmodule Membrane.RTSP.SourceTest do
end

@impl true
def handle_child_notification({:new_track, ssrc, track}, _element, _ctx, state) do
file_name =
case track.rtpmap.encoding do
"H264" -> "out.h264"
"H265" -> "out.hevc"
"plain" -> "out.txt"
end

def handle_child_notification({:set_up_tracks, tracks}, _element, _ctx, state) do
spec =
get_child(:source)
|> via_out(Pad.ref(:output, ssrc))
|> child({:sink, ssrc}, %Membrane.File.Sink{
location: Path.join(state.dest_folder, file_name)
})
Enum.map(tracks, fn track ->
file_name =
case track.rtpmap.encoding do
"H264" -> "out.h264"
"H265" -> "out.hevc"
"plain" -> "out.txt"
end

get_child(:source)
|> via_out(Pad.ref(:output, track.control_path))
|> child({:sink, track.control_path}, %Membrane.File.Sink{
location: Path.join(state.dest_folder, file_name)
})
end)

{[spec: spec], state}
end
Expand Down Expand Up @@ -93,24 +95,6 @@ defmodule Membrane.RTSP.SourceTest do
%{type: :application, rtpmap: %{encoding: "plain"}}
] = Enum.sort_by(tracks, fn %{rtpmap: %{encoding: encoding}} -> encoding end)

assert_pipeline_notified(
pid,
:source,
{:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H264"}}}
)

assert_pipeline_notified(
pid,
:source,
{:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H265"}}}
)

assert_pipeline_notified(
pid,
:source,
{:new_track, _ssrc, %{type: :application, rtpmap: %{encoding: "plain"}}}
)

assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000)
assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000)
assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000)
Expand Down Expand Up @@ -143,18 +127,6 @@ defmodule Membrane.RTSP.SourceTest do

pid = Membrane.Testing.Pipeline.start_link_supervised!(options)

assert_pipeline_notified(
pid,
:source,
{:set_up_tracks, [%{type: :application, rtpmap: %{encoding: "plain"}}]}
)

assert_pipeline_notified(
pid,
:source,
{:new_track, _ssrc, %{type: :application, rtpmap: %{encoding: "plain"}}}
)

assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000)

:ok = Membrane.Testing.Pipeline.terminate(pid)
Expand Down Expand Up @@ -192,24 +164,6 @@ defmodule Membrane.RTSP.SourceTest do
%{type: :application, rtpmap: %{encoding: "plain"}}
] = Enum.sort_by(tracks, fn %{rtpmap: %{encoding: encoding}} -> encoding end)

assert_pipeline_notified(
pid,
:source,
{:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H264"}}}
)

assert_pipeline_notified(
pid,
:source,
{:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H265"}}}
)

assert_pipeline_notified(
pid,
:source,
{:new_track, _ssrc, %{type: :application, rtpmap: %{encoding: "plain"}}}
)

assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000)
assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000)
assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000)
Expand Down