From 33e77ca69ae2880b69d08004ce1e1b69258933a9 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sun, 11 Feb 2024 18:15:53 +0800 Subject: [PATCH 01/17] collect pprof heap Signed-off-by: Ping Yu --- cdc/tests/integration_tests/flow_control/run.sh | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/cdc/tests/integration_tests/flow_control/run.sh b/cdc/tests/integration_tests/flow_control/run.sh index a434e021..838c04b7 100644 --- a/cdc/tests/integration_tests/flow_control/run.sh +++ b/cdc/tests/integration_tests/flow_control/run.sh @@ -49,7 +49,7 @@ EOF # Wait until cdc pulls the data from tikv and store it in soter sleep 90 - rss1=$(ps -aux | grep 'tikv-cdc' | head -n1 | awk '{print $6}') + rss1=$(ps -aux | grep 'tikv-cdc' | grep -v grep | head -n1 | awk '{print $6}') if [[ $rss1 == "" ]]; then echo "Failed to get rrs1 by ps" exit 1 @@ -61,12 +61,9 @@ EOF echo "cdc server used memory: $used" if [ $used -gt $expected ]; then echo "Maybe flow-contorl is not working" - - if [ "$SINK_TYPE" != "kafka" ]; then - # Kafka sink may have memory leak. - # TODO: investigate why. - exit 1 - fi + # CI only collect *.log files, so name it as heap-dump.log + curl http://127.0.0.1:8600/debug/pprof/heap >$WORK_DIR/heap-dump.log + exit 1 fi # As "per-changefeed-memory-quota" is low the syncing will cost more time. From 7ceeefb0cfb2a86a0206a4b6f2482003a47c5c77 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 13 Feb 2024 10:49:14 +0800 Subject: [PATCH 02/17] unlimit retry for pd connection Signed-off-by: Ping Yu --- cdc/cmd/kafka-consumer/tikv.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cdc/cmd/kafka-consumer/tikv.go b/cdc/cmd/kafka-consumer/tikv.go index e808ee3e..8cba5f46 100644 --- a/cdc/cmd/kafka-consumer/tikv.go +++ b/cdc/cmd/kafka-consumer/tikv.go @@ -15,6 +15,7 @@ package main import ( "context" + "math" "net/url" "time" @@ -29,7 +30,7 @@ import ( ) const ( - defaultPDErrorRetry int = 10 + defaultPDErrorRetry int = math.MaxInt ) var _ sink.Sink = (*tikvSimpleSink)(nil) From 88bebcc596ec071aed62e5444a70929c0eb5b7e2 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 13 Feb 2024 11:39:09 +0800 Subject: [PATCH 03/17] reduce record size Signed-off-by: Ping Yu --- cdc/tests/integration_tests/flow_control/config/workload | 2 ++ cdc/tests/integration_tests/flow_control/run.sh | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cdc/tests/integration_tests/flow_control/config/workload b/cdc/tests/integration_tests/flow_control/config/workload index faa23c17..9a74c3c5 100644 --- a/cdc/tests/integration_tests/flow_control/config/workload +++ b/cdc/tests/integration_tests/flow_control/config/workload @@ -34,3 +34,5 @@ insertproportion=0 requestdistribution=uniform +fieldcount=1 +fieldlength=100 diff --git a/cdc/tests/integration_tests/flow_control/run.sh b/cdc/tests/integration_tests/flow_control/run.sh index 838c04b7..5ba45031 100644 --- a/cdc/tests/integration_tests/flow_control/run.sh +++ b/cdc/tests/integration_tests/flow_control/run.sh @@ -55,7 +55,9 @@ EOF exit 1 fi # We set `per-changefeed-memory-quota=10M` and forbid sorter to use memory cache data, - # so maybe there is 10M of memory for data. But still needs some memory to hold related data structures. + # so maybe there is 10M of memory for data. + # Note that there is memory usage between puller & sorter, and it's limited by size of channels. + # Use small record size to reduce memory usage of this part (see flow_control/config/workload). expected=307200 #300M used=$(expr $rss1 - $rss0) echo "cdc server used memory: $used" From 9430c956fc0f458bdd6a069b78047d610920a5e3 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 13 Feb 2024 16:17:58 +0800 Subject: [PATCH 04/17] log level: info Signed-off-by: Ping Yu --- cdc/tests/integration_tests/_utils/run_cdc_server | 2 +- .../integration_tests/_utils/start_tidb_cluster_impl | 11 ++++++++--- .../integration_tests/flow_control/config/workload | 2 +- cdc/tests/integration_tests/flow_control/run.sh | 2 +- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/cdc/tests/integration_tests/_utils/run_cdc_server b/cdc/tests/integration_tests/_utils/run_cdc_server index 1938eab8..7520fdf4 100755 --- a/cdc/tests/integration_tests/_utils/run_cdc_server +++ b/cdc/tests/integration_tests/_utils/run_cdc_server @@ -21,7 +21,7 @@ addr= addr_url="127.0.0.1:8600" pd_addr= pwd=$pwd -log_level=debug +log_level=info restart= failpoint=$GO_FAILPOINTS config_path= diff --git a/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl b/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl index e2f5f201..d8024e1d 100755 --- a/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl +++ b/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl @@ -13,6 +13,7 @@ multiple_upstream_pd= random_file_name= verify_tidb=false tikv_count=3 +log_level=info # Random generate the sockets config. # Make sure we dont use the same sock. @@ -46,12 +47,16 @@ while [[ ${1-} ]]; do shift ;; --verify-tidb) - vierfy_tidb=true + verify_tidb=true ;; --tikv-count) tikv_count=${2} shift ;; + --log-level) + log_level=${2} + shift + ;; *) echo "Unknown parameter: ${1}" >&2 exit 1 @@ -204,7 +209,7 @@ for idx in $(seq 1 "$tikv_count"); do -A ${!host}:${!port} \ --status-addr ${!host}:${!status_port} \ --log-file "$OUT_DIR/tikv$idx.log" \ - --log-level debug \ + --log-level "$log_level" \ -C "$OUT_DIR/tikv-config.toml" \ -s "$OUT_DIR/tikv$idx" & done @@ -215,7 +220,7 @@ tikv-server \ -A ${DOWN_TIKV_HOST}:${DOWN_TIKV_PORT} \ --status-addr ${DOWN_TIKV_HOST}:${DOWN_TIKV_STATUS_PORT} \ --log-file "$OUT_DIR/tikv_down.log" \ - --log-level debug \ + --log-level "$log_level" \ -C "$OUT_DIR/tikv-config.toml" \ -s "$OUT_DIR/tikv_down" & diff --git a/cdc/tests/integration_tests/flow_control/config/workload b/cdc/tests/integration_tests/flow_control/config/workload index 9a74c3c5..0c74ff45 100644 --- a/cdc/tests/integration_tests/flow_control/config/workload +++ b/cdc/tests/integration_tests/flow_control/config/workload @@ -22,7 +22,7 @@ # Default data size: 1 KB records (10 fields, 100 bytes each, plus key) # Request distribution: zipfian -recordcount=1000000 +recordcount=10000000 workload=core readallfields=true diff --git a/cdc/tests/integration_tests/flow_control/run.sh b/cdc/tests/integration_tests/flow_control/run.sh index 5ba45031..369df23d 100644 --- a/cdc/tests/integration_tests/flow_control/run.sh +++ b/cdc/tests/integration_tests/flow_control/run.sh @@ -19,7 +19,7 @@ function run() { cd $WORK_DIR start_ts=$(get_start_ts $UP_PD) - go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 100 # About 1G + go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 400 # About 1G cat - >"$WORK_DIR/tikv-cdc-config.toml" < Date: Tue, 13 Feb 2024 18:16:05 +0800 Subject: [PATCH 05/17] reduce data size; add grafana panel Signed-off-by: Ping Yu --- cdc/metrics/grafana/tikv-cdc.json | 64 ++++++++++++++----- .../flow_control/config/workload | 2 +- .../integration_tests/flow_control/run.sh | 2 +- 3 files changed, 50 insertions(+), 18 deletions(-) diff --git a/cdc/metrics/grafana/tikv-cdc.json b/cdc/metrics/grafana/tikv-cdc.json index 6ff82cc0..ec885bd6 100644 --- a/cdc/metrics/grafana/tikv-cdc.json +++ b/cdc/metrics/grafana/tikv-cdc.json @@ -10371,7 +10371,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "Percentiles of TiKV sink write duration by each server.\n", + "description": "Percentiles of $sink_type sink write duration by each server.\n", "fieldConfig": { "defaults": { "links": [] @@ -10419,7 +10419,7 @@ "targets": [ { "exemplar": true, - "expr": "histogram_quantile(0.95, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le,instance, changefeed))", + "expr": "histogram_quantile(0.95, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le,instance, changefeed))", "format": "time_series", "hide": false, "interval": "", @@ -10429,7 +10429,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.99, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le,instance, changefeed))", + "expr": "histogram_quantile(0.99, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le,instance, changefeed))", "format": "time_series", "hide": false, "interval": "", @@ -10439,7 +10439,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.999, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le,instance, changefeed))", + "expr": "histogram_quantile(0.999, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le,instance, changefeed))", "format": "time_series", "hide": false, "interval": "", @@ -10452,7 +10452,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "TiKV sink write duration percentile", + "title": "$sink_type sink write duration percentile", "tooltip": { "shared": true, "sort": 0, @@ -10543,7 +10543,7 @@ "targets": [ { "exemplar": true, - "expr": "histogram_quantile(0.90, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le, instance, changefeed))", + "expr": "histogram_quantile(0.90, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le, instance, changefeed))", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -10552,7 +10552,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.99, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le, instance, changefeed))", + "expr": "histogram_quantile(0.99, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le, instance, changefeed))", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -10561,7 +10561,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.999, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le, instance, changefeed))", + "expr": "histogram_quantile(0.999, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le, instance, changefeed))", "format": "time_series", "hide": true, "interval": "", @@ -10574,7 +10574,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "TiKV sink write batch size percentile", + "title": "$sink_type sink write batch size percentile", "tooltip": { "shared": true, "sort": 0, @@ -10617,7 +10617,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of changed entires that are written to downstream TiKV", + "description": "The number of changed entires that are written to downstream sink", "fieldConfig": { "defaults": { "links": [] @@ -10664,7 +10664,7 @@ "steppedLine": false, "targets": [ { - "expr": "sum (rate(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (instance)", + "expr": "sum (rate(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (instance)", "format": "time_series", "hide": false, "intervalFactor": 1, @@ -10673,7 +10673,7 @@ }, { "exemplar": true, - "expr": "sum (rate(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (changefeed)", + "expr": "sum (rate(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (changefeed)", "format": "time_series", "hide": false, "interval": "", @@ -10686,7 +10686,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "TiKV sink write rows count/s", + "title": "$sink_type sink write rows count/s", "tooltip": { "shared": true, "sort": 0, @@ -10773,7 +10773,7 @@ "steppedLine": false, "targets": [ { - "expr": "sum(delta(tikv_cdc_sink_execution_error{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (capture)", + "expr": "sum(delta(tikv_cdc_sink_execution_error{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (capture)", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{capture}}", @@ -10784,7 +10784,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "TiKV sink error count/m", + "title": "$sink_type sink error count/m", "tooltip": { "shared": true, "sort": 0, @@ -10822,7 +10822,8 @@ } } ], - "title": "TiKV Sink", + "repeat": "sink_type", + "title": "$sink_type Sink", "type": "row" } ], @@ -10957,6 +10958,37 @@ "type": "query", "useTags": false }, + { + "allValue": ".*", + "current": { + "selected": false, + "text": "All", + "value": "$__all" + }, + "datasource": "${DS_TEST-CLUSTER}", + "definition": "label_values(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\"}, type)", + "description": null, + "error": null, + "hide": 0, + "includeAll": true, + "label": "Sink", + "multi": false, + "name": "sink_type", + "options": [], + "query": { + "query": "label_values(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\"}, type)", + "refId": "${DS_TEST-CLUSTER}-sink_type-Variable-Query" + }, + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + }, { "allValue": "9999999999", "current": { diff --git a/cdc/tests/integration_tests/flow_control/config/workload b/cdc/tests/integration_tests/flow_control/config/workload index 0c74ff45..71274999 100644 --- a/cdc/tests/integration_tests/flow_control/config/workload +++ b/cdc/tests/integration_tests/flow_control/config/workload @@ -22,7 +22,7 @@ # Default data size: 1 KB records (10 fields, 100 bytes each, plus key) # Request distribution: zipfian -recordcount=10000000 +recordcount=5000000 workload=core readallfields=true diff --git a/cdc/tests/integration_tests/flow_control/run.sh b/cdc/tests/integration_tests/flow_control/run.sh index 369df23d..6108fc37 100644 --- a/cdc/tests/integration_tests/flow_control/run.sh +++ b/cdc/tests/integration_tests/flow_control/run.sh @@ -19,7 +19,7 @@ function run() { cd $WORK_DIR start_ts=$(get_start_ts $UP_PD) - go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 400 # About 1G + go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 400 # About 500MiB cat - >"$WORK_DIR/tikv-cdc-config.toml" < Date: Sun, 25 Feb 2024 23:23:31 +0800 Subject: [PATCH 06/17] fix encoder size Signed-off-by: Ping Yu --- cdc/cdc/sink/codec/json.go | 22 ++++++++++++++++--- cdc/cdc/sink/codec/json_test.go | 18 +++++++++++++-- .../docker-compose-kafka-integration.yml | 21 ++++++++++++++++++ .../flow_control/config/workload | 4 +--- .../integration_tests/flow_control/run.sh | 2 +- .../integration_tests/run_kafka_in_docker.sh | 2 +- 6 files changed, 59 insertions(+), 10 deletions(-) diff --git a/cdc/cdc/sink/codec/json.go b/cdc/cdc/sink/codec/json.go index 05bdb03a..28276dc5 100644 --- a/cdc/cdc/sink/codec/json.go +++ b/cdc/cdc/sink/codec/json.go @@ -125,8 +125,10 @@ type JSONEventBatchEncoder struct { valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now supportMixedBuild bool // TODO decouple this out - messageBuf []*MQMessage - curBatchSize int + messageBuf []*MQMessage + curBatchSize int + totalBatchBytes int + // configs maxMessageBytes int maxBatchSize int @@ -226,6 +228,9 @@ func (d *JSONEventBatchEncoder) AppendChangedEvent(e *model.RawKVEntry) (Encoder versionHead := make([]byte, 8) binary.BigEndian.PutUint64(versionHead, BatchVersion1) + if len(d.messageBuf) > 0 { + d.totalBatchBytes += d.messageBuf[len(d.messageBuf)-1].Length() + } d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeKv)) d.curBatchSize = 0 } @@ -307,13 +312,24 @@ func (d *JSONEventBatchEncoder) MixedBuild(withVersion bool) []byte { // Size implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) Size() int { - return d.keyBuf.Len() + d.valueBuf.Len() + if d.supportMixedBuild { + return d.keyBuf.Len() + d.valueBuf.Len() + } + lastMessageLength := 0 + if len(d.messageBuf) > 0 { + lastMessageLength = d.messageBuf[len(d.messageBuf)-1].Length() + } + return d.totalBatchBytes + lastMessageLength } // Reset implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) Reset() { d.keyBuf.Reset() d.valueBuf.Reset() + + d.messageBuf = d.messageBuf[:0] + d.curBatchSize = 0 + d.totalBatchBytes = 0 } // SetParams reads relevant parameters for Open Protocol diff --git a/cdc/cdc/sink/codec/json_test.go b/cdc/cdc/sink/codec/json_test.go index 7b06d03d..a12533c8 100644 --- a/cdc/cdc/sink/codec/json_test.go +++ b/cdc/cdc/sink/codec/json_test.go @@ -197,6 +197,7 @@ func (s *batchSuite) TestSetParams(c *check.C) { func (s *batchSuite) TestMaxMessageBytes(c *check.C) { defer testleak.AfterTest(c)() encoder := NewJSONEventBatchEncoder() + c.Check(encoder.Size(), check.Equals, 0) // the size of `testEvent` is 75 testEvent := &model.RawKVEntry{ @@ -207,14 +208,23 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) { ExpiredTs: 200, } eventSize := 75 - // for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44, just can hold it. - a := strconv.Itoa(eventSize + 44) + overhead := 36 + 8 + + a := strconv.Itoa(eventSize + overhead) err := encoder.SetParams(map[string]string{"max-message-bytes": a}) c.Check(err, check.IsNil) r, err := encoder.AppendChangedEvent(testEvent) c.Check(err, check.IsNil) c.Check(r, check.Equals, EncoderNoOperation) + totalSize := eventSize + overhead + c.Check(encoder.Size(), check.Equals, totalSize) + + r, err = encoder.AppendChangedEvent(testEvent) + c.Check(err, check.IsNil) + c.Check(r, check.Equals, EncoderNoOperation) + totalSize += eventSize + overhead + c.Check(encoder.Size(), check.Equals, totalSize) a = strconv.Itoa(eventSize + 43) err = encoder.SetParams(map[string]string{"max-message-bytes": a}) @@ -222,8 +232,10 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) { r, err = encoder.AppendChangedEvent(testEvent) c.Check(err, check.NotNil) c.Check(r, check.Equals, EncoderNoOperation) + c.Check(encoder.Size(), check.Equals, totalSize) // make sure each batch's `Length` not greater than `max-message-bytes` + // 256: each message can hold 2 events (75 * 2 + 36 + 8 = 194) err = encoder.SetParams(map[string]string{"max-message-bytes": "256"}) c.Check(err, check.IsNil) @@ -232,6 +244,8 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) { c.Check(r, check.Equals, EncoderNoOperation) c.Check(err, check.IsNil) } + totalSize += (eventSize*2 + overhead) * 5000 + c.Check(encoder.Size(), check.Equals, totalSize) messages := encoder.Build() for _, msg := range messages { diff --git a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml index 8d47a2a8..50ab8592 100644 --- a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml +++ b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml @@ -5,6 +5,13 @@ services: image: wurstmeister/zookeeper ports: - "2181:2181" + deploy: + resources: + limits: + cpus: '2' + memory: 4G + reservations: + memory: 4G kafka: image: wurstmeister/kafka:2.12-2.4.1 @@ -19,6 +26,13 @@ services: KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://127.0.0.1:9092" ZK: "zk" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + deploy: + resources: + limits: + cpus: '2' + memory: 4G + reservations: + memory: 4G depends_on: - "zookeeper" @@ -38,3 +52,10 @@ services: network_mode: "service:kafka" volumes: - /tmp/tikv_cdc_test/:/tmp/tikv_cdc_test + deploy: + resources: + limits: + cpus: '4' + memory: 16G + reservations: + memory: 12G diff --git a/cdc/tests/integration_tests/flow_control/config/workload b/cdc/tests/integration_tests/flow_control/config/workload index 71274999..faa23c17 100644 --- a/cdc/tests/integration_tests/flow_control/config/workload +++ b/cdc/tests/integration_tests/flow_control/config/workload @@ -22,7 +22,7 @@ # Default data size: 1 KB records (10 fields, 100 bytes each, plus key) # Request distribution: zipfian -recordcount=5000000 +recordcount=1000000 workload=core readallfields=true @@ -34,5 +34,3 @@ insertproportion=0 requestdistribution=uniform -fieldcount=1 -fieldlength=100 diff --git a/cdc/tests/integration_tests/flow_control/run.sh b/cdc/tests/integration_tests/flow_control/run.sh index 6108fc37..5ba45031 100644 --- a/cdc/tests/integration_tests/flow_control/run.sh +++ b/cdc/tests/integration_tests/flow_control/run.sh @@ -19,7 +19,7 @@ function run() { cd $WORK_DIR start_ts=$(get_start_ts $UP_PD) - go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 400 # About 500MiB + go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 100 # About 1G cat - >"$WORK_DIR/tikv-cdc-config.toml" < Date: Sun, 25 Feb 2024 23:30:08 +0800 Subject: [PATCH 07/17] fix Signed-off-by: Ping Yu --- cdc/cdc/sink/codec/json.go | 3 ++- cdc/tests/integration_tests/flow_control/config/workload | 4 +++- cdc/tests/integration_tests/flow_control/run.sh | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/cdc/cdc/sink/codec/json.go b/cdc/cdc/sink/codec/json.go index 28276dc5..a8b76d60 100644 --- a/cdc/cdc/sink/codec/json.go +++ b/cdc/cdc/sink/codec/json.go @@ -127,7 +127,7 @@ type JSONEventBatchEncoder struct { messageBuf []*MQMessage curBatchSize int - totalBatchBytes int + totalBatchBytes int // Note: The size of last message is not included // configs maxMessageBytes int @@ -315,6 +315,7 @@ func (d *JSONEventBatchEncoder) Size() int { if d.supportMixedBuild { return d.keyBuf.Len() + d.valueBuf.Len() } + lastMessageLength := 0 if len(d.messageBuf) > 0 { lastMessageLength = d.messageBuf[len(d.messageBuf)-1].Length() diff --git a/cdc/tests/integration_tests/flow_control/config/workload b/cdc/tests/integration_tests/flow_control/config/workload index faa23c17..71274999 100644 --- a/cdc/tests/integration_tests/flow_control/config/workload +++ b/cdc/tests/integration_tests/flow_control/config/workload @@ -22,7 +22,7 @@ # Default data size: 1 KB records (10 fields, 100 bytes each, plus key) # Request distribution: zipfian -recordcount=1000000 +recordcount=5000000 workload=core readallfields=true @@ -34,3 +34,5 @@ insertproportion=0 requestdistribution=uniform +fieldcount=1 +fieldlength=100 diff --git a/cdc/tests/integration_tests/flow_control/run.sh b/cdc/tests/integration_tests/flow_control/run.sh index 5ba45031..dd394655 100644 --- a/cdc/tests/integration_tests/flow_control/run.sh +++ b/cdc/tests/integration_tests/flow_control/run.sh @@ -19,7 +19,7 @@ function run() { cd $WORK_DIR start_ts=$(get_start_ts $UP_PD) - go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 100 # About 1G + go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 100 # About 500MiB cat - >"$WORK_DIR/tikv-cdc-config.toml" < Date: Mon, 26 Feb 2024 10:11:35 +0800 Subject: [PATCH 08/17] MQMessage pool Signed-off-by: Ping Yu --- cdc/cdc/sink/codec/interface.go | 40 +++++++++++++++++++++++---------- cdc/cdc/sink/codec/json.go | 18 +++++++++------ cdc/cdc/sink/mq.go | 5 +++++ 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/cdc/cdc/sink/codec/interface.go b/cdc/cdc/sink/codec/interface.go index 5b2faaa8..b3efdd3f 100644 --- a/cdc/cdc/sink/codec/interface.go +++ b/cdc/cdc/sink/codec/interface.go @@ -16,6 +16,7 @@ package codec import ( "context" "encoding/binary" + "sync" "time" "github.com/pingcap/log" @@ -63,6 +64,15 @@ type MQMessage struct { entriesCount int // entries in one MQ Message } +func (m *MQMessage) Reset() { + m.Key = m.Key[:0] + m.Value = m.Value[:0] + m.Ts = 0 + m.Type = model.MqMessageTypeUnknown + m.Protocol = config.ProtocolDefault + m.entriesCount = 0 +} + // maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client. // reference: https://github.com/Shopify/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233 // for TiKV-CDC, minimum supported kafka version is `0.11.0.2`, which will be treated as `version = 2` by sarama producer. @@ -99,31 +109,37 @@ func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) * return NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved) } +var mqMsgPool = sync.Pool{ + New: func() any { + return new(MQMessage) + }, +} + // NewMQMessage should be used when creating a MQMessage struct. // It copies the input byte slices to avoid any surprises in asynchronous MQ writes. func NewMQMessage(proto config.Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType) *MQMessage { - ret := &MQMessage{ - Key: nil, - Value: nil, - Ts: ts, - Type: ty, - Protocol: proto, - entriesCount: 0, - } + ret := mqMsgPool.Get().(*MQMessage) + ret.Ts = ts + ret.Type = ty + ret.Protocol = proto + ret.entriesCount = 0 if key != nil { - ret.Key = make([]byte, len(key)) - copy(ret.Key, key) + ret.Key = append(ret.Key, key...) } if value != nil { - ret.Value = make([]byte, len(value)) - copy(ret.Value, value) + ret.Value = append(ret.Value, value...) } return ret } +func ReleaseMQMessage(m *MQMessage) { + m.Reset() + mqMsgPool.Put(m) +} + // EventBatchDecoder is an abstraction for events decoder // this interface is only for testing now type EventBatchDecoder interface { diff --git a/cdc/cdc/sink/codec/json.go b/cdc/cdc/sink/codec/json.go index a8b76d60..071d606b 100644 --- a/cdc/cdc/sink/codec/json.go +++ b/cdc/cdc/sink/codec/json.go @@ -254,6 +254,8 @@ func (d *JSONEventBatchEncoder) AppendChangedEvent(e *model.RawKVEntry) (Encoder } // Build implements the EventBatchEncoder interface +// NOTE: when supportMixedBuild is enabled, must call Reset() after the returned `mqMessages` is used. +// It's not a good design. As supportMixedBuild is used in unit tests only, we don't fix it now. func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) { if d.supportMixedBuild { if d.valueBuf.Len() == 0 { @@ -265,7 +267,7 @@ func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) { } ret := d.messageBuf - d.messageBuf = make([]*MQMessage, 0) + d.Reset() return ret } @@ -325,12 +327,14 @@ func (d *JSONEventBatchEncoder) Size() int { // Reset implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) Reset() { - d.keyBuf.Reset() - d.valueBuf.Reset() - - d.messageBuf = d.messageBuf[:0] - d.curBatchSize = 0 - d.totalBatchBytes = 0 + if d.supportMixedBuild { + d.keyBuf.Reset() + d.valueBuf.Reset() + } else { + d.messageBuf = make([]*MQMessage, 0) + d.curBatchSize = 0 + d.totalBatchBytes = 0 + } } // SetParams reads relevant parameters for Open Protocol diff --git a/cdc/cdc/sink/mq.go b/cdc/cdc/sink/mq.go index a305a830..a94f24f5 100644 --- a/cdc/cdc/sink/mq.go +++ b/cdc/cdc/sink/mq.go @@ -239,6 +239,11 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error { flushToProducer := func(op codec.EncoderResult) error { return k.statistics.RecordBatchExecution(func() (int, error) { messages := encoder.Build() + defer func() { + for _, msg := range messages { + codec.ReleaseMQMessage(msg) + } + }() thisBatchSize := 0 if len(messages) == 0 { return 0, nil From b62252f2f80d15658356f77ac5ed41dbfffe77b3 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sat, 2 Mar 2024 15:20:45 +0800 Subject: [PATCH 09/17] fix release Signed-off-by: Ping Yu --- cdc/cdc/sink/codec/interface.go | 18 ++++++++++++++++-- cdc/cdc/sink/mq.go | 5 ----- cdc/cdc/sink/producer/kafka/kafka.go | 15 +++++++++++++-- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/cdc/cdc/sink/codec/interface.go b/cdc/cdc/sink/codec/interface.go index b3efdd3f..56f4b7e9 100644 --- a/cdc/cdc/sink/codec/interface.go +++ b/cdc/cdc/sink/codec/interface.go @@ -64,9 +64,23 @@ type MQMessage struct { entriesCount int // entries in one MQ Message } +const ( + MemoryReleaseThreshold = 1024 + MemoryReleaseFactor = 10 +) + +func resetBuffer(buf []byte) []byte { + length := len(buf) + capSize := cap(buf) + if capSize > MemoryReleaseThreshold && length > 0 && length*MemoryReleaseFactor < capSize { + return nil + } + return buf[:0] +} + func (m *MQMessage) Reset() { - m.Key = m.Key[:0] - m.Value = m.Value[:0] + m.Key = resetBuffer(m.Key) + m.Value = resetBuffer(m.Value) m.Ts = 0 m.Type = model.MqMessageTypeUnknown m.Protocol = config.ProtocolDefault diff --git a/cdc/cdc/sink/mq.go b/cdc/cdc/sink/mq.go index a94f24f5..a305a830 100644 --- a/cdc/cdc/sink/mq.go +++ b/cdc/cdc/sink/mq.go @@ -239,11 +239,6 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error { flushToProducer := func(op codec.EncoderResult) error { return k.statistics.RecordBatchExecution(func() (int, error) { messages := encoder.Build() - defer func() { - for _, msg := range messages { - codec.ReleaseMQMessage(msg) - } - }() thisBatchSize := 0 if len(messages) == 0 { return 0, nil diff --git a/cdc/cdc/sink/producer/kafka/kafka.go b/cdc/cdc/sink/producer/kafka/kafka.go index d1a937b8..7176a484 100644 --- a/cdc/cdc/sink/producer/kafka/kafka.go +++ b/cdc/cdc/sink/producer/kafka/kafka.go @@ -72,6 +72,11 @@ type kafkaSaramaProducer struct { type kafkaProducerClosingFlag = int32 +type kafkaMetadata struct { + message *codec.MQMessage + offset uint64 +} + func (k *kafkaSaramaProducer) AsyncSendMessage(ctx context.Context, message *codec.MQMessage, partition int32) error { k.clientLock.RLock() defer k.clientLock.RUnlock() @@ -88,7 +93,11 @@ func (k *kafkaSaramaProducer) AsyncSendMessage(ctx context.Context, message *cod Value: sarama.ByteEncoder(message.Value), Partition: partition, } - msg.Metadata = atomic.AddUint64(&k.partitionOffset[partition].sent, 1) + metadata := &kafkaMetadata{ + message: message, + offset: atomic.AddUint64(&k.partitionOffset[partition].sent, 1), + } + msg.Metadata = metadata failpoint.Inject("KafkaSinkAsyncSendError", func() { // simulate sending message to input channel successfully but flushing @@ -242,7 +251,9 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { if msg == nil || msg.Metadata == nil { continue } - flushedOffset := msg.Metadata.(uint64) + metadata := msg.Metadata.(*kafkaMetadata) + codec.ReleaseMQMessage(metadata.message) + flushedOffset := metadata.offset atomic.StoreUint64(&k.partitionOffset[msg.Partition].flushed, flushedOffset) k.flushedNotifier.Notify() case err := <-k.asyncClient.Errors(): From 671adf559a65e5bcd58cf45b7d04e8781b19dbb9 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sat, 2 Mar 2024 16:28:54 +0800 Subject: [PATCH 10/17] wip Signed-off-by: Ping Yu --- cdc/cdc/sink/mq.go | 2 +- cdc/cdc/sink/producer/kafka/config.go | 2 +- cdc/cdc/sink/producer/kafka/kafka_test.go | 8 ++++++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/cdc/cdc/sink/mq.go b/cdc/cdc/sink/mq.go index a305a830..761df1c2 100644 --- a/cdc/cdc/sink/mq.go +++ b/cdc/cdc/sink/mq.go @@ -250,11 +250,11 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error { }) for _, msg := range messages { + thisBatchSize += msg.GetEntriesCount() err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition) if err != nil { return 0, err } - thisBatchSize += msg.GetEntriesCount() } if op == codec.EncoderNeedSyncWrite { diff --git a/cdc/cdc/sink/producer/kafka/config.go b/cdc/cdc/sink/producer/kafka/config.go index e33395df..3293f476 100644 --- a/cdc/cdc/sink/producer/kafka/config.go +++ b/cdc/cdc/sink/producer/kafka/config.go @@ -31,7 +31,7 @@ import ( ) func init() { - sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB + sarama.MaxRequestSize = 100 * 1024 * 1024 // 100MB } // Config stores user specified Kafka producer configuration diff --git a/cdc/cdc/sink/producer/kafka/kafka_test.go b/cdc/cdc/sink/producer/kafka/kafka_test.go index 98bc6c39..1542384e 100644 --- a/cdc/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/cdc/sink/producer/kafka/kafka_test.go @@ -26,11 +26,19 @@ import ( "github.com/pingcap/errors" "github.com/tikv/migration/cdc/cdc/sink/codec" "github.com/tikv/migration/cdc/pkg/kafka" + "github.com/tikv/migration/cdc/pkg/logutil" "github.com/tikv/migration/cdc/pkg/util/testleak" ) type kafkaSuite struct{} +func (s *kafkaSuite) SetUpTest(c *check.C) { + err := logutil.InitLogger(&logutil.Config{ + Level: "debug", + }) + c.Assert(err, check.IsNil) +} + var _ = check.Suite(&kafkaSuite{}) func Test(t *testing.T) { check.TestingT(t) } From d5af5a3896b9556997e44b60e40caed72d08df46 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sat, 2 Mar 2024 17:38:04 +0800 Subject: [PATCH 11/17] fix flaky ut Signed-off-by: Ping Yu --- cdc/cdc/sink/mq_test.go | 2 +- cdc/cdc/sink/producer/kafka/kafka_test.go | 13 ++++--------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/cdc/cdc/sink/mq_test.go b/cdc/cdc/sink/mq_test.go index e337e5c6..740d8fa1 100644 --- a/cdc/cdc/sink/mq_test.go +++ b/cdc/cdc/sink/mq_test.go @@ -32,7 +32,7 @@ import ( type mqSinkSuite struct{} -var _ = check.Suite(&mqSinkSuite{}) +var _ = check.SerialSuites(&mqSinkSuite{}) func (s mqSinkSuite) TestKafkaSink(c *check.C) { defer testleak.AfterTest(c)() diff --git a/cdc/cdc/sink/producer/kafka/kafka_test.go b/cdc/cdc/sink/producer/kafka/kafka_test.go index 1542384e..f9491bdb 100644 --- a/cdc/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/cdc/sink/producer/kafka/kafka_test.go @@ -26,20 +26,12 @@ import ( "github.com/pingcap/errors" "github.com/tikv/migration/cdc/cdc/sink/codec" "github.com/tikv/migration/cdc/pkg/kafka" - "github.com/tikv/migration/cdc/pkg/logutil" "github.com/tikv/migration/cdc/pkg/util/testleak" ) type kafkaSuite struct{} -func (s *kafkaSuite) SetUpTest(c *check.C) { - err := logutil.InitLogger(&logutil.Config{ - Level: "debug", - }) - c.Assert(err, check.IsNil) -} - -var _ = check.Suite(&kafkaSuite{}) +var _ = check.SerialSuites(&kafkaSuite{}) func Test(t *testing.T) { check.TestingT(t) } @@ -110,6 +102,9 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) { cfg.Producer.Flush.MaxMessages = 1 return cfg, err } + defer func() { + newSaramaConfigImpl = newSaramaConfigImplBak + }() NewAdminClientImpl = kafka.NewMockAdminClient defer func() { NewAdminClientImpl = kafka.NewSaramaAdminClient From 87e7534bcc6dd155c24cd1036d15736a1b8a1481 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sat, 2 Mar 2024 22:48:42 +0800 Subject: [PATCH 12/17] logging Signed-off-by: Ping Yu --- cdc/cdc/sink/mq_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cdc/cdc/sink/mq_test.go b/cdc/cdc/sink/mq_test.go index 740d8fa1..1942ee02 100644 --- a/cdc/cdc/sink/mq_test.go +++ b/cdc/cdc/sink/mq_test.go @@ -25,6 +25,7 @@ import ( "github.com/tikv/migration/cdc/cdc/sink/codec" kafkap "github.com/tikv/migration/cdc/cdc/sink/producer/kafka" "github.com/tikv/migration/cdc/pkg/config" + "github.com/tikv/migration/cdc/pkg/logutil" "github.com/tikv/migration/cdc/pkg/kafka" "github.com/tikv/migration/cdc/pkg/util/testleak" @@ -121,6 +122,11 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + err := logutil.InitLogger(&logutil.Config{ + Level: "debug", + }) + c.Assert(err, check.IsNil) + topic := kafka.DefaultMockTopicName leader := sarama.NewMockBroker(c, 1) defer leader.Close() From a6f8b1db2c7f62b806d9bf43c05bb528f3119991 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sat, 2 Mar 2024 23:08:42 +0800 Subject: [PATCH 13/17] fix ut Signed-off-by: Ping Yu --- cdc/cdc/sink/mq_test.go | 26 +++++++++++--------- cdc/cdc/sink/producer/kafka/config_test.go | 10 ++++---- cdc/cdc/sink/producer/kafka/kafka.go | 4 ++-- cdc/cdc/sink/producer/kafka/kafka_test.go | 28 +++++++++++----------- 4 files changed, 36 insertions(+), 32 deletions(-) diff --git a/cdc/cdc/sink/mq_test.go b/cdc/cdc/sink/mq_test.go index 1942ee02..0a6ba084 100644 --- a/cdc/cdc/sink/mq_test.go +++ b/cdc/cdc/sink/mq_test.go @@ -25,7 +25,6 @@ import ( "github.com/tikv/migration/cdc/cdc/sink/codec" kafkap "github.com/tikv/migration/cdc/cdc/sink/producer/kafka" "github.com/tikv/migration/cdc/pkg/config" - "github.com/tikv/migration/cdc/pkg/logutil" "github.com/tikv/migration/cdc/pkg/kafka" "github.com/tikv/migration/cdc/pkg/util/testleak" @@ -122,11 +121,6 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := logutil.InitLogger(&logutil.Config{ - Level: "debug", - }) - c.Assert(err, check.IsNil) - topic := kafka.DefaultMockTopicName leader := sarama.NewMockBroker(c, 1) defer leader.Close() @@ -150,6 +144,16 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { opts := map[string]string{} errCh := make(chan error, 1) + newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl + kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) { + cfg, err := newSaramaConfigImplBak(ctx, config) + c.Assert(err, check.IsNil) + cfg.Producer.Flush.MaxMessages = 1 + return cfg, err + } + defer func() { + kafkap.NewSaramaConfigImpl = newSaramaConfigImplBak + }() kafkap.NewAdminClientImpl = kafka.NewMockAdminClient defer func() { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient @@ -158,8 +162,11 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { sink, err := newKafkaSaramaSink(ctx, sinkURI, replicaConfig, opts, errCh) c.Assert(err, check.IsNil) - // mock kafka broker processes 1 row changed event - leader.Returns(prodSuccess) + // mock kafka broker processes 3 row changed event + for i := 0; i < 3; i++ { + leader.Returns(prodSuccess) + } + keyspanID1 := model.KeySpanID(1) kv1 := &model.RawKVEntry{ OpType: model.OpTypePut, @@ -188,12 +195,9 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { StartTs: 110, CRTs: 130, } - err = sink.EmitChangedEvents(ctx, kv3) c.Assert(err, check.IsNil) - // mock kafka broker processes 1 row resolvedTs event - leader.Returns(prodSuccess) checkpointTs1, err := sink.FlushChangedEvents(ctx, keyspanID1, kv1.CRTs) c.Assert(err, check.IsNil) c.Assert(checkpointTs1, check.Equals, kv1.CRTs) diff --git a/cdc/cdc/sink/producer/kafka/config_test.go b/cdc/cdc/sink/producer/kafka/config_test.go index c83c2ea9..967cce5a 100644 --- a/cdc/cdc/sink/producer/kafka/config_test.go +++ b/cdc/cdc/sink/producer/kafka/config_test.go @@ -33,13 +33,13 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { ctx := context.Background() config := NewConfig() config.Version = "invalid" - _, err := newSaramaConfigImpl(ctx, config) + _, err := NewSaramaConfigImpl(ctx, config) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") ctx = util.SetOwnerInCtx(ctx) config.Version = "2.6.0" config.ClientID = "^invalid$" - _, err = newSaramaConfigImpl(ctx, config) + _, err = NewSaramaConfigImpl(ctx, config) c.Assert(cerror.ErrKafkaInvalidClientID.Equal(err), check.IsTrue) config.ClientID = "test-kafka-client" @@ -56,7 +56,7 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { } for _, cc := range compressionCases { config.Compression = cc.algorithm - cfg, err := newSaramaConfigImpl(ctx, config) + cfg, err := NewSaramaConfigImpl(ctx, config) c.Assert(err, check.IsNil) c.Assert(cfg.Producer.Compression, check.Equals, cc.expected) } @@ -64,7 +64,7 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { config.Credential = &security.Credential{ CAPath: "/invalid/ca/path", } - _, err = newSaramaConfigImpl(ctx, config) + _, err = NewSaramaConfigImpl(ctx, config) c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory") saslConfig := NewConfig() @@ -76,7 +76,7 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { SaslMechanism: sarama.SASLTypeSCRAMSHA256, } - cfg, err := newSaramaConfigImpl(ctx, saslConfig) + cfg, err := NewSaramaConfigImpl(ctx, saslConfig) c.Assert(err, check.IsNil) c.Assert(cfg, check.NotNil) c.Assert(cfg.Net.SASL.User, check.Equals, "user") diff --git a/cdc/cdc/sink/producer/kafka/kafka.go b/cdc/cdc/sink/producer/kafka/kafka.go index 7176a484..08d028f9 100644 --- a/cdc/cdc/sink/producer/kafka/kafka.go +++ b/cdc/cdc/sink/producer/kafka/kafka.go @@ -269,14 +269,14 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { } var ( - newSaramaConfigImpl = newSaramaConfig + NewSaramaConfigImpl = newSaramaConfig NewAdminClientImpl kafka.ClusterAdminClientCreator = kafka.NewSaramaAdminClient ) // NewKafkaSaramaProducer creates a kafka sarama producer func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) { log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config)) - cfg, err := newSaramaConfigImpl(ctx, config) + cfg, err := NewSaramaConfigImpl(ctx, config) if err != nil { return nil, err } diff --git a/cdc/cdc/sink/producer/kafka/kafka_test.go b/cdc/cdc/sink/producer/kafka/kafka_test.go index f9491bdb..ec5c7d69 100644 --- a/cdc/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/cdc/sink/producer/kafka/kafka_test.go @@ -95,15 +95,15 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) { config.AutoCreate = false config.BrokerEndpoints = strings.Split(leader.Addr(), ",") - newSaramaConfigImplBak := newSaramaConfigImpl - newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { + newSaramaConfigImplBak := NewSaramaConfigImpl + NewSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { cfg, err := newSaramaConfigImplBak(ctx, config) c.Assert(err, check.IsNil) cfg.Producer.Flush.MaxMessages = 1 return cfg, err } defer func() { - newSaramaConfigImpl = newSaramaConfigImplBak + NewSaramaConfigImpl = newSaramaConfigImplBak }() NewAdminClientImpl = kafka.NewMockAdminClient defer func() { @@ -204,7 +204,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // When topic exists and max message bytes is set correctly. config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes() - cfg, err := newSaramaConfigImpl(context.Background(), config) + cfg, err := NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts := make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) @@ -215,7 +215,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // use the smaller one. defaultMaxMessageBytes := adminClient.GetDefaultMaxMessageBytes() config.MaxMessageBytes = defaultMaxMessageBytes + 1 - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) @@ -224,7 +224,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) config.MaxMessageBytes = defaultMaxMessageBytes - 1 - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) @@ -234,7 +234,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // When topic does not exist and auto-create is not enabled. config.AutoCreate = false - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, "non-exist", config, cfg, opts) @@ -248,7 +248,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // It is less than the value of broker. config.AutoCreate = true config.MaxMessageBytes = defaultMaxMessageBytes - 1 - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-success", config, cfg, opts) @@ -260,7 +260,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // It is larger than the value of broker. config.MaxMessageBytes = defaultMaxMessageBytes + 1 config.AutoCreate = true - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-fail", config, cfg, opts) @@ -272,7 +272,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // the check of parameter succeeds. // It is less than the value of broker. config.MaxMessageBytes = defaultMaxMessageBytes - 1 - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) detail := &sarama.TopicDetail{ NumPartitions: 3, @@ -291,7 +291,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // the check of parameter fails. // It is larger than the value of broker. config.MaxMessageBytes = defaultMaxMessageBytes + 1 - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config, cfg, opts) @@ -346,8 +346,8 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { NewAdminClientImpl = kafka.NewSaramaAdminClient }() - newSaramaConfigImplBak := newSaramaConfigImpl - newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { + newSaramaConfigImplBak := NewSaramaConfigImpl + NewSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { cfg, err := newSaramaConfigImplBak(ctx, config) c.Assert(err, check.IsNil) cfg.Producer.Flush.MaxMessages = 1 @@ -356,7 +356,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { return cfg, err } defer func() { - newSaramaConfigImpl = newSaramaConfigImplBak + NewSaramaConfigImpl = newSaramaConfigImplBak }() errCh := make(chan error, 1) From d0c9e756f2d648af306f42734b2a9a9733c5a9a4 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sun, 3 Mar 2024 16:54:43 +0800 Subject: [PATCH 14/17] adjust memory release parameter Signed-off-by: Ping Yu --- cdc/cdc/sink/codec/interface.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/cdc/sink/codec/interface.go b/cdc/cdc/sink/codec/interface.go index 56f4b7e9..d24557f2 100644 --- a/cdc/cdc/sink/codec/interface.go +++ b/cdc/cdc/sink/codec/interface.go @@ -65,8 +65,8 @@ type MQMessage struct { } const ( - MemoryReleaseThreshold = 1024 - MemoryReleaseFactor = 10 + MemoryReleaseThreshold = 100 * 1024 // 100KiB + MemoryReleaseFactor = 100 ) func resetBuffer(buf []byte) []byte { From b54817cf941cbaf23c67c643d83a176d424f94f9 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sun, 3 Mar 2024 17:24:56 +0800 Subject: [PATCH 15/17] polish Signed-off-by: Ping Yu --- cdc/cdc/sink/codec/interface.go | 6 ++++++ cdc/cdc/sink/codec/json_test.go | 3 ++- cdc/cdc/sink/mq_test.go | 2 +- cdc/cdc/sink/producer/kafka/kafka.go | 9 ++++++++- 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/cdc/cdc/sink/codec/interface.go b/cdc/cdc/sink/codec/interface.go index d24557f2..18908ebe 100644 --- a/cdc/cdc/sink/codec/interface.go +++ b/cdc/cdc/sink/codec/interface.go @@ -133,6 +133,12 @@ var mqMsgPool = sync.Pool{ // It copies the input byte slices to avoid any surprises in asynchronous MQ writes. func NewMQMessage(proto config.Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType) *MQMessage { ret := mqMsgPool.Get().(*MQMessage) + + // TODO: remove this check. + if len(ret.Key) > 0 || len(ret.Value) > 0 { + log.Panic("MQMessage is not reset", zap.String("key", string(ret.Key)), zap.String("value", string(ret.Value))) + } + ret.Ts = ts ret.Type = ty ret.Protocol = proto diff --git a/cdc/cdc/sink/codec/json_test.go b/cdc/cdc/sink/codec/json_test.go index a12533c8..aaca75a0 100644 --- a/cdc/cdc/sink/codec/json_test.go +++ b/cdc/cdc/sink/codec/json_test.go @@ -208,9 +208,10 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) { ExpiredTs: 200, } eventSize := 75 - // for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44, just can hold it. + // for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44. overhead := 36 + 8 + // just can hold a single message. a := strconv.Itoa(eventSize + overhead) err := encoder.SetParams(map[string]string{"max-message-bytes": a}) c.Check(err, check.IsNil) diff --git a/cdc/cdc/sink/mq_test.go b/cdc/cdc/sink/mq_test.go index 0a6ba084..f2bda0a4 100644 --- a/cdc/cdc/sink/mq_test.go +++ b/cdc/cdc/sink/mq_test.go @@ -162,7 +162,7 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { sink, err := newKafkaSaramaSink(ctx, sinkURI, replicaConfig, opts, errCh) c.Assert(err, check.IsNil) - // mock kafka broker processes 3 row changed event + // mock kafka broker processes 3 row changed events for i := 0; i < 3; i++ { leader.Returns(prodSuccess) } diff --git a/cdc/cdc/sink/producer/kafka/kafka.go b/cdc/cdc/sink/producer/kafka/kafka.go index 08d028f9..c26232ed 100644 --- a/cdc/cdc/sink/producer/kafka/kafka.go +++ b/cdc/cdc/sink/producer/kafka/kafka.go @@ -97,6 +97,7 @@ func (k *kafkaSaramaProducer) AsyncSendMessage(ctx context.Context, message *cod message: message, offset: atomic.AddUint64(&k.partitionOffset[partition].sent, 1), } + log.Debug("kafka producer sending message", zap.Int32("partition", partition), zap.Uint64("offset", metadata.offset)) msg.Metadata = metadata failpoint.Inject("KafkaSinkAsyncSendError", func() { @@ -254,7 +255,13 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { metadata := msg.Metadata.(*kafkaMetadata) codec.ReleaseMQMessage(metadata.message) flushedOffset := metadata.offset - atomic.StoreUint64(&k.partitionOffset[msg.Partition].flushed, flushedOffset) + + prevOffset := atomic.SwapUint64(&k.partitionOffset[msg.Partition].flushed, flushedOffset) + if flushedOffset <= prevOffset { + log.Panic("kafka producer flushed offset goes backward", zap.Int32("partition", msg.Partition), zap.Uint64("flushed", flushedOffset), zap.Uint64("prev", prevOffset)) + } + log.Debug("kafka producer flushed message", zap.Int32("partition", msg.Partition), zap.Uint64("offset", flushedOffset)) + k.flushedNotifier.Notify() case err := <-k.asyncClient.Errors(): // We should not wrap a nil pointer if the pointer is of a subtype of `error` From 20b67dd858b64114d0e25726e08c008e8457198c Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sun, 3 Mar 2024 17:50:26 +0800 Subject: [PATCH 16/17] polish Signed-off-by: Ping Yu --- cdc/tests/integration_tests/flow_control/config/workload | 4 ++-- cdc/tests/integration_tests/flow_control/run.sh | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/tests/integration_tests/flow_control/config/workload b/cdc/tests/integration_tests/flow_control/config/workload index 71274999..009c71a2 100644 --- a/cdc/tests/integration_tests/flow_control/config/workload +++ b/cdc/tests/integration_tests/flow_control/config/workload @@ -22,7 +22,7 @@ # Default data size: 1 KB records (10 fields, 100 bytes each, plus key) # Request distribution: zipfian -recordcount=5000000 +recordcount=2000000 workload=core readallfields=true @@ -35,4 +35,4 @@ insertproportion=0 requestdistribution=uniform fieldcount=1 -fieldlength=100 +fieldlength=250 diff --git a/cdc/tests/integration_tests/flow_control/run.sh b/cdc/tests/integration_tests/flow_control/run.sh index dd394655..9391c680 100644 --- a/cdc/tests/integration_tests/flow_control/run.sh +++ b/cdc/tests/integration_tests/flow_control/run.sh @@ -19,7 +19,7 @@ function run() { cd $WORK_DIR start_ts=$(get_start_ts $UP_PD) - go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 100 # About 500MiB + go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 200 # About 500MiB cat - >"$WORK_DIR/tikv-cdc-config.toml" < Date: Mon, 4 Mar 2024 08:52:06 +0800 Subject: [PATCH 17/17] polish Signed-off-by: Ping Yu --- cdc/cdc/sink/mq_test.go | 4 ++++ cdc/cdc/sink/producer/kafka/kafka.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cdc/cdc/sink/mq_test.go b/cdc/cdc/sink/mq_test.go index f2bda0a4..39699790 100644 --- a/cdc/cdc/sink/mq_test.go +++ b/cdc/cdc/sink/mq_test.go @@ -198,6 +198,10 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { err = sink.EmitChangedEvents(ctx, kv3) c.Assert(err, check.IsNil) + // TODO: fix EmitCheckpointTs + // mock kafka broker processes 1 row resolvedTs event + // leader.Returns(prodSuccess) + checkpointTs1, err := sink.FlushChangedEvents(ctx, keyspanID1, kv1.CRTs) c.Assert(err, check.IsNil) c.Assert(checkpointTs1, check.Equals, kv1.CRTs) diff --git a/cdc/cdc/sink/producer/kafka/kafka.go b/cdc/cdc/sink/producer/kafka/kafka.go index c26232ed..447879ee 100644 --- a/cdc/cdc/sink/producer/kafka/kafka.go +++ b/cdc/cdc/sink/producer/kafka/kafka.go @@ -260,7 +260,7 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { if flushedOffset <= prevOffset { log.Panic("kafka producer flushed offset goes backward", zap.Int32("partition", msg.Partition), zap.Uint64("flushed", flushedOffset), zap.Uint64("prev", prevOffset)) } - log.Debug("kafka producer flushed message", zap.Int32("partition", msg.Partition), zap.Uint64("offset", flushedOffset)) + log.Debug("kafka producer flushed message", zap.Int32("partition", msg.Partition), zap.Uint64("offset", flushedOffset), zap.Uint64("prev", prevOffset)) k.flushedNotifier.Notify() case err := <-k.asyncClient.Errors():