signozkafkareceiver: record processing times (#259)
dhawal1248 authored Dec 27, 2023
1 parent c558765 commit 458b1c3
Showing 2 changed files with 33 additions and 2 deletions.
24 changes: 22 additions & 2 deletions receiver/signozkafkareceiver/kafka_receiver.go
@@ -8,6 +8,7 @@ import (
     "fmt"
     "strings"
     "sync"
+    "time"
 
     "github.com/IBM/sarama"
     "go.opencensus.io/stats"
@@ -82,7 +83,7 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers
 
     // set sarama library's logger to get detailed logs from the library
     sarama.Logger = zap.NewStdLog(set.Logger)
-
+
     c := sarama.NewConfig()
     c = setSaramaConsumerConfig(c, &config.SaramaConsumerConfig)
     c.ClientID = config.ClientID
@@ -134,6 +135,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
         return err
     }
     consumerGroup := &tracesConsumerGroupHandler{
+        id:           c.settings.ID,
         logger:       c.settings.Logger,
         unmarshaler:  c.unmarshaler,
         nextConsumer: c.nextConsumer,
@@ -231,6 +233,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
         return err
     }
     metricsConsumerGroup := &metricsConsumerGroupHandler{
+        id:           c.settings.ID,
         logger:       c.settings.Logger,
         unmarshaler:  c.unmarshaler,
         nextConsumer: c.nextConsumer,
@@ -356,6 +359,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
     }
 
     logsConsumerGroup := &logsConsumerGroupHandler{
+        id:           c.settings.ID,
         logger:       c.settings.Logger,
         unmarshaler:  c.unmarshaler,
         nextConsumer: c.nextConsumer,
@@ -469,6 +473,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
             if !ok {
                 return nil
             }
+            start := time.Now()
             c.logger.Debug("Kafka message claimed",
                 zap.String("value", string(message.Value)),
                 zap.Time("timestamp", message.Timestamp),
@@ -509,6 +514,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
             if !c.autocommitEnabled {
                 session.Commit()
             }
+            err = stats.RecordWithTags(ctx, statsTags, processingTime.M(time.Since(start).Milliseconds()))
+            if err != nil {
+                c.logger.Error("failed to record processing time", zap.Error(err))
+            }
 
         // Should return when `session.Context()` is done.
         // If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
@@ -545,6 +554,7 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
             if !ok {
                 return nil
             }
+            start := time.Now()
             c.logger.Debug("Kafka message claimed",
                 zap.String("value", string(message.Value)),
                 zap.Time("timestamp", message.Timestamp),
@@ -585,6 +595,10 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
             if !c.autocommitEnabled {
                 session.Commit()
             }
+            err = stats.RecordWithTags(ctx, statsTags, processingTime.M(time.Since(start).Milliseconds()))
+            if err != nil {
+                c.logger.Error("failed to record processing time", zap.Error(err))
+            }
 
         // Should return when `session.Context()` is done.
         // If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
@@ -625,6 +639,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
             if !ok {
                 return nil
             }
+            start := time.Now()
             c.logger.Debug("Kafka message claimed",
                 zap.String("value", string(message.Value)),
                 zap.Time("timestamp", message.Timestamp),
@@ -634,9 +649,10 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
             }
 
             ctx := c.obsrecv.StartLogsOp(session.Context())
+            statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}
             _ = stats.RecordWithTags(
                 ctx,
-                []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())},
+                statsTags,
                 statMessageCount.M(1),
                 statMessageOffset.M(message.Offset),
                 statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1))
@@ -666,6 +682,10 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
             if !c.autocommitEnabled {
                 session.Commit()
             }
+            err = stats.RecordWithTags(ctx, statsTags, processingTime.M(time.Since(start).Milliseconds()))
+            if err != nil {
+                c.logger.Error("failed to record processing time", zap.Error(err))
+            }
 
         // Should return when `session.Context()` is done.
         // If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
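For context, the pattern these hunks add to all three handlers is plain OpenCensus timing: capture a timestamp when a message is claimed, then record the elapsed milliseconds against the processingTime measure once the message has been handed to the next consumer. Below is a minimal self-contained sketch of that pattern under stated assumptions; handleMessage, the "kafka/traces" instance name, and the "name" tag key are illustrative stand-ins, not the receiver's actual wiring.

package main

import (
	"context"
	"log"
	"time"

	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
)

var (
	// Assumption: the receiver defines tagInstanceName elsewhere; the
	// key name "name" here is illustrative.
	tagInstanceName = tag.MustNewKey("name")

	// Same measure as the one added in metrics.go below.
	processingTime = stats.Int64(
		"kafka_receiver_processing_time_milliseconds",
		"Time taken to process a kafka message in ms",
		stats.UnitMilliseconds)
)

// handleMessage is a hypothetical stand-in for the per-message body of
// ConsumeClaim.
func handleMessage(ctx context.Context, value []byte) {
	start := time.Now()

	// ... unmarshal value and forward it to the next consumer ...

	statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, "kafka/traces")}
	err := stats.RecordWithTags(ctx, statsTags, processingTime.M(time.Since(start).Milliseconds()))
	if err != nil {
		// RecordWithTags only fails if a tag mutation is invalid.
		log.Printf("failed to record processing time: %v", err)
	}
}

func main() {
	handleMessage(context.Background(), []byte("example payload"))
}

Note that in the diff the measurement is recorded after the optional session.Commit(), so when autocommit is disabled the reported time includes commit latency as well.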
11 changes: 11 additions & 0 deletions receiver/signozkafkareceiver/metrics.go
@@ -18,6 +18,8 @@ var (
 
     statPartitionStart = stats.Int64("kafka_receiver_partition_start", "Number of started partitions", stats.UnitDimensionless)
     statPartitionClose = stats.Int64("kafka_receiver_partition_close", "Number of finished partitions", stats.UnitDimensionless)
+
+    processingTime = stats.Int64("kafka_receiver_processing_time_milliseconds", "Time taken to process a kafka message in ms", stats.UnitMilliseconds)
 )
 
 // MetricViews return metric views for Kafka receiver.
@@ -64,11 +66,20 @@ func MetricViews() []*view.View {
         Aggregation: view.Sum(),
     }
 
+    processingTimeView := &view.View{
+        Name:        processingTime.Name(),
+        Measure:     processingTime,
+        Description: processingTime.Description(),
+        TagKeys:     tagKeys,
+        Aggregation: view.Distribution(100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1750, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 512000),
+    }
+
     return []*view.View{
         countMessages,
         lastValueOffset,
         lastValueOffsetLag,
         countPartitionStart,
         countPartitionClose,
+        processingTimeView,
     }
 }
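The new view aggregates the per-message timings into a histogram whose bucket boundaries are fine-grained between 100 ms and 2 s, then roughly double out to 512 s, so both typical and pathological processing times stay resolvable. Views only take effect once registered; a minimal sketch of the registration step follows, assuming MetricViews() as defined above is importable (the import path is illustrative and may differ).

package main

import (
	"log"

	"go.opencensus.io/stats/view"

	// Illustrative import path; adjust to wherever the receiver package lives.
	signozkafkareceiver "github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver"
)

func main() {
	// Register the receiver's views, including the new processing-time
	// distribution; unregistered measurements are silently dropped.
	if err := view.Register(signozkafkareceiver.MetricViews()...); err != nil {
		log.Fatalf("failed to register kafka receiver views: %v", err)
	}
}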
