Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/auth/recordingmetadata/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package recordingmetadata
import (
"context"
"sync"
"time"

"github.com/gravitational/teleport/lib/session"
)
Expand Down Expand Up @@ -75,6 +76,6 @@ type noopRecordingMetadata struct{}

// ProcessSessionRecording is a no-op implementation of the
// [Service.ProcessSessionRecording] method.
func (n noopRecordingMetadata) ProcessSessionRecording(ctx context.Context, sessionID session.ID) error {
func (n noopRecordingMetadata) ProcessSessionRecording(ctx context.Context, sessionID session.ID, duration time.Duration) error {
return nil
}
199 changes: 130 additions & 69 deletions lib/auth/recordingmetadata/recordingmetadatav1/recordingmetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"context"
"io"
"log/slog"
"math"
"math/rand/v2"
"sync"
"time"

"github.com/gravitational/trace"
Expand Down Expand Up @@ -103,7 +105,7 @@ func NewRecordingMetadataService(cfg RecordingMetadataServiceConfig) (*Recording

// ProcessSessionRecording processes the session recording associated with the provided session ID.
// It streams session events, generates metadata, and uploads thumbnails and metadata.
func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context, sessionID session.ID) error {
func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context, sessionID session.ID, duration time.Duration) error {
sessionsPendingMetric.Inc()

if err := s.concurrencyLimiter.Acquire(ctx, 1); err != nil {
Expand All @@ -125,8 +127,8 @@ func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context,
var startTime time.Time
var lastEvent apievents.AuditEvent
var lastActivityTime time.Time
var lastThumbnailTime time.Time

thumbnailInterval := 1 * time.Second
activeUsers := make(map[string]time.Duration)

vt := vt10x.New()
Expand All @@ -146,18 +148,61 @@ func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context,
})
}

sampler := newThumbnailBucketSampler(maxThumbnails, thumbnailInterval)
// will either finish the upload or cancel it if exited early
var finish sync.Once

w, cancelUpload, uploadErrs := s.startUpload(ctx, sessionID)

defer func() {
finish.Do(func() {
cancelUpload()
w.Close()
})
}()

interval := calculateThumbnailInterval(duration, maxThumbnails)
thumbnailTime := getRandomThumbnailTime(duration)

// the thumbnail to upload for the session
var recordingThumbnail *pb.SessionRecordingThumbnail

recordThumbnail := func(start time.Time) {
cols, rows := vt.Size()
cursor := vt.Cursor()

startOffset := start.Sub(startTime)
endOffset := start.Add(interval).Add(-1 * time.Millisecond).Sub(startTime)

thumbnail := &pb.SessionRecordingThumbnail{
Svg: terminal.VtToSvg(vt),
Cols: int32(cols),
Rows: int32(rows),
CursorX: int32(cursor.X),
CursorY: int32(cursor.Y),
CursorVisible: vt.CursorVisible(),
StartOffset: durationpb.New(startOffset),
EndOffset: durationpb.New(endOffset),
}

if _, err := protodelim.MarshalTo(w, thumbnail); err != nil {
// log the error but continue processing other thumbnails and the session metadata (metadata is more important)
s.logger.WarnContext(ctx, "Failed to marshal thumbnail entry",
"session_id", sessionID, "error", err)
}

if recordingThumbnail == nil {
recordingThumbnail = thumbnail

return
}

previousDiff := math.Abs(float64(thumbnailTime - recordingThumbnail.StartOffset.AsDuration()))
diff := math.Abs(float64(thumbnailTime - startOffset))

sampler.add(&thumbnailState{
svg: terminal.VtToSvg(vt),
cols: cols,
rows: rows,
cursorVisible: vt.CursorVisible(),
cursor: vt.Cursor(),
}, start)
if diff < previousDiff {
// this thumbnail is closer to the ideal thumbnail time, use it instead
recordingThumbnail = thumbnail
}
}

var hasSeenPrintEvent bool
Expand Down Expand Up @@ -208,7 +253,8 @@ loop:
addInactivityEvent(lastActivityTime, e.Time)
}

if sampler.shouldCapture(e.Time) {
if e.Time.Sub(lastThumbnailTime) >= interval {
lastThumbnailTime = e.Time
recordThumbnail(e.Time)
}

Expand Down Expand Up @@ -244,7 +290,8 @@ loop:
return trace.Errorf("writing data to terminal: %w", err)
}

if sampler.shouldCapture(e.Time) {
if e.Time.Sub(lastThumbnailTime) >= interval {
lastThumbnailTime = e.Time
recordThumbnail(e.Time)
}

Expand Down Expand Up @@ -279,6 +326,9 @@ loop:
vt.Resize(size.W, size.H)
}

case err := <-uploadErrs:
return trace.Wrap(err)

case err := <-errors:
if err != nil {
return trace.Wrap(err)
Expand All @@ -293,6 +343,13 @@ loop:
return trace.NotFound("no events found for session %v", sessionID)
}

if recordingThumbnail != nil {
if err := s.uploadThumbnail(ctx, sessionID, recordingThumbnail); err != nil {
s.logger.WarnContext(ctx, "Failed to upload thumbnail",
"session_id", sessionID, "error", err)
}
}

