Skip to content

Commit 15f3bb3

Browse files
committed
Upload thumbnails for session recordings as they're generated
1 parent 556881c commit 15f3bb3

File tree

9 files changed

+433
-389
lines changed

9 files changed

+433
-389
lines changed

lib/auth/recordingmetadata/provider.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package recordingmetadata
2020
import (
2121
"context"
2222
"sync"
23+
"time"
2324

2425
"github.com/gravitational/teleport/lib/session"
2526
)
@@ -75,6 +76,6 @@ type noopRecordingMetadata struct{}
7576

7677
// ProcessSessionRecording is a no-op implementation of the
7778
// [Service.ProcessSessionRecording] method.
78-
func (n noopRecordingMetadata) ProcessSessionRecording(ctx context.Context, sessionID session.ID) error {
79+
func (n noopRecordingMetadata) ProcessSessionRecording(ctx context.Context, sessionID session.ID, duration time.Duration) error {
7980
return nil
8081
}

lib/auth/recordingmetadata/recordingmetadatav1/recordingmetadata.go

Lines changed: 125 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"context"
2424
"io"
2525
"log/slog"
26-
"math/rand/v2"
26+
"math/rand"
2727
"time"
2828

2929
"github.com/gravitational/trace"
@@ -97,7 +97,8 @@ func NewRecordingMetadataService(cfg RecordingMetadataServiceConfig) (*Recording
9797

9898
// ProcessSessionRecording processes the session recording associated with the provided session ID.
9999
// It streams session events, generates metadata, and uploads thumbnails and metadata.
100-
func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context, sessionID session.ID) error {
100+
// This method returns immediately and processes the recording in a separate goroutine.
101+
func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context, sessionID session.ID, duration time.Duration) error {
101102
ctx, cancel := context.WithCancel(ctx)
102103
defer cancel()
103104

@@ -106,8 +107,9 @@ func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context,
106107
var startTime time.Time
107108
var lastEvent apievents.AuditEvent
108109
var lastActivityTime time.Time
110+
var lastThumbnailTime time.Time
111+
var thumbnailCount int
109112

110-
thumbnailInterval := 1 * time.Second
111113
activeUsers := make(map[string]time.Duration)
112114

113115
vt := vt10x.New()
@@ -127,18 +129,47 @@ func (s *RecordingMetadataService) ProcessSessionRecording(ctx context.Context,
127129
})
128130
}
129131

130-
sampler := newThumbnailBucketSampler(maxThumbnails, thumbnailInterval)
132+
w, cancelUpload, uploadErrs := s.startUpload(ctx, sessionID)
133+
defer func() {
134+
if w != nil {
135+
w.Close()
136+
}
137+
}()
138+
139+
interval := calculateThumbnailInterval(duration, maxThumbnails)
140+
thumbnailIndex := getRandomThumbnailIndex(interval, duration)
131141

132142
recordThumbnail := func(start time.Time) {
133143
cols, rows := vt.Size()
144+
cursor := vt.Cursor()
145+
146+
startOffset := start.Sub(startTime)
147+
endOffset := start.Add(interval).Add(-1 * time.Millisecond).Sub(startTime)
148+
149+
thumbnail := &pb.SessionRecordingThumbnail{
150+
Svg: terminal.VtToSvg(vt),
151+
Cols: int32(cols),
152+
Rows: int32(rows),
153+
CursorX: int32(cursor.X),
154+
CursorY: int32(cursor.Y),
155+
CursorVisible: vt.CursorVisible(),
156+
StartOffset: durationpb.New(startOffset),
157+
EndOffset: durationpb.New(endOffset),
158+
}
159+
160+
if _, err := protodelim.MarshalTo(w, thumbnail); err != nil {
161+
s.logger.WarnContext(ctx, "Failed to marshal thumbnail entry",
162+
"session_id", sessionID, "error", err)
163+
}
134164

135-
sampler.add(&thumbnailState{
136-
svg: terminal.VtToSvg(vt),
137-
cols: cols,
138-
rows: rows,
139-
cursorVisible: vt.CursorVisible(),
140-
cursor: vt.Cursor(),
141-
}, start)
165+
if thumbnailCount == thumbnailIndex {
166+
if err := s.uploadThumbnail(ctx, sessionID, thumbnail); err != nil {
167+
s.logger.WarnContext(ctx, "Failed to upload thumbnail",
168+
"session_id", sessionID, "error", err)
169+
}
170+
}
171+
172+
thumbnailCount++
142173
}
143174

