Skip to content

Commit

Permalink
signozkafkareceiver: export sarama consumer config (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhawal1248 authored Dec 27, 2023
1 parent d5d6299 commit 57c8024
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 8 deletions.
16 changes: 12 additions & 4 deletions receiver/signozkafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ type MessageMarking struct {
OnError bool `mapstructure:"on_error"`
}

type SaramaConsumerConfig struct {
// Controls sarama client's Consumer.Fetch config if set.
ConsumerFetchMinBytes int32 `mapstructure:"fetch_min_bytes"`
ConsumerFetchDefaultBytes int32 `mapstructure:"fetch_default_bytes"`
ConsumerFetchMaxBytes int32 `mapstructure:"fetch_max_bytes"`

MaxProcessingTime time.Duration `mapstructure:"max_processing_time"`
GroupSessionTimeout time.Duration `mapstructure:"consumer_group_session_timeout"`
MessagesChannelSize int `mapstructure:"messages_channel_size"`
}

// Config defines configuration for Kafka receiver.
type Config struct {
// The list of kafka brokers (default localhost:9092)
Expand Down Expand Up @@ -62,10 +73,7 @@ type Config struct {
// Controls the way the messages are marked as consumed
MessageMarking MessageMarking `mapstructure:"message_marking"`

// Controls sarama client's Consumer.Fetch config if set.
ConsumerFetchMinBytes int32 `mapstructure:"consumer_fetch_min_bytes"`
ConsumerFetchDefaultBytes int32 `mapstructure:"consumer_fetch_default_bytes"`
ConsumerFetchMaxBytes int32 `mapstructure:"consumer_fetch_max_bytes"`
SaramaConsumerConfig SaramaConsumerConfig `mapstructure:"sarama_consumer_config"`
}

const (
Expand Down
8 changes: 8 additions & 0 deletions receiver/signozkafkareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func TestLoadConfig(t *testing.T) {
Enable: true,
Interval: 1 * time.Second,
},
SaramaConsumerConfig: SaramaConsumerConfig{
ConsumerFetchMinBytes: 1,
ConsumerFetchDefaultBytes: 64,
ConsumerFetchMaxBytes: 128,
MaxProcessingTime: 500 * time.Millisecond,
MessagesChannelSize: 1024,
GroupSessionTimeout: 30 * time.Second,
},
},
},
{
Expand Down
17 changes: 13 additions & 4 deletions receiver/signozkafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers
sarama.Logger = zap.NewStdLog(set.Logger)

c := sarama.NewConfig()
c = setSaramaConsumerFetchConfig(c, &config)
c = setSaramaConsumerConfig(c, &config.SaramaConsumerConfig)
c.ClientID = config.ClientID
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
Expand Down Expand Up @@ -182,7 +182,7 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers
sarama.Logger = zap.NewStdLog(set.Logger)

c := sarama.NewConfig()
c = setSaramaConsumerFetchConfig(c, &config)
c = setSaramaConsumerConfig(c, &config.SaramaConsumerConfig)
c.ClientID = config.ClientID
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
Expand Down Expand Up @@ -274,7 +274,7 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma
sarama.Logger = zap.NewStdLog(set.Logger)

c := sarama.NewConfig()
c = setSaramaConsumerFetchConfig(c, &config)
c = setSaramaConsumerConfig(c, &config.SaramaConsumerConfig)
c.ClientID = config.ClientID
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
Expand Down Expand Up @@ -686,7 +686,7 @@ func toSaramaInitialOffset(initialOffset string) (int64, error) {
}
}

func setSaramaConsumerFetchConfig(sc *sarama.Config, c *Config) *sarama.Config {
func setSaramaConsumerConfig(sc *sarama.Config, c *SaramaConsumerConfig) *sarama.Config {
if c.ConsumerFetchMinBytes != 0 {
sc.Consumer.Fetch.Min = c.ConsumerFetchMinBytes
}
Expand All @@ -696,5 +696,14 @@ func setSaramaConsumerFetchConfig(sc *sarama.Config, c *Config) *sarama.Config {
if c.ConsumerFetchMaxBytes != 0 {
sc.Consumer.Fetch.Max = c.ConsumerFetchMaxBytes
}
if c.MaxProcessingTime != 0 {
sc.Consumer.MaxProcessingTime = c.MaxProcessingTime
}
if c.GroupSessionTimeout != 0 {
sc.Consumer.Group.Session.Timeout = c.GroupSessionTimeout
}
if c.MessagesChannelSize != 0 {
sc.ChannelBufferSize = c.MessagesChannelSize
}
return sc
}
7 changes: 7 additions & 0 deletions receiver/signozkafkareceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ signozkafkareceiver:
retry:
max: 10
backoff: 5s
sarama_consumer_config:
fetch_min_bytes: 1
fetch_default_bytes: 64
fetch_max_bytes: 128
max_processing_time: 500ms
messages_channel_size: 1024
consumer_group_session_timeout: 30s
signozkafkareceiver/logs:
topic: logs
encoding: direct
Expand Down

0 comments on commit 57c8024

Please sign in to comment.