Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): add logs to detect send checkpoint ts correct #12093

Open
wants to merge 13 commits into
base: release-7.5
Choose a base branch
from
Prev Previous commit
Next Next commit
new the producer in another way
3AceShowHand committed Mar 20, 2025
commit 9716494efb56d036aada62b2cc939d67249ac101
25 changes: 0 additions & 25 deletions pkg/sink/kafka/factory.go
Original file line number Diff line number Diff line change
@@ -88,7 +88,6 @@ type AsyncProducer interface {

type saramaSyncProducer struct {
id model.ChangeFeedID
client sarama.Client
producer sarama.SyncProducer
}

@@ -149,31 +148,7 @@ func (p *saramaSyncProducer) SendMessages(ctx context.Context,
}

func (p *saramaSyncProducer) Close() {
// We need to close it asynchronously. Otherwise, we might get stuck
// with an unhealthy(i.e. Network jitter, isolation) state of Kafka.
// Factory has a background thread to fetch and update the metadata.
// If we close the client synchronously, we might get stuck.
// Safety:
// * If the kafka cluster is running well, it will be closed as soon as possible.
// * If there is a problem with the kafka cluster,
// no data will be lost because this is a synchronous client.
// * There is a risk of goroutine leakage, but it is acceptable and our main
// goal is not to get stuck with the owner tick.
start := time.Now()
if err := p.client.Close(); err != nil {
log.Warn("Close Kafka DDL client with error",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
} else {
log.Info("Kafka DDL client closed",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)))
}

start = time.Now()
err := p.producer.Close()
if err != nil {
log.Error("Close Kafka DDL producer with error",
7 changes: 1 addition & 6 deletions pkg/sink/kafka/sarama_factory.go
Original file line number Diff line number Diff line change
@@ -91,18 +91,13 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error)
}
config.MetricRegistry = f.registry

client, err := sarama.NewClient(f.option.BrokerEndpoints, config)
p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config)
if err != nil {
return nil, errors.Trace(err)
}

p, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
return nil, errors.Trace(err)
}
return &saramaSyncProducer{
id: f.changefeedID,
client: client,
producer: p,
}, nil
}