Skip to content

Commit 03eabed

Browse files
ryanclarkmmcallister
authored andcommitted
Upload thumbnails for session recordings as they're generated (#60313)
1 parent aef7c47 commit 03eabed

File tree

9 files changed

+490
-416
lines changed

9 files changed

+490
-416
lines changed

lib/auth/recordingmetadata/provider.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package recordingmetadata
2020
import (
2121
"context"
2222
"sync"
23+
"time"
2324

2425
"github.com/gravitational/teleport/lib/session"
2526
)
@@ -75,6 +76,6 @@ type noopRecordingMetadata struct{}
7576

7677
// ProcessSessionRecording is a no-op implementation of the
7778
// [Service.ProcessSessionRecording] method.
78-
func (n noopRecordingMetadata) ProcessSessionRecording(ctx context.Context, sessionID session.ID) error {
79+
func (n noopRecordingMetadata) ProcessSessionRecording(ctx context.Context, sessionID session.ID, duration time.Duration) error {
7980
return nil
8081
}

lib/auth/recordingmetadata/recordingmetadatav1/recordingmetadata.go

Lines changed: 130 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import (
2323
"context"
2424
"io"
2525
"log/slog"
26+
"math"
2627
"math/rand/v2"
28+
"sync"
2729
"time"
2830

2931
"github.com/gravitational/trace"
@@ -103,7 +105,7 @@ func NewRecordingMetadataService(cfg RecordingMetadataServiceConfig) (*Recording
103105

104106
// ProcessSessionRecording processes the session recording associated with the provided session ID.
105107
// It streams session events, generates metadata, and uploads thumbnails and metadata.
106-
func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context, sessionID session.ID) error {
108+
func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context, sessionID session.ID, duration time.Duration) error {
107109
sessionsPendingMetric.Inc()
108110

109111
if err := s.concurrencyLimiter.Acquire(ctx, 1); err != nil {
@@ -125,8 +127,8 @@ func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context,
125127
var startTime time.Time
126128
var lastEvent apievents.AuditEvent
127129
var lastActivityTime time.Time
130+
var lastThumbnailTime time.Time
128131

129-
thumbnailInterval := 1 * time.Second
130132
activeUsers := make(map[string]time.Duration)
131133

132134
vt := vt10x.New()
@@ -146,18 +148,61 @@ func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context,
146148
})
147149
}
148150

149-
sampler := newThumbnailBucketSampler(maxThumbnails, thumbnailInterval)
151+
// will either finish the upload or cancel it if exited early
152+
var finish sync.Once
153+
154+
w, cancelUpload, uploadErrs := s.startUpload(ctx, sessionID)
155+
156+
defer func() {
157+
finish.Do(func() {
158+
cancelUpload()
159+
w.Close()
160+
})
161+
}()
162+
163+
interval := calculateThumbnailInterval(duration, maxThumbnails)
164+
thumbnailTime := getRandomThumbnailTime(duration)
165+
166+
// the thumbnail to upload for the session
167+
var recordingThumbnail *pb.SessionRecordingThumbnail
150168

151169
recordThumbnail := func(start time.Time) {
152170
cols, rows := vt.Size()
171+
cursor := vt.Cursor()
172+
173+
startOffset := start.Sub(startTime)
174+
endOffset := start.Add(interval).Add(-1 * time.Millisecond).Sub(startTime)
175+
176+
thumbnail := &pb.SessionRecordingThumbnail{
177+
Svg: terminal.VtToSvg(vt),
178+
Cols: int32(cols),
179+
Rows: int32(rows),
180+
CursorX: int32(cursor.X),
181+
CursorY: int32(cursor.Y),
182+
CursorVisible: vt.CursorVisible(),
183+
StartOffset: durationpb.New(startOffset),
184+
EndOffset: durationpb.New(endOffset),
185+
}
186+
187+
if _, err := protodelim.MarshalTo(w, thumbnail); err != nil {
188+
// log the error but continue processing other thumbnails and the session metadata (metadata is more important)
189+
s.logger.WarnContext(ctx, "Failed to marshal thumbnail entry",
190+
"session_id", sessionID, "error", err)
191+
}
192+
193+
if recordingThumbnail == nil {
194+
recordingThumbnail = thumbnail
195+
196+
return
197+
}
198+
199+
previousDiff := math.Abs(float64(thumbnailTime - recordingThumbnail.StartOffset.AsDuration()))
200+
diff := math.Abs(float64(thumbnailTime - startOffset))
153201

154-
sampler.add(&thumbnailState{
155-
svg: terminal.VtToSvg(vt),
156-
cols: cols,
157-
rows: rows,
158-
cursorVisible: vt.CursorVisible(),
159-
cursor: vt.Cursor(),
160-
}, start)
202+
if diff < previousDiff {
203+
// this thumbnail is closer to the ideal thumbnail time, use it instead
204+
recordingThumbnail = thumbnail
205+
}
161206
}
162207

163208
var hasSeenPrintEvent bool
@@ -208,7 +253,8 @@ loop:
208253
addInactivityEvent(lastActivityTime, e.Time)
209254
}
210255

