Skip to content

Commit 9d2899e

Browse files
committed
Implement pacing interceptor
1 parent 8492094 commit 9d2899e

File tree

5 files changed

+394
-1
lines changed

5 files changed

+394
-1
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
module github.com/pion/interceptor
22

3-
go 1.21
3+
go 1.24.0
44

55
require (
66
github.com/pion/logging v0.2.4
77
github.com/pion/rtcp v1.2.16
88
github.com/pion/rtp v1.8.24
99
github.com/pion/transport/v3 v3.0.8
1010
github.com/stretchr/testify v1.11.1
11+
golang.org/x/time v0.14.0
1112
)
1213

1314
require (

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
1616
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
1717
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
1818
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
19+
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
20+
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
1921
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
2022
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2123
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

pkg/pacing/interceptor.go

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
// Package pacing implements a pacing interceptor.
5+
package pacing
6+
7+
import (
8+
"errors"
9+
"log/slog"
10+
"maps"
11+
"sync"
12+
"time"
13+
14+
"github.com/pion/interceptor"
15+
"github.com/pion/logging"
16+
"github.com/pion/rtp"
17+
)
18+
19+
var (
20+
errPacerClosed = errors.New("pacer closed")
21+
errPacerOverflow = errors.New("pacer queue overflow")
22+
)
23+
24+
type pacerFactory func(initialRate, burst int) pacer
25+
26+
type pacer interface {
27+
SetRate(rate, burst int)
28+
Budget(time.Time) float64
29+
AllowN(time.Time, int) bool
30+
}
31+
32+
// Option is a configuration option for pacing interceptors.
33+
type Option func(*Interceptor) error
34+
35+
// InitialRate configures the initial pacing rate for interceptors created by
36+
// the interceptor factory.
37+
func InitialRate(rate int) Option {
38+
return func(i *Interceptor) error {
39+
i.initialRate = rate
40+
41+
return nil
42+
}
43+
}
44+
45+
// Interval configures the pacing interval for interceptors created by the
46+
// interceptor factory.
47+
func Interval(interval time.Duration) Option {
48+
return func(i *Interceptor) error {
49+
i.interval = interval
50+
51+
return nil
52+
}
53+
}
54+
55+
func setPacerFactory(f pacerFactory) Option {
56+
return func(i *Interceptor) error {
57+
i.pacerFactory = f
58+
59+
return nil
60+
}
61+
}
62+
63+
// InterceptorFactory is a factory for pacing interceptors. It also keeps a map
64+
// of interceptors created in the past by ID.
65+
type InterceptorFactory struct {
66+
lock sync.Mutex
67+
opts []Option
68+
interceptors map[string]*Interceptor
69+
}
70+
71+
// NewInterceptor returns a new InterceptorFactory.
72+
func NewInterceptor(opts ...Option) *InterceptorFactory {
73+
return &InterceptorFactory{
74+
lock: sync.Mutex{},
75+
opts: opts,
76+
interceptors: map[string]*Interceptor{},
77+
}
78+
}
79+
80+
// SetRate updates the pacing rate of the pacing interceptor with the given ID.
81+
func (f *InterceptorFactory) SetRate(id string, r int) {
82+
f.lock.Lock()
83+
defer f.lock.Unlock()
84+
85+
i, ok := f.interceptors[id]
86+
if !ok {
87+
return
88+
}
89+
i.setRate(r)
90+
}
91+
92+
// NewInterceptor creates a new pacing interceptor.
93+
func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
94+
f.lock.Lock()
95+
defer f.lock.Unlock()
96+
97+
interceptor := &Interceptor{
98+
NoOp: interceptor.NoOp{},
99+
log: logging.NewDefaultLoggerFactory().NewLogger("pacer_interceptor"),
100+
initialRate: 1_000_000,
101+
interval: 5 * time.Millisecond,
102+
queueSize: 1_000_000,
103+
pacerFactory: func(initialRate, burst int) pacer {
104+
return newRateLimitPacer(initialRate, burst)
105+
},
106+
limit: nil,
107+
queue: nil,
108+
closed: make(chan struct{}),
109+
wg: sync.WaitGroup{},
110+
}
111+
for _, opt := range f.opts {
112+
if err := opt(interceptor); err != nil {
113+
return nil, err
114+
}
115+
}
116+
interceptor.limit = interceptor.pacerFactory(
117+
interceptor.initialRate,
118+
burst(interceptor.initialRate, interceptor.interval),
119+
)
120+
interceptor.queue = make(chan packet, interceptor.queueSize)
121+
122+
f.interceptors[id] = interceptor
123+
124+
interceptor.wg.Add(1)
125+
go func() {
126+
defer interceptor.wg.Done()
127+
interceptor.loop()
128+
}()
129+
130+
return interceptor, nil
131+
}
132+
133+
// Interceptor implements packet pacing using a token bucket filter and sends
134+
// packets at a fixed interval.
135+
type Interceptor struct {
136+
interceptor.NoOp
137+
log logging.LeveledLogger
138+
139+
// config
140+
initialRate int
141+
interval time.Duration
142+
queueSize int
143+
pacerFactory pacerFactory
144+
145+
// limiter and queue
146+
limit pacer
147+
queue chan packet
148+
149+
// shutdown
150+
closed chan struct{}
151+
wg sync.WaitGroup
152+
}
153+
154+
// burst calculates the minimal burst size required to reach the given rate and
155+
// pacing interval.
156+
func burst(rate int, interval time.Duration) int {
157+
if interval == 0 {
158+
interval = time.Millisecond
159+
}
160+
f := float64(time.Second.Milliseconds() / interval.Milliseconds())
161+
162+
return 8 * int(float64(rate)/f)
163+
}
164+
165+
// setRate updates the pacing rate and burst of the rate limiter.
166+
func (i *Interceptor) setRate(r int) {
167+
i.limit.SetRate(r, burst(r, i.interval))
168+
}
169+
170+
// BindLocalStream implements interceptor.Interceptor.
171+
func (i *Interceptor) BindLocalStream(
172+
info *interceptor.StreamInfo,
173+
writer interceptor.RTPWriter,
174+
) interceptor.RTPWriter {
175+
return interceptor.RTPWriterFunc(func(
176+
header *rtp.Header,
177+
payload []byte,
178+
attributes interceptor.Attributes,
179+
) (int, error) {
180+
hdr := header.Clone()
181+
pay := make([]byte, len(payload))
182+
copy(pay, payload)
183+
attr := maps.Clone(attributes)
184+
select {
185+
case i.queue <- packet{
186+
writer: writer,
187+
header: &hdr,
188+
payload: pay,
189+
attributes: attr,
190+
}:
191+
case <-i.closed:
192+
return 0, errPacerClosed
193+
default:
194+
return 0, errPacerOverflow
195+
}
196+
197+
return header.MarshalSize() + len(payload), nil
198+
})
199+
}
200+
201+
// Close implements interceptor.Interceptor.
202+
func (i *Interceptor) Close() error {
203+
defer i.wg.Done()
204+
205+
return nil
206+
}
207+
208+
func (i *Interceptor) loop() {
209+
ticker := time.NewTicker(i.interval)
210+
queue := make([]packet, 0)
211+
for {
212+
select {
213+
case now := <-ticker.C:
214+
for len(queue) > 0 && i.limit.Budget(now) > 8*float64(queue[0].len()) {
215+
i.limit.AllowN(now, 8*queue[0].len())
216+
var next packet
217+
next, queue = queue[0], queue[1:]
218+
if _, err := next.writer.Write(next.header, next.payload, next.attributes); err != nil {
219+
slog.Warn("error on writing RTP packet", "error", err)
220+
}
221+
}
222+
case pkt := <-i.queue:
223+
queue = append(queue, pkt)
224+
case <-i.closed:
225+
return
226+
}
227+
}
228+
}
229+
230+
type packet struct {
231+
writer interceptor.RTPWriter
232+
header *rtp.Header
233+
payload []byte
234+
attributes interceptor.Attributes
235+
}
236+
237+
func (p *packet) len() int {
238+
return p.header.MarshalSize() + len(p.payload)
239+
}

