Skip to content

Commit

Permalink
Server can be accessed with ffplay
Browse files Browse the repository at this point in the history
  • Loading branch information
Noarkhh committed Sep 19, 2024
1 parent a1bcd3e commit 369c04d
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 53 deletions.
4 changes: 3 additions & 1 deletion lib/membrane/simple_rtsp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ defmodule Membrane.SimpleRTSPServer do
handler: Membrane.SimpleRTSPServer.Handler,
handler_config: %{mp4_path: mp4_path},
port: rtsp_port,
address: address
address: address,
udp_rtp_port: 0,
udp_rtcp_port: 0
)
end
end
55 changes: 40 additions & 15 deletions lib/membrane/simple_rtsp_server/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ defmodule Membrane.SimpleRTSPServer.Handler do
@video_pt 96
@video_clock_rate 90_000
@audio_pt 97
@audio_clock_rate 90_000
@audio_clock_rate 44_100
@audio_specific_config "1210"

@impl true
def init(config) do
Expand All @@ -31,12 +32,12 @@ defmodule Membrane.SimpleRTSPServer.Handler do
a=control:video
a=rtpmap:#{@video_pt} H264/#{@video_clock_rate}
a=fmtp:#{@video_pt} packetization-mode=1
m=audio 0 RTP/AVP #{@audio_pt}
a=control:audio
a=rtpmap:#{@audio_pt} mpeg4-generic/#{@audio_clock_rate}/2
a=fmtp:#{@audio_pt} streamtype=5; profile-level-id=5; mode=AAC-hbr; config=#{@audio_specific_config}; sizeLength=13; indexLength=3;
"""

# m=audio 0 RTP/AVP #{@audio_pt}
# a=control:audio
# a=rtpmap:#{@audio_pt} MP4A-LATM/#{@audio_clock_rate}

response =
Response.new(200)
|> Response.with_header("Content-Type", "application/sdp")
Expand All @@ -52,21 +53,45 @@ defmodule Membrane.SimpleRTSPServer.Handler do

@impl true
def handle_play(configured_media_context, state) do
media_context = configured_media_context |> Map.values() |> List.first()

{client_rtp_port, _client_rtcp_port} = media_context.client_port
media_config =
Map.new(configured_media_context, fn {control_path, context} ->
{key, pt, clock_rate} =
case URI.new!(control_path) do
%URI{path: "/video"} -> {:video, @video_pt, @video_clock_rate}
%URI{path: "/audio"} -> {:audio, @audio_pt, @audio_clock_rate}
end

{client_rtp_port, _client_rtcp_port} = context.client_port

config = %{
ssrc: context.ssrc,
pt: pt,
clock_rate: clock_rate,
rtp_socket: context.rtp_socket,
client_address: context.address,
client_port: client_rtp_port
}

{key, config}
end)

arg = %{
socket: state.socket,
ssrc: media_context.ssrc,
pt: @video_pt,
clock_rate: @video_clock_rate,
client_port: client_rtp_port,
client_ip: media_context.address,
server_rtp_socket: media_context.rtp_socket,
mp4_path: state.mp4_path
mp4_path: state.mp4_path,
media_config: media_config
}

# arg = %{
# socket: state.socket,
# ssrc: media_context.ssrc,
# pt: @video_pt,
# clock_rate: @video_clock_rate,
# client_port: client_rtp_port,
# client_ip: media_context.address,
# server_rtp_socket: media_context.rtp_socket,
# mp4_path: state.mp4_path
# }

{:ok, _sup_pid, pipeline_pid} =
Membrane.SimpleRTSPServer.Pipeline.start_link(arg)

Expand Down
96 changes: 64 additions & 32 deletions lib/membrane/simple_rtsp_server/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,18 @@ defmodule Membrane.SimpleRTSPServer.Pipeline do
@impl true
def handle_child_notification({:new_tracks, tracks}, :mp4_demuxer, _ctx, state) do
spec =
Enum.map(tracks, fn
{id, %Membrane.AAC{}} ->
get_child(:mp4_demuxer)
|> via_out(Pad.ref(:output, id))
|> child(Membrane.Debug.Sink)
[child(:rtp_session_bin, Membrane.RTP.SessionBin)] ++
Enum.map(tracks, fn
{id, %Membrane.AAC{}} ->
get_child(:mp4_demuxer)
|> via_out(Pad.ref(:output, id))
|> build_track(:audio, state.media_config)

{id, %Membrane.H264{}} ->
get_child(:mp4_demuxer)
|> via_out(Pad.ref(:output, id))
|> child(:parser, %Membrane.H264.Parser{
output_alignment: :nalu,
repeat_parameter_sets: true,
skip_until_keyframe: true,
output_stream_structure: :annexb
})
|> via_in(Pad.ref(:input, state.ssrc),
options: [payloader: Membrane.RTP.H264.Payloader]
)
|> child(:rtp, Membrane.RTP.SessionBin)
|> via_out(Pad.ref(:rtp_output, state.ssrc),
options: [
payload_type: state.pt,
clock_rate: state.clock_rate
]
)
|> child(:realtimer, Membrane.Realtimer)
|> child(:udp_sink, %Membrane.UDP.Sink{
destination_address: state.client_ip,
destination_port_no: state.client_port,
local_socket: state.server_rtp_socket
})
end)
{id, %Membrane.H264{}} ->
get_child(:mp4_demuxer)
|> via_out(Pad.ref(:output, id))
|> build_track(:video, state.media_config)
end)

{[spec: spec], state}
end
Expand All @@ -65,7 +45,7 @@ defmodule Membrane.SimpleRTSPServer.Pipeline do
end

@impl true
def handle_element_end_of_stream(:udp_sink, :input, _ctx, state) do
def handle_element_end_of_stream({:udp_sink, :video}, :input, _ctx, state) do
Process.sleep(50)
:gen_tcp.close(state.socket)
{[terminate: :normal], state}
Expand All @@ -75,4 +55,56 @@ defmodule Membrane.SimpleRTSPServer.Pipeline do
def handle_element_end_of_stream(_child, _pad, _ctx, state) do
{[], state}
end

defp build_track(builder, :audio, %{audio: config}) do
builder
|> child(:aac_parser, %Membrane.AAC.Parser{
out_encapsulation: :none,
output_config: :audio_specific_config
})
|> child(%Membrane.Debug.Filter{handle_stream_format: &IO.inspect(&1, label: "strfmt")})
|> via_in(Pad.ref(:input, config.ssrc),
options: [payloader: %Membrane.RTP.AAC.Payloader{frames_per_packet: 1, mode: :hbr}]
)
|> build_tail(:audio, config)
end

defp build_track(builder, :video, %{video: config}) do
builder
|> child(:h264_parser, %Membrane.H264.Parser{
output_alignment: :nalu,
repeat_parameter_sets: true,
skip_until_keyframe: true,
output_stream_structure: :annexb
})
|> via_in(Pad.ref(:input, config.ssrc),
options: [payloader: Membrane.RTP.H264.Payloader]
)
|> build_tail(:video, config)
end

defp build_track(builder, _type, _media_config) do
builder
|> child(Membrane.Debug.Sink)
end

defp build_tail(builder, type, config) do
config |> IO.inspect(label: type)

builder
|> get_child(:rtp_session_bin)
|> via_out(Pad.ref(:rtp_output, config.ssrc),
options: [
payload_type: config.pt,
clock_rate: config.clock_rate
]
)
# |> child(%Membrane.Debug.Filter{handle_buffer: &IO.inspect(&1.pts, label: type)})
|> child({:realtimer, type}, Membrane.Realtimer)
|> child({:udp_sink, type}, %Membrane.UDP.Sink{
destination_address: config.client_address,
destination_port_no: config.client_port,
local_socket: config.rtp_socket
})
end
end
8 changes: 5 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,19 @@ defmodule Membrane.SimpleRTSPServer.Mixfile do
defp deps do
[
{:membrane_core, "~> 1.0"},
{:membrane_rtsp,
github: "membraneframework/membrane_rtsp", branch: "server-ports-optional"},
# {:membrane_rtsp, "~> 0.9.0"},
{:membrane_rtsp, "~> 0.10.0"},
{:membrane_rtp_plugin, "~> 0.29.0"},
{:membrane_rtp_h264_plugin, "~> 0.19.0"},
{:membrane_rtp_aac_plugin, "~> 0.9.0"},
{:membrane_file_plugin, "~> 0.17.0"},
{:membrane_mp4_plugin, "~> 0.35.0"},
{:membrane_h26x_plugin, "~> 0.10.0"},
{:membrane_aac_plugin,
github: "membraneframework/membrane_aac_plugin", branch: "config-option"},
# {:membrane_aac_plugin, "~> 0.18.0"},
{:membrane_udp_plugin, "~> 0.14.0"},
{:membrane_realtimer_plugin, "~> 0.9.0"},
{:ex_sdp, github: "membraneframework/ex_sdp", branch: "aac-fmtp", override: true},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:dialyxir, ">= 0.0.0", only: :dev, runtime: false},
{:credo, ">= 0.0.0", only: :dev, runtime: false}
Expand Down
5 changes: 3 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"},
"ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"},
"ex_sdp": {:git, "https://github.com/membraneframework/ex_sdp.git", "b5ee75b6bf8d3c23e09942d1986f773b3e4f6c1d", [branch: "aac-fmtp"]},
"file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"},
"heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
Expand All @@ -18,6 +18,7 @@
"makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"},
"membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"},
"membrane_aac_plugin": {:git, "https://github.com/membraneframework/membrane_aac_plugin.git", "99f754a47c75a9bf66bf319ba17dd8c67aab7ffc", [branch: "config-option"]},
"membrane_cmaf_format": {:hex, :membrane_cmaf_format, "0.7.1", "9ea858faefdcb181cdfa8001be827c35c5f854e9809ad57d7062cff1f0f703fd", [:mix], [], "hexpm", "3c7b4ed2a986e27f6f336d2f19e9442cb31d93b3142fc024c019572faca54a73"},
"membrane_core": {:hex, :membrane_core, "1.1.1", "4dcff6e9f3b2ecd4f437c20e201e53957731772c0f15b3005062c41f7f58f500", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3802f3fc071505c59d48792487d9927e803d4edb4039710ffa52cdb60bb0aecc"},
"membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.2", "650e134c2345d946f930082fac8bac9f5aba785a7817d38a9a9da41ffc56fa92", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "df50c6040004cd7b901cf057bd7e99c875bbbd6ae574efc93b2c753c96f43b9d"},
Expand All @@ -33,7 +34,7 @@
"membrane_rtp_format": {:hex, :membrane_rtp_format, "0.8.0", "828924bbd27efcf85b2015ae781e824c4a9928f0a7dc132abc66817b2c6edfc4", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "bc75d2a649dfaef6df563212fbb9f9f62eebc871393692f9dae8d289bd4f94bb"},
"membrane_rtp_h264_plugin": {:hex, :membrane_rtp_h264_plugin, "0.19.2", "de3eeaf35052f9f709d469fa7630d9ecc8f5787019f7072516eae1fd881bc792", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "d298e9cd471ab3601366c48ca0fec84135966707500152bbfcf3f968700647ae"},
"membrane_rtp_plugin": {:hex, :membrane_rtp_plugin, "0.29.0", "0277310eb599b8e6de9e0b864807f23b3b245865e39a28f0cbab695d1f2c157e", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.6.0 or ~> 0.7.0", [hex: :ex_libsrtp, repo: "hexpm", optional: true]}, {:heap, "~> 2.0.2", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "1b3fd808114e06332b6a4e000238998a9188d1ef625c414ca3239aee70f0775d"},
"membrane_rtsp": {:git, "https://github.com/membraneframework/membrane_rtsp.git", "259b222eb4a2d63a3b1863e59d06bffcd94088d8", [branch: "server-ports-optional"]},
"membrane_rtsp": {:hex, :membrane_rtsp, "0.10.0", "c65169b20243e7f8c296f16ce670dfcf1a5bcae0f24cfcc75f9235b606f29e06", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.17.0 or ~> 1.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.4.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7add88599836168f6d2c1ffa280da3d30378ba512189709403ae56d49736780e"},
"membrane_telemetry_metrics": {:hex, :membrane_telemetry_metrics, "0.1.0", "cb93d28356b436b0597736c3e4153738d82d2a14ff547f831df7e9051e54fc06", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.1", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "aba28dc8311f70ced95d984509be930fac55857d2d18bffcf768815e627be3f0"},
"membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"},
"membrane_udp_plugin": {:hex, :membrane_udp_plugin, "0.14.0", "d533ee5f6fcdd0551ad690045cdb6c1a76307a155d9255cc4a4606f85774bc37", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3.0", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "902d1a7aa228ec377482d53a605b100e20e0b6e59196f94f94147bb62b23c47e"},
Expand Down

0 comments on commit 369c04d

Please sign in to comment.