Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[close #383] fix integration test case flow_control & stop_downstream #392

Merged
merged 17 commits into from
Mar 4, 2024
58 changes: 47 additions & 11 deletions cdc/cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package codec
import (
"context"
"encoding/binary"
"sync"
"time"

"github.com/pingcap/log"
Expand Down Expand Up @@ -63,6 +64,29 @@ type MQMessage struct {
entriesCount int // entries in one MQ Message
}

const (
MemoryReleaseThreshold = 100 * 1024 // 100KiB
MemoryReleaseFactor = 100
)

func resetBuffer(buf []byte) []byte {
length := len(buf)
capSize := cap(buf)
if capSize > MemoryReleaseThreshold && length > 0 && length*MemoryReleaseFactor < capSize {
return nil
}
return buf[:0]
}

func (m *MQMessage) Reset() {
m.Key = resetBuffer(m.Key)
m.Value = resetBuffer(m.Value)
m.Ts = 0
m.Type = model.MqMessageTypeUnknown
m.Protocol = config.ProtocolDefault
m.entriesCount = 0
}

// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client.
// reference: https://github.com/Shopify/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233
// for TiKV-CDC, minimum supported kafka version is `0.11.0.2`, which will be treated as `version = 2` by sarama producer.
Expand Down Expand Up @@ -99,31 +123,43 @@ func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) *
return NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved)
}

var mqMsgPool = sync.Pool{
New: func() any {
return new(MQMessage)
},
}

// NewMQMessage should be used when creating a MQMessage struct.
// It copies the input byte slices to avoid any surprises in asynchronous MQ writes.
func NewMQMessage(proto config.Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType) *MQMessage {
ret := &MQMessage{
Key: nil,
Value: nil,
Ts: ts,
Type: ty,
Protocol: proto,
entriesCount: 0,
ret := mqMsgPool.Get().(*MQMessage)

// TODO: remove this check.
if len(ret.Key) > 0 || len(ret.Value) > 0 {
log.Panic("MQMessage is not reset", zap.String("key", string(ret.Key)), zap.String("value", string(ret.Value)))
}

ret.Ts = ts
ret.Type = ty
ret.Protocol = proto
ret.entriesCount = 0

if key != nil {
ret.Key = make([]byte, len(key))
copy(ret.Key, key)
ret.Key = append(ret.Key, key...)
}

if value != nil {
ret.Value = make([]byte, len(value))
copy(ret.Value, value)
ret.Value = append(ret.Value, value...)
}

return ret
}

func ReleaseMQMessage(m *MQMessage) {
m.Reset()
mqMsgPool.Put(m)
}

// EventBatchDecoder is an abstraction for events decoder
// this interface is only for testing now
type EventBatchDecoder interface {
Expand Down
33 changes: 27 additions & 6 deletions cdc/cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,10 @@ type JSONEventBatchEncoder struct {
valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now
supportMixedBuild bool // TODO decouple this out

messageBuf []*MQMessage
curBatchSize int
messageBuf []*MQMessage
curBatchSize int
totalBatchBytes int // Note: The size of last message is not included

// configs
maxMessageBytes int
maxBatchSize int
Expand Down Expand Up @@ -226,6 +228,9 @@ func (d *JSONEventBatchEncoder) AppendChangedEvent(e *model.RawKVEntry) (Encoder
versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)

if len(d.messageBuf) > 0 {
d.totalBatchBytes += d.messageBuf[len(d.messageBuf)-1].Length()
}
d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeKv))
d.curBatchSize = 0
}
Expand All @@ -249,6 +254,8 @@ func (d *JSONEventBatchEncoder) AppendChangedEvent(e *model.RawKVEntry) (Encoder
}

// Build implements the EventBatchEncoder interface
// NOTE: when supportMixedBuild is enabled, must call Reset() after the returned `mqMessages` is used.
// It's not a good design. As supportMixedBuild is used in unit tests only, we don't fix it now.
func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) {
if d.supportMixedBuild {
if d.valueBuf.Len() == 0 {
Expand All @@ -260,7 +267,7 @@ func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) {
}

ret := d.messageBuf
d.messageBuf = make([]*MQMessage, 0)
d.Reset()
return ret
}

Expand Down Expand Up @@ -307,13 +314,27 @@ func (d *JSONEventBatchEncoder) MixedBuild(withVersion bool) []byte {

// Size implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Size() int {
return d.keyBuf.Len() + d.valueBuf.Len()
if d.supportMixedBuild {
return d.keyBuf.Len() + d.valueBuf.Len()
}

lastMessageLength := 0
if len(d.messageBuf) > 0 {
lastMessageLength = d.messageBuf[len(d.messageBuf)-1].Length()
}
return d.totalBatchBytes + lastMessageLength
}

// Reset implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Reset() {
d.keyBuf.Reset()
d.valueBuf.Reset()
if d.supportMixedBuild {
d.keyBuf.Reset()
d.valueBuf.Reset()
} else {
d.messageBuf = make([]*MQMessage, 0)
d.curBatchSize = 0
d.totalBatchBytes = 0
}
}