pkg/pacing/interceptor_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package pacing
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/pion/interceptor"
8+
"github.com/pion/interceptor/internal/test"
9+
"github.com/pion/rtp"
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
type mockPacer struct {
14+
rate int
15+
burst int
16+
17+
allow bool
18+
allowCalled bool
19+
budget float64
20+
budgetCalled bool
21+
}
22+
23+
// AllowN implements pacer.
24+
func (m *mockPacer) AllowN(time.Time, int) bool {
25+
m.allowCalled = true
26+
27+
return m.allow
28+
}
29+
30+
// Budget implements pacer.
31+
func (m *mockPacer) Budget(time.Time) float64 {
32+
m.budgetCalled = true
33+
34+
return m.budget
35+
}
36+
37+
// SetRate implements pacer.
38+
func (m *mockPacer) SetRate(rate int, burst int) {
39+
m.rate = rate
40+
m.burst = burst
41+
}
42+
43+
func TestInterceptor(t *testing.T) {
44+
t.Run("calls_set_rate", func(t *testing.T) {
45+
mp := &mockPacer{}
46+
i := NewInterceptor(
47+
setPacerFactory(func(initialRate, burst int) pacer {
48+
return mp
49+
}),
50+
)
51+
52+
_, err := i.NewInterceptor("")
53+
assert.NoError(t, err)
54+
55+
i.SetRate("", 1_000_000)
56+
assert.Equal(t, 1_000_000, mp.rate)
57+
assert.Equal(t, 40_000, mp.burst)
58+
})
59+
60+
t.Run("paces_packets", func(t *testing.T) {
61+
mp := &mockPacer{
62+
rate: 0,
63+
burst: 0,
64+
allow: false,
65+
allowCalled: false,
66+
budget: 0,
67+
budgetCalled: false,
68+
}
69+
i := NewInterceptor(
70+
setPacerFactory(func(initialRate, burst int) pacer {
71+
return mp
72+
}),
73+
Interval(time.Millisecond),
74+
)
75+
76+
pacer, err := i.NewInterceptor("")
77+
assert.NoError(t, err)
78+
79+
stream := test.NewMockStream(&interceptor.StreamInfo{}, pacer)
80+
defer func() {
81+
assert.NoError(t, stream.Close())
82+
}()
83+
84+
mp.allow = true
85+
mp.budget = 8 * 1500
86+
87+
hdr := rtp.Header{}
88+
err = stream.WriteRTP(&rtp.Packet{
89+
Header: hdr,
90+
Payload: make([]byte, 1200-hdr.MarshalSize()),
91+
})
92+
time.Sleep(2 * time.Millisecond)
93+
assert.NoError(t, err)
94+
assert.True(t, mp.allowCalled)
95+
assert.True(t, mp.budgetCalled)
96+
97+
select {
98+
case <-stream.WrittenRTP():
99+
default:
100+
assert.Fail(t, "no RTP packet written")
101+
}
102+
103+
mp.allow = false
104+
mp.budget = 0
105+
106+
hdr = rtp.Header{}
107+
err = stream.WriteRTP(&rtp.Packet{
108+
Header: hdr,
109+
Payload: make([]byte, 1200-hdr.MarshalSize()),
110+
})
111+
assert.NoError(t, err)
112+
assert.True(t, mp.allowCalled)
113+
assert.True(t, mp.budgetCalled)
114+
115+
select {
116+
case <-stream.WrittenRTP():
117+
assert.Fail(t, "RTP packet written without pacing budget")
118+
default:
119+
}
120+
})
121+
}

0 commit comments

Comments
 (0)