diff --git a/cdc/cdc/sink/mq_test.go b/cdc/cdc/sink/mq_test.go index 29152333..c7c4ac08 100644 --- a/cdc/cdc/sink/mq_test.go +++ b/cdc/cdc/sink/mq_test.go @@ -211,6 +211,10 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { err = sink.EmitChangedEvents(ctx, kv3) c.Assert(err, check.IsNil) + // TODO: fix EmitCheckpointTs + // mock kafka broker processes 1 row resolvedTs event + // leader.Returns(prodSuccess) + checkpointTs1, err := sink.FlushChangedEvents(ctx, keyspanID1, kv1.CRTs) c.Assert(err, check.IsNil) c.Assert(checkpointTs1, check.Equals, kv1.CRTs) diff --git a/cdc/cdc/sink/producer/kafka/kafka.go b/cdc/cdc/sink/producer/kafka/kafka.go index c26232ed..447879ee 100644 --- a/cdc/cdc/sink/producer/kafka/kafka.go +++ b/cdc/cdc/sink/producer/kafka/kafka.go @@ -260,7 +260,7 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { if flushedOffset <= prevOffset { log.Panic("kafka producer flushed offset goes backward", zap.Int32("partition", msg.Partition), zap.Uint64("flushed", flushedOffset), zap.Uint64("prev", prevOffset)) } - log.Debug("kafka producer flushed message", zap.Int32("partition", msg.Partition), zap.Uint64("offset", flushedOffset)) + log.Debug("kafka producer flushed message", zap.Int32("partition", msg.Partition), zap.Uint64("offset", flushedOffset), zap.Uint64("prev", prevOffset)) k.flushedNotifier.Notify() case err := <-k.asyncClient.Errors():