Description
Describe the solution you would like
It would be good if Writer supported the Transactional API. Currently the only mention of transactions in this library is Dialer's TransactionalID field.
I have tried to wire a custom kafka.Dialer with that field set into a custom kafka.Transport which is then wired into kafka.Writer, but it results in the following error on every write:
[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
Or sometimes just a plain EOF.
Here's the code snippet (not full program):
dialer := &kafka.Dialer{
	TransactionalID: "my-id",
	SASLMechanism:   sasl,
	TLS: &tls.Config{
		InsecureSkipVerify: true,
	},
}
tr := &kafka.Transport{
	Context: ctx,
	SASL:    sasl,
	TLS: &tls.Config{
		InsecureSkipVerify: true,
	},
	DialTimeout: 4 * time.Hour, // guard against EOF during debugging
	Dial: func(ctx context.Context, s string, s2 string) (net.Conn, error) {
		c, e := dialer.DialContext(ctx, s, s2)
		return c, e
	},
}
w := kafka.Writer{
	Addr:         kafka.TCP(address),
	Topic:        Topic,
	Balancer:     &kafka.Hash{},
	Transport:    tr,
	Async:        true,
	BatchSize:    100_000,
	BatchBytes:   100_000 * 1024,
	RequiredAcks: kafka.RequireAll,
	Compression:  kafka.Lz4,
}
defer func() {
	err = errors.Join(err, w.Close())
}()
Here's how to alter this snippet so that the writer works correctly (comment out the custom dialer, and TransactionalID with it):
// dialer := &kafka.Dialer{
// 	TransactionalID: "successful_trans_producer",
// 	SASLMechanism:   sasl,
// 	TLS: &tls.Config{
// 		InsecureSkipVerify: true,
// 	},
// }
tr := &kafka.Transport{
	Context: ctx,
	SASL:    sasl,
	TLS: &tls.Config{
		InsecureSkipVerify: true,
	},
	// DialTimeout: 4 * time.Hour,
	// Dial: func(ctx context.Context, s string, s2 string) (net.Conn, error) {
	// 	c, e := dialer.DialContext(ctx, s, s2)
	// 	return c, e
	// },
}
w := kafka.Writer{
	Addr:         kafka.TCP(address),
	Topic:        Topic,
	Balancer:     &kafka.Hash{},
	Transport:    tr,
	Async:        true,
	BatchSize:    100_000,
	BatchBytes:   100_000 * 1024,
	RequiredAcks: kafka.RequireAll,
	Compression:  kafka.Lz4,
}
defer func() {
	err = errors.Join(err, w.Close())
}()
The SASL and TLS settings have nothing to do with the problem: the second snippet keeps them unchanged and works.
So, how can the Transactional API be used with Writer?
Also, is this library maintained?