144175
var hasSeenPrintEvent bool
@@ -189,7 +220,8 @@ loop:
189220
addInactivityEvent(lastActivityTime, e.Time)
190221
}
191222

192-
if sampler.shouldCapture(e.Time) {
223+
if e.Time.Sub(lastThumbnailTime) >= interval {
224+
lastThumbnailTime = e.Time
193225
recordThumbnail(e.Time)
194226
}
195227

@@ -222,10 +254,12 @@ loop:
222254
}
223255

224256
if _, err := vt.Write(e.Data); err != nil {
257+
cancelUpload()
225258
return trace.Errorf("writing data to terminal: %w", err)
226259
}
227260

228-
if sampler.shouldCapture(e.Time) {
261+
if e.Time.Sub(lastThumbnailTime) >= interval {
262+
lastThumbnailTime = e.Time
229263
recordThumbnail(e.Time)
230264
}
231265

@@ -237,6 +271,7 @@ loop:
237271

238272
size, err := session.UnmarshalTerminalParams(e.TerminalSize)
239273
if err != nil {
274+
cancelUpload()
240275
return trace.Wrap(err, "parsing terminal size %q for session %v", e.TerminalSize, sessionID)
241276
}
242277

@@ -260,6 +295,9 @@ loop:
260295
vt.Resize(size.W, size.H)
261296
}
262297

298+
case err := <-uploadErrs:
299+
return trace.Wrap(err)
300+
263301
case err := <-errors:
264302
if err != nil {
265303
return trace.Wrap(err)
@@ -271,6 +309,7 @@ loop:
271309
}
272310

273311
if lastEvent == nil {
312+
cancelUpload()
274313
return trace.NotFound("no events found for session %v", sessionID)
275314
}
276315

@@ -291,87 +330,107 @@ loop:
291330
metadata.StartTime = timestamppb.New(startTime)
292331
metadata.EndTime = timestamppb.New(lastEvent.GetTime())
293332