// SetParams reads relevant parameters for Open Protocol
Expand Down
19 changes: 17 additions & 2 deletions cdc/cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (s *batchSuite) TestSetParams(c *check.C) {
func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
c.Check(encoder.Size(), check.Equals, 0)

// the size of `testEvent` is 75
testEvent := &model.RawKVEntry{
Expand All @@ -207,23 +208,35 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
ExpiredTs: 200,
}
eventSize := 75
// for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44.
overhead := 36 + 8

// for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44, just can hold it.
a := strconv.Itoa(eventSize + 44)
// just can hold a single message.
a := strconv.Itoa(eventSize + overhead)
err := encoder.SetParams(map[string]string{"max-message-bytes": a})
c.Check(err, check.IsNil)
r, err := encoder.AppendChangedEvent(testEvent)
c.Check(err, check.IsNil)
c.Check(r, check.Equals, EncoderNoOperation)
totalSize := eventSize + overhead
c.Check(encoder.Size(), check.Equals, totalSize)

r, err = encoder.AppendChangedEvent(testEvent)
c.Check(err, check.IsNil)
c.Check(r, check.Equals, EncoderNoOperation)
totalSize += eventSize + overhead
c.Check(encoder.Size(), check.Equals, totalSize)

a = strconv.Itoa(eventSize + 43)
err = encoder.SetParams(map[string]string{"max-message-bytes": a})
c.Assert(err, check.IsNil)
r, err = encoder.AppendChangedEvent(testEvent)
c.Check(err, check.NotNil)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(encoder.Size(), check.Equals, totalSize)

// make sure each batch's `Length` not greater than `max-message-bytes`
// 256: each message can hold 2 events (75 * 2 + 36 + 8 = 194)
err = encoder.SetParams(map[string]string{"max-message-bytes": "256"})
c.Check(err, check.IsNil)

Expand All @@ -232,6 +245,8 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}
totalSize += (eventSize*2 + overhead) * 5000
c.Check(encoder.Size(), check.Equals, totalSize)

messages := encoder.Build()
for _, msg := range messages {
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,11 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
})

for _, msg := range messages {
thisBatchSize += msg.GetEntriesCount()
err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition)
if err != nil {
return 0, err
}
thisBatchSize += msg.GetEntriesCount()
}

if op == codec.EncoderNeedSyncWrite {
Expand Down
24 changes: 19 additions & 5 deletions cdc/cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

type mqSinkSuite struct{}

var _ = check.Suite(&mqSinkSuite{})
var _ = check.SerialSuites(&mqSinkSuite{})

func (s mqSinkSuite) TestKafkaSink(c *check.C) {
defer testleak.AfterTest(c)()
Expand Down Expand Up @@ -144,6 +144,16 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {
opts := map[string]string{}
errCh := make(chan error, 1)

newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl
kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) {
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
cfg.Producer.Flush.MaxMessages = 1
return cfg, err
}
defer func() {
kafkap.NewSaramaConfigImpl = newSaramaConfigImplBak
}()
kafkap.NewAdminClientImpl = kafka.NewMockAdminClient
defer func() {
kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient
Expand All @@ -152,8 +162,11 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {
sink, err := newKafkaSaramaSink(ctx, sinkURI, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

// mock kafka broker processes 1 row changed event
leader.Returns(prodSuccess)
// mock kafka broker processes 3 row changed events
for i := 0; i < 3; i++ {
leader.Returns(prodSuccess)
}

keyspanID1 := model.KeySpanID(1)
kv1 := &model.RawKVEntry{
OpType: model.OpTypePut,
Expand Down Expand Up @@ -182,12 +195,13 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {
StartTs: 110,
CRTs: 130,
}

err = sink.EmitChangedEvents(ctx, kv3)
c.Assert(err, check.IsNil)

// TODO: fix EmitCheckpointTs
// mock kafka broker processes 1 row resolvedTs event
leader.Returns(prodSuccess)
// leader.Returns(prodSuccess)

checkpointTs1, err := sink.FlushChangedEvents(ctx, keyspanID1, kv1.CRTs)
c.Assert(err, check.IsNil)
c.Assert(checkpointTs1, check.Equals, kv1.CRTs)
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/sink/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

func init() {
sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB
sarama.MaxRequestSize = 100 * 1024 * 1024 // 100MB
}

// Config stores user specified Kafka producer configuration
Expand Down
10 changes: 5 additions & 5 deletions cdc/cdc/sink/producer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
ctx := context.Background()
config := NewConfig()
config.Version = "invalid"
_, err := newSaramaConfigImpl(ctx, config)
_, err := NewSaramaConfigImpl(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")

ctx = util.SetOwnerInCtx(ctx)
config.Version = "2.6.0"
config.ClientID = "^invalid$"
_, err = newSaramaConfigImpl(ctx, config)
_, err = NewSaramaConfigImpl(ctx, config)
c.Assert(cerror.ErrKafkaInvalidClientID.Equal(err), check.IsTrue)

config.ClientID = "test-kafka-client"
Expand All @@ -56,15 +56,15 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
}
for _, cc := range compressionCases {
config.Compression = cc.algorithm
cfg, err := newSaramaConfigImpl(ctx, config)
cfg, err := NewSaramaConfigImpl(ctx, config)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.Compression, check.Equals, cc.expected)
}

config.Credential = &security.Credential{
CAPath: "/invalid/ca/path",
}
_, err = newSaramaConfigImpl(ctx, config)
_, err = NewSaramaConfigImpl(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory")

saslConfig := NewConfig()
Expand All @@ -76,7 +76,7 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
SaslMechanism: sarama.SASLTypeSCRAMSHA256,
}

cfg, err := newSaramaConfigImpl(ctx, saslConfig)
cfg, err := NewSaramaConfigImpl(ctx, saslConfig)
c.Assert(err, check.IsNil)
c.Assert(cfg, check.NotNil)
c.Assert(cfg.Net.SASL.User, check.Equals, "user")
Expand Down
Loading
Loading