211-
if sampler.shouldCapture(e.Time) {
256+
if e.Time.Sub(lastThumbnailTime) >= interval {
257+
lastThumbnailTime = e.Time
212258
recordThumbnail(e.Time)
213259
}
214260

@@ -244,7 +290,8 @@ loop:
244290
return trace.Errorf("writing data to terminal: %w", err)
245291
}
246292

247-
if sampler.shouldCapture(e.Time) {
293+
if e.Time.Sub(lastThumbnailTime) >= interval {
294+
lastThumbnailTime = e.Time
248295
recordThumbnail(e.Time)
249296
}
250297

@@ -279,6 +326,9 @@ loop:
279326
vt.Resize(size.W, size.H)
280327
}
281328

329+
case err := <-uploadErrs:
330+
return trace.Wrap(err)
331+
282332
case err := <-errors:
283333
if err != nil {
284334
return trace.Wrap(err)
@@ -293,6 +343,13 @@ loop:
293343
return trace.NotFound("no events found for session %v", sessionID)
294344
}
295345

346+
if recordingThumbnail != nil {
347+
if err := s.uploadThumbnail(ctx, sessionID, recordingThumbnail); err != nil {
348+
s.logger.WarnContext(ctx, "Failed to upload thumbnail",
349+
"session_id", sessionID, "error", err)
350+
}
351+
}
352+
296353
// Finish off any remaining activity events
297354
for user, userStartOffset := range activeUsers {
298355
metadata.Events = append(metadata.Events, &pb.SessionRecordingEvent{
@@ -310,9 +367,21 @@ loop:
310367
metadata.StartTime = timestamppb.New(startTime)
311368
metadata.EndTime = timestamppb.New(lastEvent.GetTime())
312369

313-
thumbnails := sampler.result()
370+
if _, err := protodelim.MarshalTo(w, metadata); err != nil {
371+
return trace.Wrap(err)
372+
}
373+
374+
var err error
375+
376+
finish.Do(func() {
377+
err = w.Close()
314378

315-
if err := s.upload(ctx, sessionID, metadata, thumbnails); err != nil {
379+
if err == nil {
380+
err = <-uploadErrs
381+
}
382+
})
383+
384+
if err != nil {
316385
sessionsProcessedMetric.WithLabelValues( /* success */ "false").Inc()
317386

318387
return trace.Wrap(err)
@@ -323,82 +392,74 @@ loop:
323392
return nil
324393
}
325394

326-
func (s *RecordingMetadataService) upload(ctx context.Context, sessionID session.ID, metadata *pb.SessionRecordingMetadata, thumbnails []*thumbnailEntry) error {
327-
metadataBuf := &bytes.Buffer{}
395+
func (s *RecordingMetadataService) startUpload(ctx context.Context, sessionID session.ID) (io.WriteCloser, context.CancelFunc, <-chan error) {
396+
uploadCtx, cancel := context.WithCancel(ctx)
397+
r, w := io.Pipe()
398+
errs := make(chan error, 1)
328399

329-
if _, err := protodelim.MarshalTo(metadataBuf, metadata); err != nil {
330-
return trace.Wrap(err)
331-
}
400+
go func() {
401+
defer r.Close()
332402

333-
for _, t := range thumbnails {
334-
if _, err := protodelim.MarshalTo(metadataBuf, thumbnailEntryToProto(t)); err != nil {
335-
s.logger.WarnContext(ctx, "Failed to marshal thumbnail entry",
336-
"session_id", sessionID, "error", err)
337-
338-
continue
403+
path, err := s.uploadHandler.UploadMetadata(uploadCtx, sessionID, r)
404+
if err != nil {
405+
errs <- trace.Wrap(err)
406+
return
339407
}
408+
409+
s.logger.DebugContext(ctx, "Uploaded session recording metadata", "path", path)
410+
errs <- nil
411+
}()
412+
413+
return w, cancel, errs
414+
}
415+
416+
func (s *RecordingMetadataService) uploadThumbnail(ctx context.Context, sessionID session.ID, thumbnail *pb.SessionRecordingThumbnail) error {
417+
if thumbnail == nil {
418+
return nil
340419
}
341420

342-
path, err := s.uploadHandler.UploadMetadata(ctx, sessionID, metadataBuf)
421+
b, err := proto.Marshal(thumbnail)
343422
if err != nil {
344423
return trace.Wrap(err)
345424
}
346425

347-
s.logger.DebugContext(ctx, "Uploaded session recording metadata", "path", path)
348-
349-
thumbnail := getRandomThumbnail(thumbnails)
350-
if thumbnail != nil {
351-
b, err := proto.Marshal(thumbnailEntryToProto(thumbnail))
352-
if err != nil {
353-
return trace.Wrap(err)
354-
}
355-
356-
path, err := s.uploadHandler.UploadThumbnail(ctx, sessionID, bytes.NewReader(b))
357-
if err != nil {
358-
return trace.Wrap(err)
359-
}
360-
361-
s.logger.DebugContext(ctx, "Uploaded session recording thumbnail", "path", path)
426+
path, err := s.uploadHandler.UploadThumbnail(ctx, sessionID, bytes.NewReader(b))
427+
if err != nil {
428+
return trace.Wrap(err)
362429
}
363430

431+
s.logger.DebugContext(ctx, "Uploaded session recording thumbnail", "path", path)
432+
364433
return nil
365434
}
366435

367-
func thumbnailEntryToProto(t *thumbnailEntry) *pb.SessionRecordingThumbnail {
368-
return &pb.SessionRecordingThumbnail{
369-
Svg: t.state.svg,
370-
Cols: int32(t.state.cols),
371-
Rows: int32(t.state.rows),
372-
CursorX: int32(t.state.cursor.X),
373-
CursorY: int32(t.state.cursor.Y),
374-
CursorVisible: t.state.cursorVisible,
375-
StartOffset: durationpb.New(t.startOffset),
376-
EndOffset: durationpb.New(t.endOffset),
436+
// getRandomThumbnailTime returns the ideal time offset for capturing a thumbnail
437+
// within the session duration based on the provided interval.
438+
// It avoids the first and last 20% of the session recording to increase the chances of
439+
// getting a thumbnail with meaningful content.
440+
func getRandomThumbnailTime(duration time.Duration) time.Duration {
441+
minIndex := int(0.2 * float64(duration))
442+
maxIndex := int(0.8 * float64(duration))
443+
444+
if maxIndex <= minIndex {
445+
return duration / 2
377446
}
447+
448+
return time.Duration(rand.IntN(maxIndex-minIndex) + minIndex)
378449
}
379450

380-
// getRandomThumbnail selects a random thumbnail from the middle 60% of the provided thumbnails slice.
381-
// This tries to get a thumbnail that is more representative of the session, avoiding the very start and end.
382-
func getRandomThumbnail(thumbnails []*thumbnailEntry) *thumbnailEntry {
383-
if len(thumbnails) == 0 {
384-
return nil
385-
}
451+
func calculateThumbnailInterval(duration time.Duration, maxThumbnails int) time.Duration {
452+
interval := time.Second
386453

387-
if len(thumbnails) < 5 {
388-
randomIndex := rand.IntN(len(thumbnails))
389-
return thumbnails[randomIndex]
454+
if duration > time.Duration(maxThumbnails)*time.Second {
455+
interval = duration / time.Duration(maxThumbnails)
390456
}
391457

392-
startIndex := int(float64(len(thumbnails)) * 0.2) // start at 20%
393-
endIndex := int(float64(len(thumbnails)) * 0.8) // end at 80%
458+
interval = interval.Round(time.Second)
394459

395-
if startIndex >= endIndex {
396-
endIndex = startIndex + 1
460+
if interval < time.Second {
461+
interval = time.Second
397462
}
398463

399-
rangeSize := endIndex - startIndex
400-
randomOffset := rand.IntN(rangeSize)
401-
randomIndex := startIndex + randomOffset
402-
403-
return thumbnails[randomIndex]
464+
return interval
404465
}

0 commit comments

Comments
 (0)