294-
thumbnails := sampler.result()
295-
296-
return s.upload(ctx, sessionID, metadata, thumbnails)
297-
}
298-
299-
func (s *RecordingMetadataService) upload(ctx context.Context, sessionID session.ID, metadata *pb.SessionRecordingMetadata, thumbnails []*thumbnailEntry) error {
300-
metadataBuf := &bytes.Buffer{}
301-
302-
if _, err := protodelim.MarshalTo(metadataBuf, metadata); err != nil {
333+
if _, err := protodelim.MarshalTo(w, metadata); err != nil {
303334
return trace.Wrap(err)
304335
}
305336

306-
for _, t := range thumbnails {
307-
if _, err := protodelim.MarshalTo(metadataBuf, thumbnailEntryToProto(t)); err != nil {
308-
s.logger.WarnContext(ctx, "Failed to marshal thumbnail entry",
309-
"session_id", sessionID, "error", err)
310-
311-
continue
312-
}
337+
if err := w.Close(); err != nil {
338+
return trace.Wrap(err)
313339
}
314340

315-
path, err := s.uploadHandler.UploadMetadata(ctx, sessionID, metadataBuf)
316-
if err != nil {
341+
w = nil
342+
343+
if err := <-uploadErrs; err != nil {
317344
return trace.Wrap(err)
318345
}
319346

320-
s.logger.DebugContext(ctx, "Uploaded session recording metadata", "path", path)
347+
return nil
348+
}
321349

322-
thumbnail := getRandomThumbnail(thumbnails)
323-
if thumbnail != nil {
324-
b, err := proto.Marshal(thumbnailEntryToProto(thumbnail))
325-
if err != nil {
326-
return trace.Wrap(err)
350+
func (s *RecordingMetadataService) startUpload(ctx context.Context, sessionID session.ID) (io.WriteCloser, context.CancelFunc, <-chan error) {
351+
uploadCtx, cancel := context.WithCancel(ctx)
352+
r, w := io.Pipe()
353+
errs := make(chan error, 1)
354+
355+
go func() {
356+
defer r.Close()
357+
358+
select {
359+
case <-uploadCtx.Done():
360+
errs <- uploadCtx.Err()
361+
return
362+
default:
327363
}
328364

329-
path, err := s.uploadHandler.UploadThumbnail(ctx, sessionID, bytes.NewReader(b))
365+
path, err := s.uploadHandler.UploadMetadata(uploadCtx, sessionID, r)
330366
if err != nil {
331-
return trace.Wrap(err)
367+
errs <- trace.Wrap(err)
368+
return
332369
}
333370

334-
s.logger.DebugContext(ctx, "Uploaded session recording thumbnail", "path", path)
335-
}
371+
s.logger.DebugContext(ctx, "Uploaded session recording metadata", "path", path)
372+
errs <- nil
373+
}()
336374

337-
return nil
375+
return w, cancel, errs
338376
}
339377

340-
func thumbnailEntryToProto(t *thumbnailEntry) *pb.SessionRecordingThumbnail {
341-
return &pb.SessionRecordingThumbnail{
342-
Svg: t.state.svg,
343-
Cols: int32(t.state.cols),
344-
Rows: int32(t.state.rows),
345-
CursorX: int32(t.state.cursor.X),
346-
CursorY: int32(t.state.cursor.Y),
347-
CursorVisible: t.state.cursorVisible,
348-
StartOffset: durationpb.New(t.startOffset),
349-
EndOffset: durationpb.New(t.endOffset),
378+
func (s *RecordingMetadataService) uploadThumbnail(ctx context.Context, sessionID session.ID, thumbnail *pb.SessionRecordingThumbnail) error {
379+
if thumbnail == nil {
380+
return nil
350381
}
351-
}
352382

353-
// getRandomThumbnail selects a random thumbnail from the middle 60% of the provided thumbnails slice.
354-
// This tries to get a thumbnail that is more representative of the session, avoiding the very start and end.
355-
func getRandomThumbnail(thumbnails []*thumbnailEntry) *thumbnailEntry {
356-
if len(thumbnails) == 0 {
357-
return nil
383+
b, err := proto.Marshal(thumbnail)
384+
if err != nil {
385+
return trace.Wrap(err)
358386
}
359387

360-
if len(thumbnails) < 5 {
361-
randomIndex := rand.IntN(len(thumbnails))
362-
return thumbnails[randomIndex]
388+
path, err := s.uploadHandler.UploadThumbnail(ctx, sessionID, bytes.NewReader(b))
389+
if err != nil {
390+
return trace.Wrap(err)
363391
}
364392

365-
startIndex := int(float64(len(thumbnails)) * 0.2) // start at 20%
366-
endIndex := int(float64(len(thumbnails)) * 0.8) // end at 80%
393+
s.logger.DebugContext(ctx, "Uploaded session recording thumbnail", "path", path)
367394

395+
return nil
396+
}
397+
398+
// getRandomThumbnailIndex returns a random index for a thumbnail to be used as a preview.
399+
// It avoids the first and last 20% of the thumbnails to increase the chances of
400+
// getting a thumbnail with meaningful content.
401+
func getRandomThumbnailIndex(interval time.Duration, duration time.Duration) int {
402+
numIntervals := int(duration / interval)
403+
if numIntervals == 0 {
404+
return 0
405+
}
406+
407+
if numIntervals < 5 {
408+
return rand.Intn(numIntervals)
409+
}
410+
411+
startIndex := int(float64(numIntervals) * 0.2)
412+
endIndex := int(float64(numIntervals) * 0.8)
368413
if startIndex >= endIndex {
369414
endIndex = startIndex + 1
370415
}
371416

372417
rangeSize := endIndex - startIndex
373-
randomOffset := rand.IntN(rangeSize)
374-
randomIndex := startIndex + randomOffset
418+
randomOffset := rand.Intn(rangeSize)
419+
return startIndex + randomOffset
420+
}
421+
422+
func calculateThumbnailInterval(duration time.Duration, maxThumbnails int) time.Duration {
423+
interval := time.Second
424+
425+
if duration > time.Duration(maxThumbnails)*time.Second {
426+
interval = duration / time.Duration(maxThumbnails)
427+
}
428+
429+
interval = interval.Round(time.Second)
430+
431+
if interval < time.Second {
432+
interval = time.Second
433+
}
375434

376-
return thumbnails[randomIndex]
435+
return interval
377436
}

0 commit comments

Comments
 (0)