Skip to content

Commit

Permalink
Parse packets with DONL and DOND fields (#3)
Browse files Browse the repository at this point in the history
* Parse packets with DONL and DOND fields

* Add tests
  • Loading branch information
gBillal authored Jul 29, 2023
1 parent 6d377f6 commit 1b52a77
Show file tree
Hide file tree
Showing 14 changed files with 236 additions and 38 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

[![Hex.pm](https://img.shields.io/hexpm/v/membrane_rtp_h265_plugin.svg)](https://hex.pm/packages/membrane_rtp_h265_plugin)
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_rtp_h265_plugin/)
[![CircleCI](https://circleci.com/gh/membraneframework/membrane_rtp_h265_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_rtp_h265_plugin)

RTP payloader and depayloader for H265.

Expand Down
61 changes: 48 additions & 13 deletions lib/depayloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Membrane.RTP.H265.Depayloader do
Based on [RFC 7798](https://tools.ietf.org/html/rfc7798).
Supported types: Single NALU, Fragementation Unit, Aggegration Packets.
Supported types: Single NALU, Fragmentation Unit, Aggegration Packets.
"""
use Membrane.Filter

Expand All @@ -24,14 +24,28 @@ defmodule Membrane.RTP.H265.Depayloader do
accepted_format: %H265.RemoteStream{alignment: :nalu},
demand_mode: :auto

def_options sprop_max_don_diff: [
spec: 0..32_767,
default: 0,
description: """
Specify the maximum absolute difference between the decoding order number (i.e. AbsDon)
values of any two NAL units naluA and naluB, where naluA follows naluB in decoding order
and precedes naluB in transmission order.
If this value is greater than 0, then two additional fields `DONL` and `DOND` will
be included in the RTP payload. A `decoding_order_number` field will be added to the
buffer metadata.
"""
]

defmodule State do
@moduledoc false
defstruct parser_acc: nil
defstruct parser_acc: nil, sprop_max_don_diff: 0
end

@impl true
def handle_init(_ctx, _opts) do
{[], %State{}}
def handle_init(_ctx, opts) do
{[], %State{sprop_max_don_diff: opts.sprop_max_don_diff}}
end

@impl true
Expand Down Expand Up @@ -67,19 +81,27 @@ defmodule Membrane.RTP.H265.Depayloader do
def handle_event(pad, event, context, state), do: super(pad, event, context, state)

defp handle_unit_type(:single_nalu, _nalu, buffer, state) do
result = buffer_output(buffer.payload, buffer, state)
{don, buffer} =
if state.sprop_max_don_diff > 0 do
<<don::16, payload::binary>> = buffer.payload
{don, %Buffer{buffer | payload: payload}}
else
{nil, buffer}
end

result = buffer_output(buffer.payload, buffer, don, state)
{:ok, result}
end

defp handle_unit_type(:fu, {header, data}, buffer, state) do
%Buffer{metadata: %{rtp: %{sequence_number: seq_num}}} = buffer

case FU.parse(data, seq_num, map_state_to_fu(state)) do
{:ok, {data, type}} ->
{:ok, {data, type, don}} ->
data =
NAL.Header.add_header(data, 0, type, header.nuh_layer_id, header.nuh_temporal_id_plus1)

result = buffer_output(data, buffer, %State{state | parser_acc: nil})
result = buffer_output(data, buffer, don, %State{state | parser_acc: nil})
{:ok, result}

{:incomplete, fu} ->
Expand All @@ -92,25 +114,35 @@ defmodule Membrane.RTP.H265.Depayloader do
end

defp handle_unit_type(:ap, {_header, data}, buffer, state) do
with {:ok, result} <- AP.parse(data) do
buffers = Enum.map(result, &%Buffer{buffer | payload: add_prefix(&1)})
with {:ok, nalus} <- AP.parse(data, state.sprop_max_don_diff > 0) do
buffers =
Enum.map(nalus, fn {nalu, don} ->
metadata = put_if(not is_nil(don), buffer.metadata, :decoding_order_number, don)
%Buffer{buffer | payload: add_prefix(nalu), metadata: metadata}
end)

result = {[buffer: {:output, buffers}], state}
{:ok, result}
end
end

defp buffer_output(data, buffer, state) do
{action_from_data(data, buffer), state}
defp buffer_output(data, buffer, don, state) do
{action_from_data(data, buffer, don), state}
end

defp action_from_data(data, buffer) do
defp action_from_data(data, buffer, nil) do
[buffer: {:output, %Buffer{buffer | payload: add_prefix(data)}}]
end

defp action_from_data(data, buffer, don) do
metadata = Map.put(buffer.metadata, :decoding_order_number, don)
[buffer: {:output, %Buffer{buffer | payload: add_prefix(data), metadata: metadata}}]
end

defp add_prefix(data), do: @frame_prefix <> data

defp map_state_to_fu(%State{parser_acc: %FU{} = fu}), do: fu
defp map_state_to_fu(_state), do: %FU{}
defp map_state_to_fu(state), do: %FU{donl?: state.sprop_max_don_diff > 0}

defp log_malformed_buffer(packet, reason) do
Membrane.Logger.warn("""
Expand All @@ -119,4 +151,7 @@ defmodule Membrane.RTP.H265.Depayloader do
Packet: #{inspect(packet, limit: :infinity)}
""")
end

defp put_if(true, map, key, value), do: Map.put(map, key, value)
defp put_if(false, map, _key, _value), do: map
end
59 changes: 53 additions & 6 deletions lib/nal_formats/ap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,71 @@ defmodule Membrane.RTP.H265.AP do
| :...OPTIONAL RTP padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
```
A packet width DONL
```
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| RTP Header |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| PayloadHdr (Type=48) | NALU 1 DONL |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| NALU 1 Size | NALU 1 HDR |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
| NALU 1 Data . . . |
| |
+ . . . +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+|
| NALU 2 DOND | NALU 2 Size |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| NALU 2 HDR | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ NALU 2 Data |
| |
| . . . +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| :...OPTIONAL RTP padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
```
"""
use Bunch

alias Membrane.RTP.H265.NAL

@spec parse(binary()) :: {:ok, [binary()]} | {:error, :packet_malformed}
def parse(data) do
do_parse(data, [])
@type don :: nil | non_neg_integer()

@spec parse(binary(), boolean()) :: {:ok, [{binary(), don()}]} | {:error, :packet_malformed}
def parse(data, donl? \\ false) do
if donl?,
do: do_parse(data, 0, []),
else: do_parse(data, [])
end

# Parse packet without DONL
defp do_parse(<<>>, acc), do: {:ok, Enum.reverse(acc)}

defp do_parse(<<size::16, nalu::binary-size(size), rest::binary>>, acc) do
do_parse(rest, [nalu | acc])
end
defp do_parse(<<size::16, nalu::binary-size(size), rest::binary>>, acc),
do: do_parse(rest, [{nalu, nil} | acc])

defp do_parse(_data, _acc), do: {:error, :packet_malformed}

# Parse packets with DONL
defp do_parse(<<>>, _last_don, acc), do: {:ok, Enum.reverse(acc)}

defp do_parse(
<<donl::16, size::16, nalu::binary-size(size), rest::binary>>,
_last_don,
[] = acc
) do
do_parse(rest, donl, [{nalu, donl} | acc])
end

defp do_parse(<<dond::8, size::16, nalu::binary-size(size), rest::binary>>, last_don, acc) do
don = rem(last_don + dond + 1, 65_536)
do_parse(rest, don, [{nalu, don} | acc])
end

defp do_parse(_data, _last_don, _acc), do: {:error, :packet_malformed}

@spec aggregation_unit_size(binary()) :: pos_integer()
def aggregation_unit_size(nalu), do: byte_size(nalu) + 2

Expand Down
26 changes: 18 additions & 8 deletions lib/nal_formats/fu.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ defmodule Membrane.RTP.H265.FU do
alias __MODULE__
alias Membrane.RTP.H265.NAL

defstruct [:last_seq_num, data: [], type: nil]
defstruct [:last_seq_num, data: [], type: nil, donl?: false, don: nil]

@type don :: nil | non_neg_integer()

@type t :: %__MODULE__{
data: [binary()],
last_seq_num: nil | non_neg_integer(),
type: NAL.Header.type()
type: NAL.Header.type(),
donl?: boolean(),
don: don()
}

defguardp is_next(last_seq_num, next_seq_num) when rem(last_seq_num + 1, 65_536) == next_seq_num
Expand All @@ -21,11 +25,12 @@ defmodule Membrane.RTP.H265.FU do
If a packet that is being parsed is not considered last then a `{:incomplete, t()}`
tuple will be returned.
In case of last packet `{:ok, {type, data}}` tuple will be returned, where data
is `NAL Unit` created by concatenating subsequent Fragmentation Units.
In case of last packet `{:ok, {type, data, don}}` tuple will be returned, where data
is `NAL Unit` created by concatenating subsequent Fragmentation Units and `don` is the
decoding order number of the `NAL unit` in case `donl` field is present in the packet.
"""
@spec parse(binary(), non_neg_integer(), t) ::
{:ok, {binary(), NAL.Header.type()}}
{:ok, {binary(), NAL.Header.type(), don()}}
| {:error, :packet_malformed | :invalid_first_packet}
| {:incomplete, t()}
def parse(data, seq_num, acc) do
Expand Down Expand Up @@ -79,24 +84,29 @@ defmodule Membrane.RTP.H265.FU do

defp do_parse(header, data, seq_num, acc)

defp do_parse(%FU.Header{start_bit: true, type: type}, data, seq_num, acc),
defp do_parse(%FU.Header{start_bit: true, type: type}, data, seq_num, %{donl?: false} = acc),
do: {:incomplete, %__MODULE__{acc | data: [data], last_seq_num: seq_num, type: type}}

defp do_parse(%FU.Header{start_bit: true, type: type}, <<don::16, data::binary>>, seq_num, acc) do
{:incomplete, %__MODULE__{acc | data: [data], last_seq_num: seq_num, type: type, don: don}}
end

defp do_parse(%FU.Header{start_bit: false}, _data, _seq_num, %__MODULE__{last_seq_num: nil}),
do: {:error, :invalid_first_packet}

defp do_parse(%FU.Header{end_bit: true}, data, seq_num, %__MODULE__{
data: acc,
last_seq_num: last,
type: type
type: type,
don: don
})
when is_next(last, seq_num) do
result =
[data | acc]
|> Enum.reverse()
|> Enum.join()

{:ok, {result, type}}
{:ok, {result, type, don}}
end

defp do_parse(_header, data, seq_num, %__MODULE__{data: acc, last_seq_num: last} = fu)
Expand Down
4 changes: 2 additions & 2 deletions lib/payloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ defmodule Membrane.RTP.H265.Payloader do
description: """
In `:single_nalu` mode, payloader puts exactly one NAL unit
into each payload, altering only RTP metadata. `:non_interleaved`
mode handles also FU-A and STAP-A packetization. See
[RFC 6184](https://tools.ietf.org/html/rfc7798) for details.
mode handles also FU and AP packetization. See
[RFC 7798](https://tools.ietf.org/html/rfc7798) for details.
"""
]

Expand Down
4 changes: 3 additions & 1 deletion test/depayloader_pipeline_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
defmodule Membrane.RTP.H265.DepayloaderPipelineTest do
@moduledoc false

use ExUnit.Case

import Membrane.Testing.Assertions
Expand All @@ -9,7 +11,7 @@ defmodule Membrane.RTP.H265.DepayloaderPipelineTest do
alias Membrane.Testing.Source

describe "Depayloader in a pipeline" do
test "does not crash when parsing staps" do
test "does not crash when parsing AP" do
pid =
APFactory.sample_data()
|> Enum.chunk_every(2)
Expand Down
47 changes: 46 additions & 1 deletion test/depayloader_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ defmodule Membrane.RTP.H265.DepayloaderTest do
alias Membrane.Support.Formatters.{APFactory, FUFactory, RBSPNaluFactory}

@empty_state %Depayloader.State{}
@don_state %Depayloader.State{sprop_max_don_diff: 1}

describe "Depayloader when processing data" do
test "passes through packets with type 0..9 and 16..21 (RBSP types)" do
test "passes through packets with type 0..47 (RBSP types)" do
data = RBSPNaluFactory.sample_nalu()
buffer = %Buffer{payload: data}

Expand Down Expand Up @@ -43,6 +44,30 @@ defmodule Membrane.RTP.H265.DepayloaderTest do
assert data == <<1::32, FUFactory.glued_fixtures()::binary>>
end

test "parses FU packets with donl" do
assert {actions, @don_state} =
FUFactory.get_all_fixtures()
|> then(&[FUFactory.add_donl_field(hd(&1), 1_000) | tl(&1)])
|> Enum.map(&FUFactory.precede_with_fu_nal_header/1)
~> (enum -> Enum.zip(enum, 1..Enum.count(enum)))
|> Enum.map(fn {elem, seq_num} ->
%Buffer{payload: elem, metadata: %{rtp: %{sequence_number: seq_num}}}
end)
|> Enum.reduce(@don_state, fn buffer, prev_state ->
Depayloader.handle_process(:input, buffer, nil, prev_state)
~> (
{[], %Depayloader.State{} = state} -> state
{actions, state} -> {actions, state}
)
end)

assert {:output, %Buffer{payload: data, metadata: metadata}} =
Keyword.fetch!(actions, :buffer)

assert data == <<1::32, FUFactory.glued_fixtures()::binary>>
assert metadata.decoding_order_number == 1_000
end

test "parses AP packets" do
data = APFactory.sample_data()

Expand All @@ -59,6 +84,26 @@ defmodule Membrane.RTP.H265.DepayloaderTest do
assert <<1::32, ^original_data::binary>> = result_data
end)
end

test "parses AP packets with donl and dond" do
data = APFactory.sample_data()
don = :rand.uniform(10_000)

buffer = %Buffer{payload: APFactory.into_ap_unit_with_don(data, don)}

assert {actions, _state} = Depayloader.handle_process(:input, buffer, nil, @don_state)

assert [buffer: {:output, buffers}] = actions

buffers
|> Enum.zip(data)
|> Enum.with_index(0)
|> Enum.each(fn {{result, original_data}, index} ->
assert %Buffer{payload: result_data, metadata: metadata} = result
assert <<1::32, ^original_data::binary>> = result_data
assert metadata.decoding_order_number == don + index
end)
end
end

describe "Depayloader when handling events" do
Expand Down
Loading

0 comments on commit 1b52a77

Please sign in to comment.