@@ -113,6 +113,7 @@ type kafkaOpts struct {
113113 tlsCertFile string
114114 tlsKeyFile string
115115 tlsInsecureSkipTLSVerify bool
116+ kafkaVersion string
116117}
117118
118119// CanReadCertAndKey returns true if the certificate and key files already exists,
@@ -153,7 +154,11 @@ func canReadFile(path string) bool {
153154func NewExporter (opts kafkaOpts , topicFilter string ) (* Exporter , error ) {
154155 config := sarama .NewConfig ()
155156 config .ClientID = clientID
156- config .Version = sarama .V0_10_1_0
157+ version , err := sarama .ParseKafkaVersion (opts .kafkaVersion )
158+ if err != nil {
159+ return nil , err
160+ }
161+ config .Version = version
157162
158163 if opts .useSASL {
159164 config .Net .SASL .Enable = true
@@ -259,7 +264,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
259264 for _ , partition := range partitions {
260265 broker , err := e .client .Leader (topic , partition )
261266 if err != nil {
262- plog .Errorf ("Can't get leader of topic %s partition %s : %v" , topic , partition , err )
267+ plog .Errorf ("Can't get leader of topic %s partition %d : %v" , topic , partition , err )
263268 } else {
264269 ch <- prometheus .MustNewConstMetric (
265270 topicPartitionLeader , prometheus .GaugeValue , float64 (broker .ID ()), topic , strconv .FormatInt (int64 (partition ), 10 ),
@@ -268,7 +273,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
268273
269274 currentOffset , err := e .client .GetOffset (topic , partition , sarama .OffsetNewest )
270275 if err != nil {
271- plog .Errorf ("Can't get current offset of topic %s partition %s : %v" , topic , partition , err )
276+ plog .Errorf ("Can't get current offset of topic %s partition %d : %v" , topic , partition , err )
272277 } else {
273278 e .mu .Lock ()
274279 e.offset [topic ][partition ] = currentOffset
@@ -280,7 +285,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
280285
281286 oldestOffset , err := e .client .GetOffset (topic , partition , sarama .OffsetOldest )
282287 if err != nil {
283- plog .Errorf ("Can't get oldest offset of topic %s partition %s : %v" , topic , partition , err )
288+ plog .Errorf ("Can't get oldest offset of topic %s partition %d : %v" , topic , partition , err )
284289 } else {
285290 ch <- prometheus .MustNewConstMetric (
286291 topicOldestOffset , prometheus .GaugeValue , float64 (oldestOffset ), topic , strconv .FormatInt (int64 (partition ), 10 ),
@@ -289,7 +294,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
289294
290295 replicas , err := e .client .Replicas (topic , partition )
291296 if err != nil {
292- plog .Errorf ("Can't get replicas of topic %s partition %s : %v" , topic , partition , err )
297+ plog .Errorf ("Can't get replicas of topic %s partition %d : %v" , topic , partition , err )
293298 } else {
294299 ch <- prometheus .MustNewConstMetric (
295300 topicPartitionReplicas , prometheus .GaugeValue , float64 (len (replicas )), topic , strconv .FormatInt (int64 (partition ), 10 ),
@@ -298,7 +303,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
298303
299304 inSyncReplicas , err := e .client .InSyncReplicas (topic , partition )
300305 if err != nil {
301- plog .Errorf ("Can't get in-sync replicas of topic %s partition %s : %v" , topic , partition , err )
306+ plog .Errorf ("Can't get in-sync replicas of topic %s partition %d : %v" , topic , partition , err )
302307 } else {
303308 ch <- prometheus .MustNewConstMetric (
304309 topicPartitionInSyncReplicas , prometheus .GaugeValue , float64 (len (inSyncReplicas )), topic , strconv .FormatInt (int64 (partition ), 10 ),
@@ -335,11 +340,11 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
335340 if err == sarama .ErrAlreadyConnected {
336341 broker .Close ()
337342 if err := broker .Open (e .client .Config ()); err != nil {
338- plog .Errorf ("Can't connect to broker %v : %v" , broker .ID (), err )
343+ plog .Errorf ("Can't connect to broker %d : %v" , broker .ID (), err )
339344 break
340345 }
341346 } else {
342- plog .Errorf ("Can't connect to broker %v : %v" , broker .ID (), err )
347+ plog .Errorf ("Can't connect to broker %d : %v" , broker .ID (), err )
343348 break
344349 }
345350 }
@@ -431,6 +436,7 @@ func main() {
431436 kingpin .Flag ("tls.cert-file" , "The optional certificate file for client authentication." ).Default ("" ).StringVar (& opts .tlsCertFile )
432437 kingpin .Flag ("tls.key-file" , "The optional key file for client authentication." ).Default ("" ).StringVar (& opts .tlsKeyFile )
433438 kingpin .Flag ("tls.insecure-skip-tls-verify" , "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure." ).Default ("false" ).BoolVar (& opts .tlsInsecureSkipTLSVerify )
439+ kingpin .Flag ("kafka.version" , "Kafka broker version" ).Default (sarama .V1_0_0_0 .String ()).StringVar (& opts .kafkaVersion )
434440
435441 plog .AddFlags (kingpin .CommandLine )
436442 kingpin .Version (version .Print ("kafka_exporter" ))
0 commit comments