Skip to content

Commit 1784d14

Browse files
authored
move latencies to config (#915)
* move latencies to config * fix imageQueueLatency * another miss * fix mux latency datatype
1 parent a2e4012 commit 1784d14

File tree

14 files changed

+65
-39
lines changed

14 files changed

+65
-39
lines changed

pkg/config/base.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,14 @@ type BaseConfig struct {
5555
Insecure bool `yaml:"insecure"` // allow chrome to connect to an insecure websocket
5656
Debug DebugConfig `yaml:"debug"` // create dot file on internal error
5757
ChromeFlags map[string]interface{} `yaml:"chrome_flags"` // additional flags to pass to Chrome
58+
Latency LatencyConfig `yaml:"latency"` // gstreamer latencies, modifying this may break the service
59+
}
60+
61+
type SessionLimits struct {
62+
FileOutputMaxDuration time.Duration `yaml:"file_output_max_duration"`
63+
StreamOutputMaxDuration time.Duration `yaml:"stream_output_max_duration"`
64+
SegmentOutputMaxDuration time.Duration `yaml:"segment_output_max_duration"`
65+
ImageOutputMaxDuration time.Duration `yaml:"image_output_max_duration"`
5866
}
5967

6068
type DebugConfig struct {
@@ -65,17 +73,11 @@ type DebugConfig struct {
6573
StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS)
6674
}
6775

68-
type ProxyConfig struct {
69-
Url string `yaml:"url"`
70-
Username string `yaml:"username"`
71-
Password string `yaml:"password"`
72-
}
73-
74-
type SessionLimits struct {
75-
FileOutputMaxDuration time.Duration `yaml:"file_output_max_duration"`
76-
StreamOutputMaxDuration time.Duration `yaml:"stream_output_max_duration"`
77-
SegmentOutputMaxDuration time.Duration `yaml:"segment_output_max_duration"`
78-
ImageOutputMaxDuration time.Duration `yaml:"image_output_max_duration"`
76+
type LatencyConfig struct {
77+
JitterBufferLatency time.Duration
78+
AudioMixerLatency time.Duration
79+
PipelineLatency time.Duration
80+
AppSrcDrainTimeout time.Duration
7981
}
8082

8183
func (c *BaseConfig) initLogger(values ...interface{}) error {

pkg/config/pipeline.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,6 @@ import (
3737
lksdk "github.com/livekit/server-sdk-go/v2"
3838
)
3939

40-
const (
41-
JitterBufferLatency = time.Second * 2
42-
AudioMixerLatency = uint64(25e8)
43-
PipelineLatency = uint64(3e9)
44-
AppSrcDrainTimeout = time.Millisecond * 3500
45-
)
46-
4740
type PipelineConfig struct {
4841
BaseConfig `yaml:",inline"`
4942

pkg/config/service.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ const (
4343

4444
defaultIOCreateTimeout = time.Second * 15
4545
defaultIOUpdateTimeout = time.Second * 30
46+
47+
defaultJitterBufferLatency = time.Second * 2
48+
defaultAudioMixerLatency = time.Millisecond * 2500
49+
defaultPipelineLatency = time.Second * 3
50+
defaultAppSrcDrainTimeout = time.Millisecond * 3500
4651
)
4752

4853
type ServiceConfig struct {
@@ -149,4 +154,17 @@ func (c *ServiceConfig) InitDefaults() {
149154
if c.MaxUploadQueue <= 0 {
150155
c.MaxUploadQueue = maxUploadQueue
151156
}
157+
158+
if c.Latency.JitterBufferLatency == 0 {
159+
c.Latency.JitterBufferLatency = defaultJitterBufferLatency
160+
}
161+
if c.Latency.AudioMixerLatency == 0 {
162+
c.Latency.AudioMixerLatency = defaultAudioMixerLatency
163+
}
164+
if c.Latency.PipelineLatency == 0 {
165+
c.Latency.PipelineLatency = defaultPipelineLatency
166+
}
167+
if c.Latency.AppSrcDrainTimeout == 0 {
168+
c.Latency.AppSrcDrainTimeout = defaultAppSrcDrainTimeout
169+
}
152170
}

pkg/config/storage.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ type GCPConfig struct {
6262
ProxyConfig *ProxyConfig `yaml:"proxy_config"`
6363
}
6464

65+
type ProxyConfig struct {
66+
Url string `yaml:"url"`
67+
Username string `yaml:"username"`
68+
Password string `yaml:"password"`
69+
}
70+
6571
func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) (*StorageConfig, error) {
6672
sc := &StorageConfig{}
6773
if p.StorageConfig != nil {

pkg/gstreamer/bin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type Bin struct {
3535
pipeline *gst.Pipeline
3636
mu sync.Mutex
3737
bin *gst.Bin
38-
latency uint64
38+
latency time.Duration
3939

4040
linkFunc func() error
4141
shouldLink func(string) bool

pkg/gstreamer/builder.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,20 @@
1515
package gstreamer
1616

1717
import (
18+
"time"
19+
1820
"github.com/go-gst/go-gst/gst"
1921

2022
"github.com/livekit/egress/pkg/errors"
2123
)
2224

23-
func BuildQueue(name string, latency uint64, leaky bool) (*gst.Element, error) {
25+
func BuildQueue(name string, latency time.Duration, leaky bool) (*gst.Element, error) {
2426
queue, err := gst.NewElementWithName("queue", name)
2527
if err != nil {
2628
return nil, errors.ErrGstPipelineError(err)
2729
}
2830
if latency > 0 {
29-
if err = queue.SetProperty("max-size-time", latency); err != nil {
31+
if err = queue.SetProperty("max-size-time", uint64(latency)); err != nil {
3032
return nil, errors.ErrGstPipelineError(err)
3133
}
3234
if err = queue.SetProperty("max-size-bytes", uint(0)); err != nil {

pkg/gstreamer/pipeline.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type Pipeline struct {
3838

3939
// A pipeline can have either elements or src and sink bins. If you add both you will get a wrong hierarchy error
4040
// Bins can contain both elements and src and sink bins
41-
func NewPipeline(name string, latency uint64, callbacks *Callbacks) (*Pipeline, error) {
41+
func NewPipeline(name string, latency time.Duration, callbacks *Callbacks) (*Pipeline, error) {
4242
pipeline, err := gst.NewPipeline(name)
4343
if err != nil {
4444
return nil, err

pkg/pipeline/builder/audio.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error
7575
return err
7676
}
7777
} else {
78-
queue, err := gstreamer.BuildQueue("audio_queue", config.PipelineLatency, true)
78+
queue, err := gstreamer.BuildQueue("audio_queue", p.Latency.PipelineLatency, true)
7979
if err != nil {
8080
return errors.ErrGstPipelineError(err)
8181
}
@@ -275,7 +275,7 @@ func (b *AudioBin) addMixer() error {
275275
if err != nil {
276276
return errors.ErrGstPipelineError(err)
277277
}
278-
if err = audioMixer.SetProperty("latency", config.AudioMixerLatency); err != nil {
278+
if err = audioMixer.SetProperty("latency", uint64(b.conf.Latency.AudioMixerLatency)); err != nil {
279279
return errors.ErrGstPipelineError(err)
280280
}
281281

@@ -318,7 +318,7 @@ func (b *AudioBin) addEncoder() error {
318318
}
319319

320320
func addAudioConverter(b *gstreamer.Bin, p *config.PipelineConfig, channel int) error {
321-
audioQueue, err := gstreamer.BuildQueue("audio_input_queue", config.PipelineLatency, true)
321+
audioQueue, err := gstreamer.BuildQueue("audio_input_queue", p.Latency.PipelineLatency, true)
322322
if err != nil {
323323
return err
324324
}

pkg/pipeline/builder/image.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
)
2929

3030
const (
31-
imageQueueLatency = uint64(200 * time.Millisecond)
31+
imageQueueLatency = 200 * time.Millisecond
3232
)
3333

3434
func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstreamer.Bin, error) {

pkg/pipeline/builder/stream.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type StreamBin struct {
3535
Bin *gstreamer.Bin
3636
OutputType types.OutputType
3737

38+
latency time.Duration
3839
mu sync.RWMutex
3940
pipeline *gstreamer.Pipeline
4041
}
@@ -50,7 +51,7 @@ type Stream struct {
5051
failed atomic.Bool
5152
}
5253

53-
func BuildStreamBin(pipeline *gstreamer.Pipeline, o *config.StreamConfig) (*StreamBin, error) {
54+
func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig, o *config.StreamConfig) (*StreamBin, error) {
5455
b := pipeline.NewBin("stream")
5556

5657
var mux *gst.Element
@@ -68,7 +69,7 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, o *config.StreamConfig) (*Stre
6869
return nil, errors.ErrGstPipelineError(err)
6970
}
7071
// add latency to give time for flvmux to receive and order packets from both streams
71-
if err = mux.SetProperty("latency", config.PipelineLatency); err != nil {
72+
if err = mux.SetProperty("latency", uint64(p.Latency.PipelineLatency)); err != nil {
7273
return nil, errors.ErrGstPipelineError(err)
7374
}
7475

@@ -104,6 +105,7 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, o *config.StreamConfig) (*Stre
104105
sb := &StreamBin{
105106
Bin: b,
106107
OutputType: o.OutputType,
108+
latency: p.Latency.PipelineLatency,
107109
}
108110

109111
return sb, nil
@@ -113,7 +115,7 @@ func (sb *StreamBin) BuildStream(stream *config.Stream, framerate int32) (*Strea
113115
stream.Name = utils.NewGuid("")
114116
b := sb.Bin.NewBin(stream.Name)
115117

116-
queue, err := gstreamer.BuildQueue(fmt.Sprintf("queue_%s", stream.Name), config.PipelineLatency, true)
118+
queue, err := gstreamer.BuildQueue(fmt.Sprintf("queue_%s", stream.Name), sb.latency, true)
117119
if err != nil {
118120
return nil, errors.ErrGstPipelineError(err)
119121
}

0 commit comments

Comments
 (0)