// Finish off any remaining activity events
for user, userStartOffset := range activeUsers {
metadata.Events = append(metadata.Events, &pb.SessionRecordingEvent{
Expand All @@ -310,9 +367,21 @@ loop:
metadata.StartTime = timestamppb.New(startTime)
metadata.EndTime = timestamppb.New(lastEvent.GetTime())

thumbnails := sampler.result()
if _, err := protodelim.MarshalTo(w, metadata); err != nil {
return trace.Wrap(err)
}

var err error

finish.Do(func() {
err = w.Close()

if err := s.upload(ctx, sessionID, metadata, thumbnails); err != nil {
if err == nil {
err = <-uploadErrs
}
})

if err != nil {
sessionsProcessedMetric.WithLabelValues( /* success */ "false").Inc()

return trace.Wrap(err)
Expand All @@ -323,82 +392,74 @@ loop:
return nil
}

func (s *RecordingMetadataService) upload(ctx context.Context, sessionID session.ID, metadata *pb.SessionRecordingMetadata, thumbnails []*thumbnailEntry) error {
metadataBuf := &bytes.Buffer{}
func (s *RecordingMetadataService) startUpload(ctx context.Context, sessionID session.ID) (io.WriteCloser, context.CancelFunc, <-chan error) {
uploadCtx, cancel := context.WithCancel(ctx)
r, w := io.Pipe()
errs := make(chan error, 1)

if _, err := protodelim.MarshalTo(metadataBuf, metadata); err != nil {
return trace.Wrap(err)
}
go func() {
defer r.Close()

for _, t := range thumbnails {
if _, err := protodelim.MarshalTo(metadataBuf, thumbnailEntryToProto(t)); err != nil {
s.logger.WarnContext(ctx, "Failed to marshal thumbnail entry",
"session_id", sessionID, "error", err)

continue
path, err := s.uploadHandler.UploadMetadata(uploadCtx, sessionID, r)
if err != nil {
errs <- trace.Wrap(err)
return
}

s.logger.DebugContext(ctx, "Uploaded session recording metadata", "path", path)
errs <- nil
}()

return w, cancel, errs
}

func (s *RecordingMetadataService) uploadThumbnail(ctx context.Context, sessionID session.ID, thumbnail *pb.SessionRecordingThumbnail) error {
if thumbnail == nil {
return nil
}

path, err := s.uploadHandler.UploadMetadata(ctx, sessionID, metadataBuf)
b, err := proto.Marshal(thumbnail)
if err != nil {
return trace.Wrap(err)
}

s.logger.DebugContext(ctx, "Uploaded session recording metadata", "path", path)

thumbnail := getRandomThumbnail(thumbnails)
if thumbnail != nil {
b, err := proto.Marshal(thumbnailEntryToProto(thumbnail))
if err != nil {
return trace.Wrap(err)
}

path, err := s.uploadHandler.UploadThumbnail(ctx, sessionID, bytes.NewReader(b))
if err != nil {
return trace.Wrap(err)
}

s.logger.DebugContext(ctx, "Uploaded session recording thumbnail", "path", path)
path, err := s.uploadHandler.UploadThumbnail(ctx, sessionID, bytes.NewReader(b))
if err != nil {
return trace.Wrap(err)
}

s.logger.DebugContext(ctx, "Uploaded session recording thumbnail", "path", path)

return nil
}

func thumbnailEntryToProto(t *thumbnailEntry) *pb.SessionRecordingThumbnail {
return &pb.SessionRecordingThumbnail{
Svg: t.state.svg,
Cols: int32(t.state.cols),
Rows: int32(t.state.rows),
CursorX: int32(t.state.cursor.X),
CursorY: int32(t.state.cursor.Y),
CursorVisible: t.state.cursorVisible,
StartOffset: durationpb.New(t.startOffset),
EndOffset: durationpb.New(t.endOffset),
// getRandomThumbnailTime returns the ideal time offset for capturing a thumbnail
// within the session duration based on the provided interval.
// It avoids the first and last 20% of the session recording to increase the chances of
// getting a thumbnail with meaningful content.
func getRandomThumbnailTime(duration time.Duration) time.Duration {
minIndex := int(0.2 * float64(duration))
maxIndex := int(0.8 * float64(duration))

if maxIndex <= minIndex {
return duration / 2
}

return time.Duration(rand.IntN(maxIndex-minIndex) + minIndex)
}

// getRandomThumbnail selects a random thumbnail from the middle 60% of the provided thumbnails slice.
// This tries to get a thumbnail that is more representative of the session, avoiding the very start and end.
func getRandomThumbnail(thumbnails []*thumbnailEntry) *thumbnailEntry {
if len(thumbnails) == 0 {
return nil
}
func calculateThumbnailInterval(duration time.Duration, maxThumbnails int) time.Duration {
interval := time.Second

if len(thumbnails) < 5 {
randomIndex := rand.IntN(len(thumbnails))
return thumbnails[randomIndex]
if duration > time.Duration(maxThumbnails)*time.Second {
interval = duration / time.Duration(maxThumbnails)
}

startIndex := int(float64(len(thumbnails)) * 0.2) // start at 20%
endIndex := int(float64(len(thumbnails)) * 0.8) // end at 80%
interval = interval.Round(time.Second)

if startIndex >= endIndex {
endIndex = startIndex + 1
if interval < time.Second {
interval = time.Second
}

rangeSize := endIndex - startIndex
randomOffset := rand.IntN(rangeSize)
randomIndex := startIndex + randomOffset

return thumbnails[randomIndex]
return interval
}
Loading
Loading