Skip to content

Commit

Permalink
[to #381] Add more integration tests for Kafka sink (#387)
Browse files Browse the repository at this point in the history
* kafka consumer tolerate TiKV errors

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* improve download

Signed-off-by: Ping Yu <[email protected]>

* fix no wget

Signed-off-by: Ping Yu <[email protected]>

* trap kafka consumer log on exit

Signed-off-by: Ping Yu <[email protected]>

* more logs

Signed-off-by: Ping Yu <[email protected]>

* more memory usage for kafka sink

Signed-off-by: Ping Yu <[email protected]>

* trigger CI

Signed-off-by: Ping Yu <[email protected]>

* skip error on memory oversize for Kafka sink

Signed-off-by: Ping Yu <[email protected]>

* longer check_sync_diff timeout

Signed-off-by: Ping Yu <[email protected]>

* check event nil

Signed-off-by: Ping Yu <[email protected]>

---------

Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu authored Jan 25, 2024
1 parent aeae68f commit 503fce8
Show file tree
Hide file tree
Showing 39 changed files with 261 additions and 77 deletions.
8 changes: 5 additions & 3 deletions cdc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ debug:
$(GOBUILD_DEBUG) -ldflags '$(LDFLAGS)' -o bin/tikv-cdc ./cmd/cdc/main.go

kafka_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer/main.go
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer/

install:
go install ./...
Expand Down Expand Up @@ -248,8 +248,10 @@ integration_test_by_group: prepare_test_binaries check_third_party_binary integr
tests/integration_tests/run_group.sh others

prepare_test_binaries:
cd scripts && ./download-integration-test-binaries.sh "$(TEST_ON_BRANCH)" && cd ..
touch prepare_test_binaries
cd scripts && \
./download-integration-test-binaries.sh "$(TEST_ON_BRANCH)" && \
cd .. && \
touch prepare_test_binaries

check_third_party_binary:
@which scripts/bin/tidb-server
Expand Down
1 change: 1 addition & 0 deletions cdc/cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
case <-time.After(30 * time.Second): // Send 128MB object may costs lots of time.
c.Fatalf("receiving message takes too long")
}
c.Assert(event, check.NotNil)
c.Assert(len(event.Val.Value), check.Equals, largeValSize)
}

Expand Down
15 changes: 11 additions & 4 deletions cdc/cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ type Sink interface {
}

var (
sinkIniterMap = make(map[string]sinkInitFunc)
sinkURICheckerMap = make(map[string]sinkInitFunc)
sinkIniterMap = make(map[string]InitFunc)
sinkURICheckerMap = make(map[string]InitFunc)
)

type sinkInitFunc func(context.Context, model.ChangeFeedID, *url.URL, *config.ReplicaConfig, map[string]string, chan error) (Sink, error)
type InitFunc func(context.Context, model.ChangeFeedID, *url.URL, *config.ReplicaConfig, map[string]string, chan error) (Sink, error)

