diff --git a/README.md b/README.md index 158e2dd..96671b5 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,6 @@ [![Hex.pm](https://img.shields.io/hexpm/v/membrane_rtp_h265_plugin.svg)](https://hex.pm/packages/membrane_rtp_h265_plugin) [![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_rtp_h265_plugin/) -[![CircleCI](https://circleci.com/gh/membraneframework/membrane_rtp_h265_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_rtp_h265_plugin) RTP payloader and depayloader for H265. diff --git a/lib/depayloader.ex b/lib/depayloader.ex index 67a1712..fd78b0c 100644 --- a/lib/depayloader.ex +++ b/lib/depayloader.ex @@ -4,7 +4,7 @@ defmodule Membrane.RTP.H265.Depayloader do Based on [RFC 7798](https://tools.ietf.org/html/rfc7798). - Supported types: Single NALU, Fragementation Unit, Aggegration Packets. + Supported types: Single NALU, Fragmentation Unit, Aggegration Packets. """ use Membrane.Filter @@ -24,14 +24,28 @@ defmodule Membrane.RTP.H265.Depayloader do accepted_format: %H265.RemoteStream{alignment: :nalu}, demand_mode: :auto + def_options sprop_max_don_diff: [ + spec: 0..32_767, + default: 0, + description: """ + Specify the maximum absolute difference between the decoding order number (i.e. AbsDon) + values of any two NAL units naluA and naluB, where naluA follows naluB in decoding order + and precedes naluB in transmission order. + + If this value is greater than 0, then two additional fields `DONL` and `DOND` will + be included in the RTP payload. A `decoding_order_number` field will be added to the + buffer metadata. + """ + ] + defmodule State do @moduledoc false - defstruct parser_acc: nil + defstruct parser_acc: nil, sprop_max_don_diff: 0 end @impl true - def handle_init(_ctx, _opts) do - {[], %State{}} + def handle_init(_ctx, opts) do + {[], %State{sprop_max_don_diff: opts.sprop_max_don_diff}} end @impl true @@ -67,7 +81,15 @@ defmodule Membrane.RTP.H265.Depayloader do def handle_event(pad, event, context, state), do: super(pad, event, context, state) defp handle_unit_type(:single_nalu, _nalu, buffer, state) do - result = buffer_output(buffer.payload, buffer, state) + {don, buffer} = + if state.sprop_max_don_diff > 0 do + <> = buffer.payload + {don, %Buffer{buffer | payload: payload}} + else + {nil, buffer} + end + + result = buffer_output(buffer.payload, buffer, don, state) {:ok, result} end @@ -75,11 +97,11 @@ defmodule Membrane.RTP.H265.Depayloader do %Buffer{metadata: %{rtp: %{sequence_number: seq_num}}} = buffer case FU.parse(data, seq_num, map_state_to_fu(state)) do - {:ok, {data, type}} -> + {:ok, {data, type, don}} -> data = NAL.Header.add_header(data, 0, type, header.nuh_layer_id, header.nuh_temporal_id_plus1) - result = buffer_output(data, buffer, %State{state | parser_acc: nil}) + result = buffer_output(data, buffer, don, %State{state | parser_acc: nil}) {:ok, result} {:incomplete, fu} -> @@ -92,25 +114,35 @@ defmodule Membrane.RTP.H265.Depayloader do end defp handle_unit_type(:ap, {_header, data}, buffer, state) do - with {:ok, result} <- AP.parse(data) do - buffers = Enum.map(result, &%Buffer{buffer | payload: add_prefix(&1)}) + with {:ok, nalus} <- AP.parse(data, state.sprop_max_don_diff > 0) do + buffers = + Enum.map(nalus, fn {nalu, don} -> + metadata = put_if(not is_nil(don), buffer.metadata, :decoding_order_number, don) + %Buffer{buffer | payload: add_prefix(nalu), metadata: metadata} + end) + result = {[buffer: {:output, buffers}], state} {:ok, result} end end - defp buffer_output(data, buffer, state) do - {action_from_data(data, buffer), state} + defp buffer_output(data, buffer, don, state) do + {action_from_data(data, buffer, don), state} end - defp action_from_data(data, buffer) do + defp action_from_data(data, buffer, nil) do [buffer: {:output, %Buffer{buffer | payload: add_prefix(data)}}] end + defp action_from_data(data, buffer, don) do + metadata = Map.put(buffer.metadata, :decoding_order_number, don) + [buffer: {:output, %Buffer{buffer | payload: add_prefix(data), metadata: metadata}}] + end + defp add_prefix(data), do: @frame_prefix <> data defp map_state_to_fu(%State{parser_acc: %FU{} = fu}), do: fu - defp map_state_to_fu(_state), do: %FU{} + defp map_state_to_fu(state), do: %FU{donl?: state.sprop_max_don_diff > 0} defp log_malformed_buffer(packet, reason) do Membrane.Logger.warn(""" @@ -119,4 +151,7 @@ defmodule Membrane.RTP.H265.Depayloader do Packet: #{inspect(packet, limit: :infinity)} """) end + + defp put_if(true, map, key, value), do: Map.put(map, key, value) + defp put_if(false, map, _key, _value), do: map end diff --git a/lib/nal_formats/ap.ex b/lib/nal_formats/ap.ex index de8311b..26a3985 100644 --- a/lib/nal_formats/ap.ex +++ b/lib/nal_formats/ap.ex @@ -26,24 +26,71 @@ defmodule Membrane.RTP.H265.AP do | :...OPTIONAL RTP padding | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ``` + + A packet width DONL + ``` + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | RTP Header | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | PayloadHdr (Type=48) | NALU 1 DONL | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NALU 1 Size | NALU 1 HDR | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | + | NALU 1 Data . . . | + | | + + . . . +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+| + | NALU 2 DOND | NALU 2 Size | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NALU 2 HDR | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ NALU 2 Data | + | | + | . . . +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | :...OPTIONAL RTP padding | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + ``` """ use Bunch alias Membrane.RTP.H265.NAL - @spec parse(binary()) :: {:ok, [binary()]} | {:error, :packet_malformed} - def parse(data) do - do_parse(data, []) + @type don :: nil | non_neg_integer() + + @spec parse(binary(), boolean()) :: {:ok, [{binary(), don()}]} | {:error, :packet_malformed} + def parse(data, donl? \\ false) do + if donl?, + do: do_parse(data, 0, []), + else: do_parse(data, []) end + # Parse packet without DONL defp do_parse(<<>>, acc), do: {:ok, Enum.reverse(acc)} - defp do_parse(<>, acc) do - do_parse(rest, [nalu | acc]) - end + defp do_parse(<>, acc), + do: do_parse(rest, [{nalu, nil} | acc]) defp do_parse(_data, _acc), do: {:error, :packet_malformed} + # Parse packets with DONL + defp do_parse(<<>>, _last_don, acc), do: {:ok, Enum.reverse(acc)} + + defp do_parse( + <>, + _last_don, + [] = acc + ) do + do_parse(rest, donl, [{nalu, donl} | acc]) + end + + defp do_parse(<>, last_don, acc) do + don = rem(last_don + dond + 1, 65_536) + do_parse(rest, don, [{nalu, don} | acc]) + end + + defp do_parse(_data, _last_don, _acc), do: {:error, :packet_malformed} + @spec aggregation_unit_size(binary()) :: pos_integer() def aggregation_unit_size(nalu), do: byte_size(nalu) + 2 diff --git a/lib/nal_formats/fu.ex b/lib/nal_formats/fu.ex index 1e27a5c..fecd0bc 100644 --- a/lib/nal_formats/fu.ex +++ b/lib/nal_formats/fu.ex @@ -6,12 +6,16 @@ defmodule Membrane.RTP.H265.FU do alias __MODULE__ alias Membrane.RTP.H265.NAL - defstruct [:last_seq_num, data: [], type: nil] + defstruct [:last_seq_num, data: [], type: nil, donl?: false, don: nil] + + @type don :: nil | non_neg_integer() @type t :: %__MODULE__{ data: [binary()], last_seq_num: nil | non_neg_integer(), - type: NAL.Header.type() + type: NAL.Header.type(), + donl?: boolean(), + don: don() } defguardp is_next(last_seq_num, next_seq_num) when rem(last_seq_num + 1, 65_536) == next_seq_num @@ -21,11 +25,12 @@ defmodule Membrane.RTP.H265.FU do If a packet that is being parsed is not considered last then a `{:incomplete, t()}` tuple will be returned. - In case of last packet `{:ok, {type, data}}` tuple will be returned, where data - is `NAL Unit` created by concatenating subsequent Fragmentation Units. + In case of last packet `{:ok, {type, data, don}}` tuple will be returned, where data + is `NAL Unit` created by concatenating subsequent Fragmentation Units and `don` is the + decoding order number of the `NAL unit` in case `donl` field is present in the packet. """ @spec parse(binary(), non_neg_integer(), t) :: - {:ok, {binary(), NAL.Header.type()}} + {:ok, {binary(), NAL.Header.type(), don()}} | {:error, :packet_malformed | :invalid_first_packet} | {:incomplete, t()} def parse(data, seq_num, acc) do @@ -79,16 +84,21 @@ defmodule Membrane.RTP.H265.FU do defp do_parse(header, data, seq_num, acc) - defp do_parse(%FU.Header{start_bit: true, type: type}, data, seq_num, acc), + defp do_parse(%FU.Header{start_bit: true, type: type}, data, seq_num, %{donl?: false} = acc), do: {:incomplete, %__MODULE__{acc | data: [data], last_seq_num: seq_num, type: type}} + defp do_parse(%FU.Header{start_bit: true, type: type}, <>, seq_num, acc) do + {:incomplete, %__MODULE__{acc | data: [data], last_seq_num: seq_num, type: type, don: don}} + end + defp do_parse(%FU.Header{start_bit: false}, _data, _seq_num, %__MODULE__{last_seq_num: nil}), do: {:error, :invalid_first_packet} defp do_parse(%FU.Header{end_bit: true}, data, seq_num, %__MODULE__{ data: acc, last_seq_num: last, - type: type + type: type, + don: don }) when is_next(last, seq_num) do result = @@ -96,7 +106,7 @@ defmodule Membrane.RTP.H265.FU do |> Enum.reverse() |> Enum.join() - {:ok, {result, type}} + {:ok, {result, type, don}} end defp do_parse(_header, data, seq_num, %__MODULE__{data: acc, last_seq_num: last} = fu) diff --git a/lib/payloader.ex b/lib/payloader.ex index 5fed199..8156bd1 100644 --- a/lib/payloader.ex +++ b/lib/payloader.ex @@ -28,8 +28,8 @@ defmodule Membrane.RTP.H265.Payloader do description: """ In `:single_nalu` mode, payloader puts exactly one NAL unit into each payload, altering only RTP metadata. `:non_interleaved` - mode handles also FU-A and STAP-A packetization. See - [RFC 6184](https://tools.ietf.org/html/rfc7798) for details. + mode handles also FU and AP packetization. See + [RFC 7798](https://tools.ietf.org/html/rfc7798) for details. """ ] diff --git a/test/depayloader_pipeline_test.exs b/test/depayloader_pipeline_test.exs index 86a015c..c3f9491 100644 --- a/test/depayloader_pipeline_test.exs +++ b/test/depayloader_pipeline_test.exs @@ -1,4 +1,6 @@ defmodule Membrane.RTP.H265.DepayloaderPipelineTest do + @moduledoc false + use ExUnit.Case import Membrane.Testing.Assertions @@ -9,7 +11,7 @@ defmodule Membrane.RTP.H265.DepayloaderPipelineTest do alias Membrane.Testing.Source describe "Depayloader in a pipeline" do - test "does not crash when parsing staps" do + test "does not crash when parsing AP" do pid = APFactory.sample_data() |> Enum.chunk_every(2) diff --git a/test/depayloader_test.exs b/test/depayloader_test.exs index 0cdd7f6..e5e57e8 100644 --- a/test/depayloader_test.exs +++ b/test/depayloader_test.exs @@ -9,9 +9,10 @@ defmodule Membrane.RTP.H265.DepayloaderTest do alias Membrane.Support.Formatters.{APFactory, FUFactory, RBSPNaluFactory} @empty_state %Depayloader.State{} + @don_state %Depayloader.State{sprop_max_don_diff: 1} describe "Depayloader when processing data" do - test "passes through packets with type 0..9 and 16..21 (RBSP types)" do + test "passes through packets with type 0..47 (RBSP types)" do data = RBSPNaluFactory.sample_nalu() buffer = %Buffer{payload: data} @@ -43,6 +44,30 @@ defmodule Membrane.RTP.H265.DepayloaderTest do assert data == <<1::32, FUFactory.glued_fixtures()::binary>> end + test "parses FU packets with donl" do + assert {actions, @don_state} = + FUFactory.get_all_fixtures() + |> then(&[FUFactory.add_donl_field(hd(&1), 1_000) | tl(&1)]) + |> Enum.map(&FUFactory.precede_with_fu_nal_header/1) + ~> (enum -> Enum.zip(enum, 1..Enum.count(enum))) + |> Enum.map(fn {elem, seq_num} -> + %Buffer{payload: elem, metadata: %{rtp: %{sequence_number: seq_num}}} + end) + |> Enum.reduce(@don_state, fn buffer, prev_state -> + Depayloader.handle_process(:input, buffer, nil, prev_state) + ~> ( + {[], %Depayloader.State{} = state} -> state + {actions, state} -> {actions, state} + ) + end) + + assert {:output, %Buffer{payload: data, metadata: metadata}} = + Keyword.fetch!(actions, :buffer) + + assert data == <<1::32, FUFactory.glued_fixtures()::binary>> + assert metadata.decoding_order_number == 1_000 + end + test "parses AP packets" do data = APFactory.sample_data() @@ -59,6 +84,26 @@ defmodule Membrane.RTP.H265.DepayloaderTest do assert <<1::32, ^original_data::binary>> = result_data end) end + + test "parses AP packets with donl and dond" do + data = APFactory.sample_data() + don = :rand.uniform(10_000) + + buffer = %Buffer{payload: APFactory.into_ap_unit_with_don(data, don)} + + assert {actions, _state} = Depayloader.handle_process(:input, buffer, nil, @don_state) + + assert [buffer: {:output, buffers}] = actions + + buffers + |> Enum.zip(data) + |> Enum.with_index(0) + |> Enum.each(fn {{result, original_data}, index} -> + assert %Buffer{payload: result_data, metadata: metadata} = result + assert <<1::32, ^original_data::binary>> = result_data + assert metadata.decoding_order_number == don + index + end) + end end describe "Depayloader when handling events" do diff --git a/test/nal_formatters/ap_test.exs b/test/nal_formatters/ap_test.exs index 464ca61..9faadaa 100644 --- a/test/nal_formatters/ap_test.exs +++ b/test/nal_formatters/ap_test.exs @@ -14,10 +14,27 @@ defmodule Membrane.RTP.H265.APTest do test_data |> APFactory.binaries_into_ap() |> AP.parse() - ~> ({:ok, result} -> Enum.zip(result, test_data)) + ~> ({:ok, result} -> result |> Enum.map(&elem(&1, 0)) |> Enum.zip(test_data)) |> Enum.each(fn {a, b} -> assert a == b end) end + test "properly decodes nal aggregate with donl and dond fields" do + test_data = APFactory.sample_data() + don = :rand.uniform(10_000) + + test_data + |> APFactory.binaries_into_ap_with_don(don) + |> AP.parse(true) + ~> ({:ok, result} -> + result + |> Enum.with_index(fn {data, don}, index -> {index, data, don} end) + |> Enum.zip(test_data)) + |> Enum.each(fn {{idx, parsed_data, expected_don}, expected_data} -> + assert don + idx == expected_don + assert parsed_data == expected_data + end) + end + test "returns error when packet is malformed" do assert {:error, :packet_malformed} == AP.parse(<<35_402::16, 0, 0, 0, 0, 0, 0, 1, 1, 2>>) end diff --git a/test/nal_formatters/fu_test.exs b/test/nal_formatters/fu_test.exs index 8da2285..ba5b0f4 100644 --- a/test/nal_formatters/fu_test.exs +++ b/test/nal_formatters/fu_test.exs @@ -29,7 +29,25 @@ defmodule Membrane.RTP.H265.FUTest do end) expected_result = FUFactory.glued_fixtures() ~> (<<_header::16, rest::binary>> -> rest) - assert result == {expected_result, 19} + assert result == {expected_result, 19, nil} + end + + test "parses packet with donl field" do + don = :rand.uniform(10_000) + + [first_fixture | rest] = FUFactory.get_all_fixtures() + fixtures = [FUFactory.add_donl_field(first_fixture, don) | rest] + + result = + fixtures + |> Enum.zip(1..Enum.count(fixtures)) + |> Enum.reduce(%FU{donl?: true}, fn {elem, seq_num}, acc -> + FU.parse(elem, seq_num, acc) + ~> ({_command, value} -> value) + end) + + expected_result = FUFactory.glued_fixtures() ~> (<<_header::16, rest::binary>> -> rest) + assert result == {expected_result, 19, don} end test "returns error when one of non edge packets dropped" do diff --git a/test/payloader_pipeline_test.exs b/test/payloader_pipeline_test.exs index 4878455..37d855e 100644 --- a/test/payloader_pipeline_test.exs +++ b/test/payloader_pipeline_test.exs @@ -83,7 +83,7 @@ defmodule Membrane.RTP.H265.PayloaderPipelineTest do type = NAL.Header.encode_type(:ap) assert <<0::1, ^type::6, 0::6, 0::3, rest::binary>> = data assert {:ok, glued} = AP.parse(rest) - assert glued == List.duplicate(single_unit, number_of_packets) + assert Enum.map(glued, &elem(&1, 0)) == List.duplicate(single_unit, number_of_packets) Membrane.Pipeline.terminate(pid, blocking?: true) end diff --git a/test/support/depayloader_testing_pipeline.ex b/test/support/depayloader_testing_pipeline.ex index 9bc3c34..5609249 100644 --- a/test/support/depayloader_testing_pipeline.ex +++ b/test/support/depayloader_testing_pipeline.ex @@ -7,7 +7,7 @@ defmodule Membrane.Support.DepayloaderTestingPipeline do alias Membrane.{RTP, Testing} alias Testing.Pipeline - @spec start_pipeline(any()) :: :ignore | {:error, any()} | {:ok, pid(), pid()} + @spec start_pipeline(any()) :: pid() def start_pipeline(data) do structure = [ child(:source, %Testing.Source{output: data, stream_format: %RTP{}}) diff --git a/test/support/formatters/ap_factory.ex b/test/support/formatters/ap_factory.ex index 73e512c..496d28c 100644 --- a/test/support/formatters/ap_factory.ex +++ b/test/support/formatters/ap_factory.ex @@ -2,7 +2,7 @@ defmodule Membrane.Support.Formatters.APFactory do @moduledoc false @spec sample_data() :: [binary()] def sample_data do - Enum.map(1..10, &<<&1>>) + Enum.map(3..10, &<<&1>>) end @spec binaries_into_ap([binary()]) :: binary() @@ -12,16 +12,36 @@ defmodule Membrane.Support.Formatters.APFactory do |> Enum.reduce(&(&2 <> &1)) end + @spec binaries_into_ap_with_don([binary()], non_neg_integer()) :: binary() + def binaries_into_ap_with_don(binaries, don) do + binaries + |> into_aggregation_units_with_don(don) + |> Enum.reduce(&(&2 <> &1)) + end + @spec sample_ap_header() :: <<_::16>> def sample_ap_header, do: <<0::1, 48::6, 0::6, 1::3>> @spec into_ap_unit([binary()]) :: binary() def into_ap_unit(data), do: sample_ap_header() <> binaries_into_ap(data) + @spec into_ap_unit_with_don([binary()], non_neg_integer()) :: binary() + def into_ap_unit_with_don(data, don), + do: sample_ap_header() <> binaries_into_ap_with_don(data, don) + # AP @spec into_aggregation_units([binary()]) :: [binary()] def into_aggregation_units(binaries), do: Enum.map(binaries, &<>) + @spec into_aggregation_units_with_don([binary()], non_neg_integer()) :: [binary()] + def into_aggregation_units_with_don(binaries, don) do + Enum.with_index(binaries, 1) + |> Enum.map(fn + {data, 1} -> <> + {data, _} -> <<0::8, byte_size(data)::16, data::binary>> + end) + end + @spec example_nalu_hdr() :: <<_::16>> def example_nalu_hdr, do: <<0::1, 1::6, 0::6, 1::3>> end diff --git a/test/support/formatters/fu_factory.ex b/test/support/formatters/fu_factory.ex index cc92abc..5198b14 100644 --- a/test/support/formatters/fu_factory.ex +++ b/test/support/formatters/fu_factory.ex @@ -22,6 +22,11 @@ defmodule Membrane.Support.Formatters.FUFactory do @spec last() :: binary() def last, do: get_fixture(@max_fixtures) + @spec add_donl_field(binary(), non_neg_integer()) :: binary() + def add_donl_field(<>, don) do + <> + end + @spec precede_with_fu_nal_header(binary()) :: binary def precede_with_fu_nal_header(data) when is_binary(data), do: <<0::1, 49::6, 0::6, 1::3>> <> data diff --git a/test/support/payloader_testing_pipeline.ex b/test/support/payloader_testing_pipeline.ex index b6f05a5..ad19c61 100644 --- a/test/support/payloader_testing_pipeline.ex +++ b/test/support/payloader_testing_pipeline.ex @@ -8,7 +8,7 @@ defmodule Membrane.Support.PayloaderTestingPipeline do alias Membrane.Testing alias Testing.Pipeline - @spec start_pipeline(any()) :: :ignore | {:error, any()} | {:ok, pid(), pid()} + @spec start_pipeline(any()) :: pid() def start_pipeline(data) do structure = [ child(:source, %Testing.Source{