Closed
Labels
component/kafka (TiCDC kafka sink), type/enhancement (The issue or PR belongs to an enhancement.)
Description
- Deploy 3 Kafka brokers as a cluster.
- Deploy a CDC cluster and create one changefeed with the Kafka sink, using canal-json as the encoding protocol. There is no need to write any data.
- Kill one of the Kafka brokers manually with `kill -9`.
The following logs then appear:
[2025/12/01 05:56:35.650 +00:00] [INFO] [async_producer.go:1192] ["producer/broker/2 state change to [closing] because EOF"] [component=sarama]
[2025/12/01 05:56:35.653 +00:00] [INFO] [sarama.go:129] ["Closed connection to broker 10.2.7.5:9094"] [component=sarama]
[2025/12/01 05:56:37.487 +00:00] [INFO] [persist_storage.go:621] ["gc is disabled"] [gcTs=462570834164449280]
[2025/12/01 05:56:45.651 +00:00] [INFO] [async_producer.go:1192] ["producer/broker/0 state change to [closing] because read tcp 10.2.7.5:33502->10.2.7.5:9092: i/o timeout"] [component=sarama]
[2025/12/01 05:56:45.651 +00:00] [INFO] [sarama.go:129] ["Closed connection to broker 10.2.7.5:9092"] [component=sarama]
[2025/12/01 05:56:45.654 +00:00] [INFO] [metrics_collector.go:81] ["kafka metrics collector stopped"] [keyspace=default] [changefeed=kafka-test]
[2025/12/01 05:56:45.654 +00:00] [INFO] [sink.go:208] ["kafka sink event channel closed"] [keyspace=default] [changefeed=kafka-test]
[2025/12/01 05:56:45.654 +00:00] [INFO] [factory.go:263] ["async producer exit since context is done"] [keyspace=default] [changefeed=kafka-test]
[2025/12/01 05:56:45.654 +00:00] [INFO] [sink.go:282] ["kafka sink event channel closed"] [keyspace=default] [changefeed=kafka-test]
[2025/12/01 05:56:45.654 +00:00] [INFO] [encoder_group.go:121] ["encoder group exited"] [keyspace=default] [changefeed=kafka-test]
[2025/12/01 05:56:45.654 +00:00] [ERROR] [dispatcher_manager.go:510] ["Event Dispatcher Manager Meets Error"] [changefeedID=default/kafka-test] [error="[CDC:ErrKafkaSendMessage]kafka send message failed: keyspace=default, changefeed=kafka-test, eventType=checkpoint, checkpointTs=462571069779738629; ErrorInfo:kafka: Failed to deliver 2 messages.: kafka: Failed to deliver 2 messages."] [errorVerbose="[CDC:ErrKafkaSendMessage]kafka send message failed: keyspace=default, changefeed=kafka-test, eventType=checkpoint, checkpointTs=462571069779738629; ErrorInfo:kafka: Failed to deliver 2 messages.: kafka: Failed to deliver 2 messages.\ngithub.com/pingcap/errors.(*Error).GenWithStackByArgs\n\tgithub.com/pingcap/[email protected]/normalize.go:177\ngithub.com/pingcap/ticdc/pkg/errors.WrapError\n\tgithub.com/pingcap/ticdc/pkg/errors/helper.go:35\ngithub.com/pingcap/ticdc/pkg/sink/kafka.(*saramaSyncProducer).SendMessages\n\tgithub.com/pingcap/ticdc/pkg/sink/kafka/factory.go:152\ngithub.com/pingcap/ticdc/downstreamadapter/sink/kafka.(*sink).sendCheckpoint\n\tgithub.com/pingcap/ticdc/downstreamadapter/sink/kafka/sink.go:514\ngithub.com/pingcap/ticdc/downstreamadapter/sink/kafka.(*sink).Run.func1\n\tgithub.com/pingcap/ticdc/downstreamadapter/sink/kafka/sink.go:123\ngolang.org/x/sync/errgroup.(*Group).Go.func1\n\tgolang.org/x/[email protected]/errgroup/errgroup.go:93\nruntime.goexit\n\truntime/asm_amd64.s:1700"]
[2025/12/01 05:56:45.654 +00:00] [ERROR] [maintainer.go:777] ["dispatcher report an error"] [changefeed=default/kafka-test] [sourceNode=f6727047-963f-43d6-ad22-466876131ae4] [error="[CDC:ErrKafkaSendMessage]kafka send message failed: keyspace=default, changefeed=kafka-test, eventType=checkpoint, checkpointTs=462571069779738629; ErrorInfo:kafka: Failed to deliver 2 messages.: kafka: Failed to deliver 2 messages."]
[2025/12/01 05:56:50.654 +00:00] [ERROR] [maintainer.go:777] ["dispatcher report an error"] [changefeed=default/kafka-test] [sourceNode=f6727047-963f-43d6-ad22-466876131ae4] [error="[CDC:ErrKafkaSendMessage]kafka send message failed: keyspace=default, changefeed=kafka-test, eventType=checkpoint, checkpointTs=462571069779738629; ErrorInfo:kafka: Failed to deliver 2 messages.: kafka: Failed to deliver 2 messages."]
[2025/12/01 05:56:55.654 +00:00] [ERROR] [maintainer.go:777] ["dispatcher report an error"] [changefeed=default/kafka-test] [sourceNode=f6727047-963f-43d6-ad22-466876131ae4] [error="[CDC:ErrKafkaSendMessage]kafka send message failed: keyspace=default, changefeed=kafka-test, eventType=checkpoint, checkpointTs=462571069779738629; ErrorInfo:kafka: Failed to deliver 2 messages.: kafka: Failed to deliver 2 messages."]
[2025/12/01 05:56:55.850 +00:00] [ERROR] [backoff.go:157] ["changefeed maintainer report an error"] [changefeed=default/kafka-test] [checkpointTs=462571075022618635] [node=10.2.7.5:8300] [code=CDC:ErrChangefeedRetryable] [state=warning] [error="time:\"2025-12-01 05:56:45.65447888 +0000 UTC m=+6308.383928842\" node:\"10.2.7.5:8300\" code:\"CDC:ErrChangefeedRetryable\" message:\"[CDC:ErrKafkaSendMessage]kafka send message failed: keyspace=default, changefeed=kafka-test, eventType=checkpoint, checkpointTs=462571069779738629; ErrorInfo:kafka: Failed to deliver 2 messages.: kafka: Failed to deliver 2 messages.\" "]
[2025/12/01 05:56:55.850 +00:00] [WARN] [backoff.go:220] ["changefeed meets an error, will be stopped"] [changefeed=default/kafka-test] [checkpointTs=462571075022618635] [nextRetryTime=2025/12/01 05:57:06.643 +00:00] [error="[{\"time\":\"2025-12-01 05:56:45.65447888 +0000 UTC m=+6308.383928842\",\"node\":\"10.2.7.5:8300\",\"code\":\"CDC:ErrChangefeedRetryable\",\"message\":\"[CDC:ErrKafkaSendMessage]kafka send message failed: keyspace=default, changefeed=kafka-test, eventType=checkpoint, checkpointTs=462571069779738629; ErrorInfo:kafka: Failed to deliver 2 messages.: kafka: Failed to deliver 2 messages.\"}]"]
[2025/12/01 05:56:55.850 +00:00] [INFO] [controller.go:513] ["changefeed status changed"] [changefeed=default/kafka-test] [state=warning] [error="time:\"2025-12-01 05:56:45.65447888 +0000 UTC m=+6308.383928842\" node:\"10.2.7.5:8300\" code:\"CDC:ErrChangefeedRetryable\" message:\"[CDC:ErrKafkaSendMessage]kafka send message failed: keyspace=default, changefeed=kafka-test, eventType=checkpoint, checkpointTs=462571069779738629; ErrorInfo:kafka: Failed to deliver 2 messages.: kafka: Failed to deliver 2 messages.\" "]