@@ -110,7 +110,7 @@ func (sh *shard) flushSegment(ctx context.Context) {
110
110
if s .debuginfo .movedHeads > 0 {
111
111
_ = level .Debug (s .logger ).Log ("msg" ,
112
112
"writing segment block done" ,
113
- "heads-count" , len (s .heads ),
113
+ "heads-count" , len (s .datasets ),
114
114
"heads-moved-count" , s .debuginfo .movedHeads ,
115
115
"inflight-duration" , s .debuginfo .waitInflight ,
116
116
"flush-heads-duration" , s .debuginfo .flushHeadsDuration ,
@@ -198,7 +198,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
198
198
s := & segment {
199
199
logger : log .With (sl , "segment-id" , id .String ()),
200
200
ulid : id ,
201
- heads : make (map [datasetKey ]dataset ),
201
+ datasets : make (map [datasetKey ]* dataset ),
202
202
sw : sw ,
203
203
sh : sh ,
204
204
shard : sk ,
@@ -211,7 +211,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
211
211
func (s * segment ) flush (ctx context.Context ) (err error ) {
212
212
span , ctx := opentracing .StartSpanFromContext (ctx , "segment.flush" , opentracing.Tags {
213
213
"block_id" : s .ulid .String (),
214
- "datasets" : len (s .heads ),
214
+ "datasets" : len (s .datasets ),
215
215
"shard" : s .shard ,
216
216
})
217
217
defer span .Finish ()
@@ -273,7 +273,7 @@ func (s *segment) flushBlock(stream flushStream) ([]byte, *metastorev1.BlockMeta
273
273
f := stream .At ()
274
274
svc , err := concatSegmentHead (f , w , stringTable )
275
275
if err != nil {
276
- level .Error (s .logger ).Log ("msg" , "failed to concat segment head " , "err" , err )
276
+ level .Error (s .logger ).Log ("msg" , "failed to concat segment dataset " , "err" , err )
277
277
continue
278
278
}
279
279
meta .MinTime = min (meta .MinTime , svc .MinTime )
@@ -339,6 +339,10 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) (
339
339
lb .WithLabelSet (model .LabelNameServiceName , f .head .key .service , model .LabelNameProfileType , profileType )
340
340
}
341
341
342
+ if f .head .needsSymbolization {
343
+ lb .WithLabelSet (metadata .LabelNameNeedsSymbolization , "true" )
344
+ }
345
+
342
346
// Other optional labels:
343
347
// lb.WithLabelSet("label_name", "label_value", ...)
344
348
ds .Labels = lb .Build ()
@@ -347,8 +351,8 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) (
347
351
}
348
352
349
353
func (s * segment ) flushHeads (ctx context.Context ) flushStream {
350
- heads := maps .Values (s .heads )
351
- slices .SortFunc (heads , func (a , b dataset ) int {
354
+ heads := maps .Values (s .datasets )
355
+ slices .SortFunc (heads , func (a , b * dataset ) int {
352
356
return a .key .compare (b .key )
353
357
})
354
358
@@ -363,15 +367,15 @@ func (s *segment) flushHeads(ctx context.Context) flushStream {
363
367
defer close (f .done )
364
368
flushed , err := s .flushHead (ctx , f .head )
365
369
if err != nil {
366
- level .Error (s .logger ).Log ("msg" , "failed to flush head " , "err" , err )
370
+ level .Error (s .logger ).Log ("msg" , "failed to flush dataset " , "err" , err )
367
371
return
368
372
}
369
373
if flushed == nil {
370
- level .Debug (s .logger ).Log ("msg" , "skipping nil head " )
374
+ level .Debug (s .logger ).Log ("msg" , "skipping nil dataset " )
371
375
return
372
376
}
373
377
if flushed .Meta .NumSamples == 0 {
374
- level .Debug (s .logger ).Log ("msg" , "skipping empty head " )
378
+ level .Debug (s .logger ).Log ("msg" , "skipping empty dataset " )
375
379
return
376
380
}
377
381
f .flushed = flushed
@@ -402,24 +406,24 @@ func (s *flushStream) Next() bool {
402
406
return false
403
407
}
404
408
405
// flushHead flushes the in-memory head of a single dataset and returns
// the flushed result produced by memdb. Per-tenant duration metrics are
// recorded on both the success and the error path (so the histogram also
// accounts for failed attempts); an error counter is bumped on failure.
func (s *segment) flushHead(ctx context.Context, e *dataset) (*memdb.FlushedHead, error) {
	th := time.Now()
	flushed, err := e.head.Flush(ctx)
	if err != nil {
		// Observe the duration even when the flush failed, then record
		// the failure itself.
		s.sw.metrics.flushServiceHeadDuration.WithLabelValues(s.sshard, e.key.tenant).Observe(time.Since(th).Seconds())
		s.sw.metrics.flushServiceHeadError.WithLabelValues(s.sshard, e.key.tenant).Inc()
		return nil, fmt.Errorf("failed to flush dataset: %w", err)
	}
	s.sw.metrics.flushServiceHeadDuration.WithLabelValues(s.sshard, e.key.tenant).Observe(time.Since(th).Seconds())
	level.Debug(s.logger).Log(
		"msg", "flushed dataset",
		"tenant", e.key.tenant,
		"service", e.key.service,
		"profiles", flushed.Meta.NumProfiles,
		"profiletypes", fmt.Sprintf("%v", flushed.Meta.ProfileTypeNames),
		"mintime", flushed.Meta.MinTimeNanos,
		"maxtime", flushed.Meta.MaxTimeNanos,
		"dataset-flush-duration", time.Since(th).String(),
	)
	return flushed, nil
}
@@ -437,12 +441,13 @@ func (k datasetKey) compare(x datasetKey) int {
437
441
}
438
442
439
443
// dataset holds the in-memory head that accumulates profiles for a
// single dataset key (tenant + service) within a segment.
type dataset struct {
	key  datasetKey
	head *memdb.Head
	// needsSymbolization marks the dataset as containing profiles that
	// arrived without usable symbols; it is surfaced as a dataset label
	// when the segment block is written (see concatSegmentHead).
	needsSymbolization bool
}
443
448
444
449
type headFlush struct {
445
- head dataset
450
+ head * dataset
446
451
flushed * memdb.FlushedHead
447
452
// protects head
448
453
done chan struct {}
@@ -453,10 +458,12 @@ type segment struct {
453
458
shard shardKey
454
459
sshard string
455
460
inFlightProfiles sync.WaitGroup
456
- heads map [datasetKey ]dataset
457
- headsLock sync.RWMutex
458
- logger log.Logger
459
- sw * segmentsWriter
461
+
462
+ mu sync.RWMutex
463
+ datasets map [datasetKey ]* dataset
464
+
465
+ logger log.Logger
466
+ sw * segmentsWriter
460
467
461
468
// TODO(kolesnikovae): Revisit.
462
469
doneChan chan struct {}
@@ -499,11 +506,13 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la
499
506
tenant : tenantID ,
500
507
service : model .Labels (labels ).Get (model .LabelNameServiceName ),
501
508
}
509
+ ds := s .datasetForIngest (k )
510
+ ds .needsSymbolization = ! hasSymbols (p )
502
511
size := p .SizeVT ()
503
512
rules := s .sw .limits .IngestionRelabelingRules (tenantID )
504
513
usage := s .sw .limits .DistributorUsageGroups (tenantID ).GetUsageGroups (tenantID , labels )
505
514
appender := & sampleAppender {
506
- head : s . headForIngest ( k ) ,
515
+ dataset : ds ,
507
516
profile : p ,
508
517
id : id ,
509
518
}
@@ -514,9 +523,79 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la
514
523
// CountReceivedBytes is tracked in distributors.
515
524
}
516
525
526
+ func hasSymbols (p * profilev1.Profile ) bool {
527
+ if p == nil {
528
+ return false
529
+ }
530
+
531
+ // If there are no functions or locations, the profile can't be symbolized
532
+ if len (p .Function ) == 0 || len (p .Location ) == 0 {
533
+ return false
534
+ }
535
+
536
+ if len (p .StringTable ) <= 1 {
537
+ return false
538
+ }
539
+
540
+ for _ , loc := range p .Location {
541
+ // If a location has no line information, it's missing symbols
542
+ if len (loc .Line ) == 0 {
543
+ return false
544
+ }
545
+
546
+ // Check if all lines have valid function references
547
+ for _ , line := range loc .Line {
548
+ if line .FunctionId == 0 {
549
+ return false
550
+ }
551
+
552
+ // Verify the function exists and has a name
553
+ fn , exists := getFunctionById (p , line .FunctionId )
554
+ if ! exists || fn .Name == 0 || int (fn .Name ) >= len (p .StringTable ) {
555
+ return false
556
+ }
557
+
558
+ // Check if the function name is just a hex address (unsymbolized)
559
+ name := p .StringTable [fn .Name ]
560
+ if isHexAddress (name ) {
561
+ return false
562
+ }
563
+ }
564
+ }
565
+
566
+ return true
567
+ }
568
+
569
+ // getFunctionById returns a function by its ID
570
+ func getFunctionById (p * profilev1.Profile , id uint64 ) (* profilev1.Function , bool ) {
571
+ for _ , fn := range p .Function {
572
+ if fn .Id == id {
573
+ return fn , true
574
+ }
575
+ }
576
+ return nil , false
577
+ }
578
+
579
// isHexAddress reports whether s looks like a hexadecimal memory
// address: a "0x" prefix followed by at least one hex digit, with no
// other characters.
func isHexAddress(s string) bool {
	// Require the "0x" prefix plus at least one digit after it.
	if len(s) < 3 || s[0] != '0' || s[1] != 'x' {
		return false
	}

	// Every remaining character must be a hex digit.
	for i := 2; i < len(s); i++ {
		switch c := s[i]; {
		case c >= '0' && c <= '9':
		case c >= 'a' && c <= 'f':
		case c >= 'A' && c <= 'F':
		default:
			return false
		}
	}

	return true
}
595
+
517
596
type sampleAppender struct {
518
597
id uuid.UUID
519
- head * memdb. Head
598
+ dataset * dataset
520
599
profile * profilev1.Profile
521
600
exporter * pprofmodel.SampleExporter
522
601
@@ -525,7 +604,7 @@ type sampleAppender struct {
525
604
}
526
605
527
606
// VisitProfile ingests the whole profile into the dataset's head under
// the given label set.
func (v *sampleAppender) VisitProfile(labels []*typesv1.LabelPair) {
	v.dataset.head.Ingest(v.profile, v.id, labels)
}
530
609
531
610
func (v * sampleAppender ) VisitSampleSeries (labels []* typesv1.LabelPair , samples []* profilev1.Sample ) {
@@ -534,37 +613,36 @@ func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples
534
613
}
535
614
var n profilev1.Profile
536
615
v .exporter .ExportSamples (& n , samples )
537
- v .head .Ingest (& n , v .id , labels )
616
+ v .dataset . head .Ingest (& n , v .id , labels )
538
617
}
539
618
540
619
// Discarded accumulates the number of profiles and bytes dropped while
// appending samples; the totals are read by the caller after the visit.
func (v *sampleAppender) Discarded(profiles, bytes int) {
	v.discardedProfiles += profiles
	v.discardedBytes += bytes
}
623
545
// datasetForIngest returns the dataset for the given key, creating it
// (with a fresh memdb head) on first use. It uses double-checked
// locking: the read lock covers the common hit path, and the key is
// re-checked under the write lock before inserting, so concurrent
// callers for the same key all receive the same *dataset.
func (s *segment) datasetForIngest(k datasetKey) *dataset {
	s.mu.RLock()
	ds, ok := s.datasets[k]
	s.mu.RUnlock()
	if ok {
		return ds
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	// Another goroutine may have created the dataset between the read
	// unlock above and acquiring the write lock.
	if ds, ok = s.datasets[k]; ok {
		return ds
	}

	h := memdb.NewHead(s.sw.headMetrics)
	ds = &dataset{
		key:  k,
		head: h,
	}

	s.datasets[k] = ds
	return ds
}
569
647
570
648
func (sw * segmentsWriter ) uploadBlock (ctx context.Context , blockData []byte , meta * metastorev1.BlockMeta , s * segment ) error {
@@ -652,6 +730,12 @@ func (sw *segmentsWriter) storeMetadata(ctx context.Context, meta *metastorev1.B
652
730
sw .metrics .storeMetadataDLQ .WithLabelValues (statusLabelValue (err )).Inc ()
653
731
}()
654
732
733
+ // Log the metadata before storing it
734
+ for i , ds := range meta .Datasets {
735
+ fmt .Printf (">>> Storing dataset %d: Name=%s, Labels=%v\n " ,
736
+ i , meta .StringTable [ds .Name ], ds .Labels )
737
+ }
738
+
655
739
if err = s .sw .storeMetadataDLQ (ctx , meta ); err == nil {
656
740
return nil
657
741
}
0 commit comments