Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 76 additions & 3 deletions cmd/livekit-cli/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
Expand All @@ -26,13 +27,17 @@ import (
"time"

"github.com/pion/rtcp"
"github.com/pion/rtp/codecs"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media/oggwriter"
"github.com/urfave/cli/v2"
"go.uber.org/atomic"

provider2 "github.com/livekit/livekit-cli/pkg/provider"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
lksdk "github.com/livekit/server-sdk-go/v2"
"github.com/livekit/server-sdk-go/v2/pkg/samplebuilder"
)

var (
Expand All @@ -55,6 +60,11 @@ var (
"can be used multiple times to publish multiple files. " +
"can publish from Unix or TCP socket using the format `codec://socket_name` or `codec://host:address` respectively. Valid codecs are h264, vp8, opus",
},
&cli.StringFlag{
Name: "subscribe-audio",
Usage: "subscribe to audio tracks and write to a local TCP socket." +
"pass the directory to place the socket file. For example --subscribe-audio /tmp/my-audio",
},
&cli.Float64Flag{
Name: "fps",
Usage: "if video files are published, indicates FPS of video",
Expand All @@ -76,12 +86,12 @@ func joinRoom(c *cli.Context) error {
return err
}

audioPath := c.String("subscribe-audio")
done := make(chan os.Signal, 1)
roomCB := &lksdk.RoomCallback{
ParticipantCallback: lksdk.ParticipantCallback{
OnDataReceived: func(data []byte, params lksdk.DataReceiveParams) {
identity := params.SenderIdentity
logger.Infow("received data", "data", data, "participant", identity)
OnDataPacket: func(data lksdk.DataPacket, params lksdk.DataReceiveParams) {
logger.Infow("received data", "data", data.ToProto(), "participant", params.SenderIdentity)
},
OnConnectionQualityChanged: func(update *livekit.ConnectionQualityInfo, p lksdk.Participant) {
logger.Debugw("connection quality changed", "participant", p.Identity(), "quality", update.Quality)
Expand All @@ -93,6 +103,9 @@ func joinRoom(c *cli.Context) error {
"source", pub.Source(),
"participant", participant.Identity(),
)
if pub.Kind() == lksdk.TrackKindAudio && audioPath != "" {
go writeTrackToSocket(audioPath, participant.Identity(), pub.SID(), track)
}
},
OnTrackUnsubscribed: func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, participant *lksdk.RemoteParticipant) {
logger.Infow("track unsubscribed",
Expand Down Expand Up @@ -365,3 +378,63 @@ func publishReader(room *lksdk.Room,
}
return nil
}

func writeTrackToSocket(destPath, identity, trackID string, track *webrtc.TrackRemote) {
if err := os.MkdirAll(destPath, 0755); err != nil {
log.Fatalf("Error creating directory: %v", err)
}
socketFile := fmt.Sprintf("%s/%s_%s.sock", destPath, identity, trackID)

if _, err := os.Stat(socketFile); err == nil {
os.Remove(socketFile)
}

listener, err := net.Listen("unix", socketFile)
if err != nil {
log.Fatalf("could not listen to unix socket: %v", err)
}
defer listener.Close()

connPtr := atomic.NewPointer[net.Conn](nil)
writerPtr := atomic.NewPointer[oggwriter.OggWriter](nil)
go func() {
conn, err := listener.Accept()
if err != nil {
log.Fatalf("could not accept connection: %v", err)
}
connPtr.Store(&conn)
writer, err := oggwriter.NewWith(conn, 48000, track.Codec().Channels)
if err != nil {
log.Fatalf("could not create Ogg writer: %v", err)
}
writerPtr.Store(writer)
}()
defer func() {
if conn := connPtr.Load(); conn != nil {
(*conn).Close()
}
}()

// start reading from the track, but only write if TCP socket is being read
sb := samplebuilder.New(200, &codecs.OpusPacket{}, 48000)
var writer *oggwriter.OggWriter
for {
packet, _, err := track.ReadRTP()
if err != nil {
break
}
if writer == nil {
writer = writerPtr.Load()
if writer != nil {
fmt.Printf("socket %v connected, writing track %v", socketFile, trackID)
}
}

sb.Push(packet)
for _, p := range sb.PopPackets() {
if writer != nil {
writer.WriteRTP(p)
}
}
}
}
29 changes: 14 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ toolchain go1.22.2
require (
github.com/frostbyte73/core v0.0.10
github.com/go-logr/logr v1.4.2
github.com/livekit/protocol v1.15.0
github.com/livekit/server-sdk-go/v2 v2.1.3-0.20240507072004-e3121c9908be
github.com/livekit/protocol v1.17.1-0.20240606023900-429fec77a69b
github.com/livekit/server-sdk-go/v2 v2.1.3
github.com/manifoldco/promptui v0.9.0
github.com/olekukonko/tablewriter v0.0.5
github.com/pion/rtcp v1.2.14
github.com/pion/rtp v1.8.5
github.com/pion/rtp v1.8.6
github.com/pion/webrtc/v3 v3.2.40
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -45,29 +45,29 @@ require (
github.com/gorilla/websocket v1.5.1 // indirect
github.com/jxskiss/base62 v1.1.0 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/lithammer/shortuuid/v4 v4.0.0 // indirect
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 // indirect
github.com/livekit/mediatransportutil v0.0.0-20240501132628-6105557bbb9a // indirect
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 // indirect
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/nats-io/nats.go v1.33.1 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pion/datachannel v1.5.5 // indirect
github.com/pion/dtls/v2 v2.2.10 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.11 // indirect
github.com/pion/ice/v2 v2.3.24 // indirect
github.com/pion/interceptor v0.1.27 // indirect
github.com/pion/interceptor v0.1.29 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/sctp v1.8.16 // indirect
github.com/pion/sdp/v3 v3.0.9 // indirect
github.com/pion/srtp/v2 v2.0.18 // indirect
github.com/pion/stun v0.6.1 // indirect
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/turn/v2 v2.1.4 // indirect
github.com/pion/transport/v2 v2.2.5 // indirect
github.com/pion/turn/v2 v2.1.6 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
Expand All @@ -81,11 +81,10 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.uber.org/zap/exp v0.2.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240228224816-df926f6c8641 // indirect
google.golang.org/grpc v1.63.2 // indirect
)
Loading