Skip to content

Commit 3c49840

Browse files
committed
Add playout delay header interceptor
This interceptor adds the playout delay header extension following http://www.webrtc.org/experiments/rtp-hdrext/playout-delay
1 parent a82b843 commit 3c49840

File tree

4 files changed

+142
-0
lines changed

4 files changed

+142
-0
lines changed

AUTHORS.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@ boks1971 <[email protected]>
1515
David Zhao <[email protected]>
1616
Jonathan Müller <[email protected]>
1717
Kevin Caffrey <[email protected]>
18+
Kevin Wang <[email protected]>
1819
Mathis Engelbart <[email protected]>
1920
Sean DuBois <[email protected]>
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package playoutdelay
2+
3+
import (
4+
"time"
5+
6+
"github.com/pion/interceptor"
7+
"github.com/pion/rtp"
8+
)
9+
10+
// HeaderExtensionInterceptorFactory is a interceptor.Factory for a HeaderExtensionInterceptor
11+
type HeaderExtensionInterceptorFactory struct{}
12+
13+
const (
14+
playoutDelayMaxMs = 40950
15+
)
16+
17+
// NewInterceptor constructs a new HeaderExtensionInterceptor
18+
func (h *HeaderExtensionInterceptorFactory) NewInterceptor(id string, minDelay, maxDelay time.Duration) (interceptor.Interceptor, error) {
19+
if minDelay.Milliseconds() < 0 || minDelay.Milliseconds() > playoutDelayMaxMs || maxDelay.Milliseconds() < 0 || maxDelay.Milliseconds() > playoutDelayMaxMs {
20+
return nil, errPlayoutDelayInvalidValue
21+
}
22+
return &HeaderExtensionInterceptor{minDelay: uint16(minDelay.Milliseconds() / 10), maxDelay: uint16(maxDelay.Milliseconds() / 10)}, nil
23+
}
24+
25+
// NewHeaderExtensionInterceptor returns a HeaderExtensionInterceptorFactory
26+
func NewHeaderExtensionInterceptor() (*HeaderExtensionInterceptorFactory, error) {
27+
return &HeaderExtensionInterceptorFactory{}, nil
28+
}
29+
30+
// HeaderExtensionInterceptor adds transport wide sequence numbers as header extension to each RTP packet
31+
type HeaderExtensionInterceptor struct {
32+
interceptor.NoOp
33+
minDelay, maxDelay uint16
34+
}
35+
36+
const playoutDelayURI = "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay"
37+
38+
// BindLocalStream returns a writer that adds a rtp.PlayoutDelayExtension
39+
// header with increasing sequence numbers to each outgoing packet.
40+
func (h *HeaderExtensionInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
41+
var hdrExtID uint8
42+
for _, e := range info.RTPHeaderExtensions {
43+
if e.URI == playoutDelayURI {
44+
hdrExtID = uint8(e.ID)
45+
break
46+
}
47+
}
48+
if hdrExtID == 0 { // Don't add header extension if ID is 0, because 0 is an invalid extension ID
49+
return writer
50+
}
51+
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
52+
ext, err := (&rtp.PlayoutDelayExtension{
53+
minDelay: h.minDelay,
54+
maxDelay: h.maxDelay,
55+
}).Marshal()
56+
if err != nil {
57+
return 0, err
58+
}
59+
err = header.SetExtension(hdrExtID, ext)
60+
if err != nil {
61+
return 0, err
62+
}
63+
return writer.Write(header, payload, attributes)
64+
})
65+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package playoutdelay
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
8+
"github.com/pion/interceptor"
9+
"github.com/pion/interceptor/internal/test"
10+
"github.com/pion/rtp"
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestHeaderExtensionInterceptor(t *testing.T) {
15+
t.Run("add playout delay to each packet", func(t *testing.T) {
16+
factory, err := NewHeaderExtensionInterceptor()
17+
assert.NoError(t, err)
18+
19+
inter, err := factory.NewInterceptor("", 10*time.Millisecond, 20*time.Millisecond)
20+
assert.NoError(t, err)
21+
22+
pChan := make(chan *rtp.Packet, 10*5)
23+
go func() {
24+
// start some parallel streams using the same interceptor to test for race conditions
25+
var wg sync.WaitGroup
26+
num := 10
27+
wg.Add(num)
28+
for i := 0; i < num; i++ {
29+
go func(ch chan *rtp.Packet, id uint16) {
30+
stream := test.NewMockStream(&interceptor.StreamInfo{RTPHeaderExtensions: []interceptor.RTPHeaderExtension{
31+
{
32+
URI: playoutDelayURI,
33+
ID: 1,
34+
},
35+
}}, inter)
36+
defer func() {
37+
wg.Done()
38+
assert.NoError(t, stream.Close())
39+
}()
40+
41+
for _, seqNum := range []uint16{id * 1, id * 2, id * 3, id * 4, id * 5} {
42+
assert.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
43+
select {
44+
case p := <-stream.WrittenRTP():
45+
assert.Equal(t, seqNum, p.SequenceNumber)
46+
ch <- p
47+
case <-time.After(10 * time.Millisecond):
48+
panic("written rtp packet not found")
49+
}
50+
}
51+
}(pChan, uint16(i+1))
52+
}
53+
wg.Wait()
54+
close(pChan)
55+
}()
56+
57+
for p := range pChan {
58+
extensionHeader := p.GetExtension(1)
59+
ext := &rtp.PlayoutDelayExtension{}
60+
err = ext.Unmarshal(extensionHeader)
61+
assert.NoError(t, err)
62+
assert.Equal(t, uint16(1), ext.minDelay)
63+
assert.Equal(t, uint16(2), ext.maxDelay)
64+
}
65+
})
66+
}

pkg/playoutdelay/playout_delay.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
// Package playoutdelay implements the playout delay header extension.
2+
package playoutdelay
3+
4+
import (
5+
"errors"
6+
)
7+
8+
var (
9+
errPlayoutDelayInvalidValue = errors.New("invalid playout delay value")
10+
)

0 commit comments

Comments
 (0)