Open
Description
We need to see which topics do not exist in the Kafka cluster when WriteMessages returns an error. We can't enable the "Auto Topic Creation" config because of our business needs, and we can't extract the error from "WriteErrors" because the error happens before the batch operations.
Describe the solution you would like
Something similar to MessageTooLargeError:
// messageTooLarge builds the MessageTooLargeError reported for msgs[i].
//
// The returned error carries the offending message in Message and every
// other message of msgs, in their original order, in Remaining. Remaining is
// a freshly allocated slice, so msgs itself is left untouched.
func messageTooLarge(msgs []Message, i int) MessageTooLargeError {
	// Everything except the element at index i: the prefix before it,
	// followed by the suffix after it.
	others := make([]Message, len(msgs)-1)
	n := copy(others, msgs[:i])
	copy(others[n:], msgs[i+1:])

	var tooLarge MessageTooLargeError
	tooLarge.Message = msgs[i]
	tooLarge.Remaining = others
	return tooLarge
}
We need to implement a custom error type.
// partitions returns the number of partitions reported for topic by a
// metadata request.
//
// It returns the transport error if the round trip fails, the topic's own
// error code (as an Error) if the metadata entry carries one, and
// UnknownTopicOrPartition if the response has no entry for topic.
func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
	c := w.client(w.readTimeout())

	req := &metadataAPI.Request{
		TopicNames:             []string{topic},
		AllowAutoTopicCreation: w.AllowAutoTopicCreation,
	}

	// Go through the transport directly rather than (*Client).Metadata: this
	// is an optimization that skips building the temporary request and
	// response objects that API would create.
	//
	// The transport is expected to make this cheap by caching recent
	// results (the kafka.Transport type does).
	resp, err := c.transport().RoundTrip(ctx, c.Addr, req)
	if err != nil {
		return 0, err
	}

	meta := resp.(*metadataAPI.Response)
	for _, entry := range meta.Topics {
		if entry.Name != topic {
			continue
		}
		// A matching entry should always be found, unless kafka has a bug.
		if entry.ErrorCode != 0 {
			return 0, Error(entry.ErrorCode)
		}
		return len(entry.Partitions), nil
	}

	return 0, UnknownTopicOrPartition
}