func init() {
// register blackhole sink
Expand All @@ -93,7 +93,7 @@ func init() {
sinkURICheckerMap["tikv"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
_, _, err := parseTiKVUri(sinkURI, opts)
_, _, err := ParseTiKVUri(sinkURI, opts)
return nil, err
}

Expand All @@ -113,6 +113,13 @@ func init() {
sinkURICheckerMap["kafka+ssl"] = sinkURICheckerMap["kafka"]
}

func RegisterSink(scheme string, initFunc InitFunc, checkerFunc InitFunc) {
sinkIniterMap[scheme] = initFunc
if checkerFunc != nil {
sinkURICheckerMap[scheme] = checkerFunc
}
}

// New creates a new sink with the sink-uri
func New(ctx context.Context, changefeedID model.ChangeFeedID, sinkURIStr string, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
// parse sinkURI as a URI
Expand Down
8 changes: 4 additions & 4 deletions cdc/cdc/sink/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (b *tikvBatcher) getNow() uint64 {
return uint64(time.Now().Unix()) // TODO: use TSO ?
}

func extractEntry(entry *model.RawKVEntry, now uint64) (opType model.OpType,
func ExtractRawKVEntry(entry *model.RawKVEntry, now uint64) (opType model.OpType,
key []byte, value []byte, ttl uint64, err error,
) {
opType = entry.OpType
Expand Down Expand Up @@ -321,7 +321,7 @@ func (b *tikvBatcher) Append(entry *model.RawKVEntry) {
b.now = b.getNow()
}

opType, key, value, ttl, err := extractEntry(entry, b.now)
opType, key, value, ttl, err := ExtractRawKVEntry(entry, b.now)
if err != nil {
log.Error("failed to extract entry", zap.Any("event", entry), zap.Error(err))
b.statistics.AddInvalidKeyCount()
Expand Down Expand Up @@ -436,7 +436,7 @@ func (k *tikvSink) runWorker(ctx context.Context, workerIdx uint32) error {
}
}

func parseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config, []string, error) {
func ParseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config, []string, error) {
config := tikvconfig.DefaultConfig()
pdAddrPrefix := "http://"

Expand Down Expand Up @@ -477,7 +477,7 @@ func parseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config,
}

func newTiKVSink(ctx context.Context, sinkURI *url.URL, _ *config.ReplicaConfig, opts map[string]string, errCh chan error) (*tikvSink, error) {
config, pdAddr, err := parseTiKVUri(sinkURI, opts)
config, pdAddr, err := ParseTiKVUri(sinkURI, opts)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions cdc/cdc/sink/tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestExtractRawKVEntry(t *testing.T) {
}

for i, c := range cases {
opType, key, value, ttl, err := extractEntry(c, now)
opType, key, value, ttl, err := ExtractRawKVEntry(c, now)
require.Equal(expects[i].opType, opType)
require.Equal(expects[i].key, key)
require.Equal(expects[i].value, value)
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestTiKVSinkConfig(t *testing.T) {
require.NoError(err)

opts := make(map[string]string)
config, pdAddr, err := parseTiKVUri(sinkURI, opts)
config, pdAddr, err := ParseTiKVUri(sinkURI, opts)
require.NoError(err)
require.Equal(expected[i].pdAddr, pdAddr)
require.Equal(expected[i].concurrency, opts["concurrency"])
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestTiKVSink(t *testing.T) {
require.NoError(err)

opts := make(map[string]string)
config, pdAddr, err := parseTiKVUri(sinkURI, opts)
config, pdAddr, err := ParseTiKVUri(sinkURI, opts)
require.NoError(err)

errCh := make(chan error)
Expand Down
41 changes: 30 additions & 11 deletions cdc/cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ import (
"go.uber.org/zap"
)

const (
downstreamRetryInterval = 500 * time.Millisecond
)

// Sarama configuration options
var (
kafkaAddrs []string
Expand Down Expand Up @@ -105,14 +109,14 @@ func init() {
})
kafkaAddrs = strings.Split(upstreamURI.Host, ",")

config, err := newSaramaConfig()
cnf, err := newSaramaConfig()
if err != nil {
log.Fatal("Error creating sarama config", zap.Error(err))
}

s = upstreamURI.Query().Get("partition-num")
if s == "" {
partition, err := getPartitionNum(kafkaAddrs, kafkaTopic, config)
partition, err := getPartitionNum(kafkaAddrs, kafkaTopic, cnf)
if err != nil {
log.Fatal("can not get partition number", zap.String("topic", kafkaTopic), zap.Error(err))
}
Expand Down Expand Up @@ -144,6 +148,10 @@ func init() {
log.Info("Setting max-batch-size", zap.Int("max-batch-size", c))
kafkaMaxBatchSize = c
}

// Use `tikvSimpleSink` for "tikv".
// As `sink.tikvSink` has internal batch, it is not easy to tolerate errors of TiKV in Kafka consuming scene.
registerSimpleTiKVSink("tikv")
}

func getPartitionNum(address []string, topic string, cfg *sarama.Config) (int32, error) {
Expand Down Expand Up @@ -362,7 +370,8 @@ func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := context.TODO()
ctx, cancel := context.WithCancel(session.Context())
defer cancel()
partition := claim.Partition()
c.sinksMu.Lock()
sink := c.sinks[partition]
Expand Down Expand Up @@ -409,14 +418,24 @@ ClaimMessages:
zap.Int32("partition", partition))
break ClaimMessages
}
err = sink.EmitChangedEvents(ctx, kv)
if err != nil {
log.Fatal("emit row changed event failed", zap.Error(err))
}
log.Debug("Emit ChangedEvent", zap.Any("kv", kv))
lastCRTs := sink.lastCRTs.Load()
if lastCRTs < kv.CRTs {
sink.lastCRTs.Store(kv.CRTs)

for {
err = sink.EmitChangedEvents(ctx, kv)
if err == nil {
log.Debug("emit changed events", zap.Any("kv", kv))
lastCRTs := sink.lastCRTs.Load()
if lastCRTs < kv.CRTs {
sink.lastCRTs.Store(kv.CRTs)
}
break
}

log.Warn("emit row changed event failed", zap.Error(err))
if session.Context().Err() != nil {
log.Warn("session closed", zap.Error(session.Context().Err()))
return nil
}
time.Sleep(downstreamRetryInterval)
}
case model.MqMessageTypeResolved:
ts, err := batchDecoder.NextResolvedEvent()
Expand Down
117 changes: 117 additions & 0 deletions cdc/cmd/kafka-consumer/tikv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"net/url"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/cdc/sink"
"github.com/tikv/migration/cdc/pkg/config"

"github.com/tikv/client-go/v2/rawkv"
pd "github.com/tikv/pd/client"
)

const (
defaultPDErrorRetry int = 10
)

var _ sink.Sink = (*tikvSimpleSink)(nil)

// tikvSimpleSink is a sink that sends events to downstream TiKV cluster.
// The reason why we need this sink other than `cdc/sink/tikv.tikvSink` is that we need Kafka message offset to handle TiKV errors, which is not provided by `tikvSink`.
type tikvSimpleSink struct {
client *rawkv.Client
}

func newSimpleTiKVSink(ctx context.Context, sinkURI *url.URL, _ *config.ReplicaConfig, opts map[string]string, _ chan error) (*tikvSimpleSink, error) {
config, pdAddrs, err := sink.ParseTiKVUri(sinkURI, opts)
if err != nil {
return nil, errors.Trace(err)
}

client, err := rawkv.NewClientWithOpts(ctx, pdAddrs,
rawkv.WithSecurity(config.Security),
rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2),
rawkv.WithPDOptions(pd.WithMaxErrorRetry(defaultPDErrorRetry)),
)
if err != nil {
return nil, errors.Trace(err)
}
return &tikvSimpleSink{
client: client,
}, nil
}

func (s *tikvSimpleSink) EmitChangedEvents(ctx context.Context, rawKVEntries ...*model.RawKVEntry) error {
now := uint64(time.Now().Unix())

for _, entry := range rawKVEntries {
opType, key, value, ttl, err := sink.ExtractRawKVEntry(entry, now)
if err != nil {
return errors.Trace(err)
}

if opType == model.OpTypePut {
err := s.client.PutWithTTL(ctx, key, value, ttl)
if err != nil {
return errors.Trace(err)
}
} else if opType == model.OpTypeDelete {
err := s.client.Delete(ctx, key)
if err != nil {
return errors.Trace(err)
}
} else {
return errors.Errorf("unexpected opType %v", opType)
}
}
return nil
}

func (s *tikvSimpleSink) FlushChangedEvents(ctx context.Context, _ model.KeySpanID, resolvedTs uint64) (uint64, error) {
return resolvedTs, nil
}

func (s *tikvSimpleSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
return nil
}

func (s *tikvSimpleSink) Close(ctx context.Context) error {
return errors.Trace(s.client.Close())
}

func (s *tikvSimpleSink) Barrier(ctx context.Context, keyspanID model.KeySpanID) error {
return nil
}

func registerSimpleTiKVSink(schema string) {
initFunc := func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (sink.Sink, error) {
return newSimpleTiKVSink(ctx, sinkURI, config, opts, errCh)
}
checkerFunc := func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (sink.Sink, error) {
_, _, err := sink.ParseTiKVUri(sinkURI, opts)
return nil, err
}
sink.RegisterSink(schema, initFunc, checkerFunc)
}
2 changes: 2 additions & 0 deletions cdc/deployments/tikv-cdc/docker/integration-test.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ ARG TEST_ON_BRANCH=master
USER root
WORKDIR /root/download

RUN yum install -y wget

COPY ./scripts/download-integration-test-binaries.sh .
# Download all binaries into bin dir.
RUN ./download-integration-test-binaries.sh ${TEST_ON_BRANCH}
Expand Down
33 changes: 25 additions & 8 deletions cdc/scripts/download-integration-test-binaries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ color-green() { # Green
echo -e "\x1B[1;32m${*}\x1B[0m"
}

function download() {
local url=$1
local file_name=$2
local file_path=$3
if [[ -f "${file_path}" ]]; then
echo "file ${file_name} already exists, skip download"
return
fi
echo ">>>"
echo "download ${file_name} from ${url}"
wget --no-verbose --retry-connrefused --waitretry=1 -t 3 -O "${file_path}" "${url}"
}

# Specify the download branch.
branch=$1

Expand Down Expand Up @@ -56,18 +69,22 @@ mkdir -p tmp
mkdir -p bin

color-green "Download binaries..."
curl "${tidb_download_url}" | tar xz -C tmp bin/tidb-server
curl "${tikv_download_url}" | tar xz -C tmp bin/tikv-server
curl "${pd_download_url}" | tar xz --wildcards -C tmp bin/*
mv tmp/bin/* third_bin

curl "${go_ycsb_download_url}" -o third_bin/go-ycsb
curl -L "${etcd_download_url}" | tar xz -C tmp
mv tmp/etcd-v3.4.7-linux-amd64/etcdctl third_bin

download "$tidb_download_url" "tidb-server.tar.gz" "tmp/tidb-server.tar.gz"
tar -xz -C third_bin bin/tidb-server -f tmp/tidb-server.tar.gz && mv third_bin/bin/tidb-server third_bin/
download "$pd_download_url" "pd-server.tar.gz" "tmp/pd-server.tar.gz"
tar -xz --wildcards -C third_bin 'bin/*' -f tmp/pd-server.tar.gz && mv third_bin/bin/* third_bin/
download "$tikv_download_url" "tikv-server.tar.gz" "tmp/tikv-server.tar.gz"
tar -xz -C third_bin bin/tikv-server -f tmp/tikv-server.tar.gz && mv third_bin/bin/tikv-server third_bin/
download "$go_ycsb_download_url" "go-ycsb" "third_bin/go-ycsb"
download "$etcd_download_url" "etcd.tar.gz" "tmp/etcd.tar.gz"
tar -xz -C third_bin etcd-v3.4.7-linux-amd64/etcdctl -f tmp/etcd.tar.gz && mv third_bin/etcd-v3.4.7-linux-amd64/etcdctl third_bin/

chmod a+x third_bin/*

# Copy it to the bin directory in the root directory.
rm -rf tmp
rm -rf bin/bin
mv third_bin/* ./bin
rm -rf third_bin

Expand Down
Loading

0 comments on commit 503fce8

Please sign in to comment.