
Commit

chore: capture processing times for different stages of tracesexporter
dhawal1248 committed Dec 14, 2023
1 parent 1576977 commit c29ef6c
Showing 3 changed files with 64 additions and 4 deletions.
16 changes: 16 additions & 0 deletions exporter/clickhousetracesexporter/clickhouse_exporter.go
@@ -33,6 +33,7 @@ import (
	"github.com/google/uuid"
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
@@ -419,11 +420,26 @@ func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error {
			}
		}
	}
	stats.RecordWithTags(ctx,
		[]tag.Mutator{
			tag.Upsert(exporterKey, string(component.DataTypeTraces)),
			tag.Upsert(stageKey, "resource_span_loop"),
		},
		tracesExporterStageProcessingTime.M(float64(time.Since(processingTimeStart).Milliseconds())),
	)
	st := time.Now()
	err := s.Writer.WriteBatchOfSpans(batchOfSpans)
	if err != nil {
		zap.S().Error("Error in writing spans to clickhouse: ", err)
		return err
	}
	stats.RecordWithTags(ctx,
		[]tag.Mutator{
			tag.Upsert(exporterKey, string(component.DataTypeTraces)),
			tag.Upsert(stageKey, "write_batch_of_spans"),
		},
		tracesExporterStageProcessingTime.M(float64(time.Since(st).Milliseconds())),
	)
	stats.Record(ctx, tracesExporterProcessingTime.M(float64(time.Since(processingTimeStart).Milliseconds())))
	return nil
}
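The timing added above is plain OpenCensus: snapshot time.Now() before a stage, then record the elapsed milliseconds against a shared measure with a stage tag. Below is a minimal, self-contained sketch of that pattern; the demoStageTimeMs measure, demoStageKey tag, timeStage helper, and the bucket bounds are illustrative stand-ins, not identifiers from this commit.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

// Illustrative stand-ins; the real measure and tag keys live in clickhouse_factory.go.
var (
	demoStageTimeMs = stats.Float64("demo_stage_processing_time_millis",
		"Time spent in a single exporter stage", stats.UnitMilliseconds)
	demoStageKey = tag.MustNewKey("stage")
)

// timeStage runs fn, then records its wall-clock duration in milliseconds
// against the measure, tagged with the stage name.
func timeStage(ctx context.Context, stage string, fn func() error) error {
	start := time.Now()
	err := fn()
	// Recording is best-effort; the commit above likewise discards the returned error.
	_ = stats.RecordWithTags(ctx,
		[]tag.Mutator{tag.Upsert(demoStageKey, stage)},
		demoStageTimeMs.M(float64(time.Since(start).Milliseconds())),
	)
	return err
}

func main() {
	// Measurements are dropped unless a view over the measure is registered
	// (the factory change below does this for the real measure).
	_ = view.Register(&view.View{
		Name:        demoStageTimeMs.Name(),
		Measure:     demoStageTimeMs,
		TagKeys:     []tag.Key{demoStageKey},
		Aggregation: view.Distribution(10, 100, 1000),
	})

	_ = timeStage(context.Background(), "resource_span_loop", func() error {
		time.Sleep(20 * time.Millisecond) // stand-in for the span-conversion loop
		return nil
	})
	fmt.Println("recorded one stage timing")
}
```

The commit inlines this recording at each call site rather than factoring out a helper, which keeps the exporter and stage tag mutators explicit per stage.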
19 changes: 15 additions & 4 deletions exporter/clickhousetracesexporter/clickhouse_factory.go
@@ -44,10 +44,12 @@ type Writer interface {
type writerMaker func(WriterOptions) (Writer, error)

var (
-	writeLatencyMillis           = stats.Int64("exporter_db_write_latency", "Time taken (in millis) for exporter to write batch", "ms")
-	tracesExporterProcessingTime = stats.Float64("signoz_traces_exporter_processing_time_millis", "Time spent processing a span by the traces exporter", stats.UnitMilliseconds)
-	exporterKey                  = tag.MustNewKey("exporter")
-	tableKey                     = tag.MustNewKey("table")
+	writeLatencyMillis                = stats.Int64("exporter_db_write_latency", "Time taken (in millis) for exporter to write batch", "ms")
+	tracesExporterStageProcessingTime = stats.Float64("signoz_traces_exporter_stage_processing_time_millis", "Time spent processing a span by a particular stage of the traces exporter", stats.UnitMilliseconds)
+	tracesExporterProcessingTime      = stats.Float64("signoz_traces_exporter_processing_time_millis", "Time spent processing a span by the traces exporter", stats.UnitMilliseconds)
+	exporterKey                       = tag.MustNewKey("exporter")
+	tableKey                          = tag.MustNewKey("table")
+	stageKey                          = tag.MustNewKey("stage")
)

// NewFactory creates a new Factory.
@@ -70,8 +72,17 @@ func ClickHouseNewFactory(migrations string, datasource string, dockerMultiNodeC
		Aggregation: processingTimeDistribution,
	}

	stageProcessingTimeView := &view.View{
		Name:        tracesExporterStageProcessingTime.Name(),
		Measure:     tracesExporterStageProcessingTime,
		Description: tracesExporterStageProcessingTime.Description(),
		TagKeys:     []tag.Key{exporterKey, tableKey, stageKey},
		Aggregation: processingTimeDistribution,
	}

	view.Register(writeLatencyView)
	view.Register(processingTimeView)
	view.Register(stageProcessingTimeView)
	return &Factory{
		Options: NewOptions(migrations, datasource, dockerMultiNodeCluster, numConsumers, primaryNamespace, archiveNamespace),
		// makeReader: func(db *clickhouse.Conn, operationsTable, indexTable, spansTable string) (spanstore.Reader, error) {
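The new view reuses processingTimeDistribution, which is defined elsewhere in this file and not shown in the diff. A hedged sketch of the registration as a standalone program follows; the bucket bounds and the error handling are assumptions, while the measure name and tag keys mirror the var block above.

```go
package main

import (
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

var (
	// Stand-ins mirroring the names declared in the factory's var block.
	stageTimeMs = stats.Float64("signoz_traces_exporter_stage_processing_time_millis",
		"Time spent processing a span by a particular stage of the traces exporter",
		stats.UnitMilliseconds)
	exporterKey = tag.MustNewKey("exporter")
	tableKey    = tag.MustNewKey("table")
	stageKey    = tag.MustNewKey("stage")
)

func main() {
	// Explicit millisecond buckets; the actual processingTimeDistribution bounds
	// are not visible in this diff, so these are assumptions.
	processingTimeDistribution := view.Distribution(10, 50, 100, 250, 500, 1000, 5000, 10000)

	stageProcessingTimeView := &view.View{
		Name:        stageTimeMs.Name(),
		Measure:     stageTimeMs,
		Description: stageTimeMs.Description(),
		TagKeys:     []tag.Key{exporterKey, tableKey, stageKey},
		Aggregation: processingTimeDistribution,
	}

	// view.Register returns an error (for example on a name collision); the factory
	// discards it, but checking it is cheap.
	if err := view.Register(stageProcessingTimeView); err != nil {
		log.Fatalf("registering stage-processing view: %v", err)
	}
}
```

Note that tableKey is listed in TagKeys even though the per-stage records never set a table tag; OpenCensus simply aggregates those rows under an empty value for that key.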
33 changes: 33 additions & 0 deletions exporter/clickhousetracesexporter/writer.go
@@ -410,29 +410,62 @@ func (w *SpanWriter) writeModelBatch(batchSpans []*Span) error {

// WriteBatchOfSpans writes the encoded batch of spans
func (w *SpanWriter) WriteBatchOfSpans(batch []*Span) error {
	ctx := context.Background()
	if w.spansTable != "" {
		st := time.Now()
		if err := w.writeModelBatch(batch); err != nil {
			w.logger.Error("Could not write a batch of spans to model table: ", zap.Error(err))
			return err
		}
		stats.RecordWithTags(ctx,
			[]tag.Mutator{
				tag.Upsert(exporterKey, string(component.DataTypeTraces)),
				tag.Upsert(stageKey, "write_model_batch"),
			},
			tracesExporterStageProcessingTime.M(float64(time.Since(st).Milliseconds())),
		)
	}
	if w.indexTable != "" {
		st := time.Now()
		if err := w.writeIndexBatch(batch); err != nil {
			w.logger.Error("Could not write a batch of spans to index table: ", zap.Error(err))
			return err
		}
		stats.RecordWithTags(ctx,
			[]tag.Mutator{
				tag.Upsert(exporterKey, string(component.DataTypeTraces)),
				tag.Upsert(stageKey, "write_index_batch"),
			},
			tracesExporterStageProcessingTime.M(float64(time.Since(st).Milliseconds())),
		)
	}
	if w.errorTable != "" {
		st := time.Now()
		if err := w.writeErrorBatch(batch); err != nil {
			w.logger.Error("Could not write a batch of spans to error table: ", zap.Error(err))
			return err
		}
		stats.RecordWithTags(ctx,
			[]tag.Mutator{
				tag.Upsert(exporterKey, string(component.DataTypeTraces)),
				tag.Upsert(stageKey, "write_error_batch"),
			},
			tracesExporterStageProcessingTime.M(float64(time.Since(st).Milliseconds())),
		)
	}
	if w.attributeTable != "" && w.attributeKeyTable != "" {
		st := time.Now()
		if err := w.writeTagBatch(batch); err != nil {
			w.logger.Error("Could not write a batch of spans to tag/tagKey tables: ", zap.Error(err))
			return err
		}
		stats.RecordWithTags(ctx,
			[]tag.Mutator{
				tag.Upsert(exporterKey, string(component.DataTypeTraces)),
				tag.Upsert(stageKey, "write_tag_batch"),
			},
			tracesExporterStageProcessingTime.M(float64(time.Since(st).Milliseconds())),
		)
	}
	return nil
}
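WriteBatchOfSpans now repeats the same time-then-record block for each of the four table writes. Purely as an illustration — this helper is not part of the commit — the repetition could be folded into a small method on SpanWriter that reuses the measure, tag keys, and write methods already present in writer.go and its existing imports:

```go
// timedStage is a hypothetical helper, not in this commit: it runs one write
// stage and, on success, records the stage's duration against the shared measure.
// Like the commit, it records nothing when the write fails.
func (w *SpanWriter) timedStage(ctx context.Context, stage string, fn func() error) error {
	start := time.Now()
	if err := fn(); err != nil {
		return err
	}
	stats.RecordWithTags(ctx,
		[]tag.Mutator{
			tag.Upsert(exporterKey, string(component.DataTypeTraces)),
			tag.Upsert(stageKey, stage),
		},
		tracesExporterStageProcessingTime.M(float64(time.Since(start).Milliseconds())),
	)
	return nil
}
```

Each branch would then reduce to roughly:

```go
if w.spansTable != "" {
	if err := w.timedStage(ctx, "write_model_batch", func() error { return w.writeModelBatch(batch) }); err != nil {
		w.logger.Error("Could not write a batch of spans to model table: ", zap.Error(err))
		return err
	}
}
```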
