Skip to content

Commit 989f859

Browse files
authored
redo: improve redo writer (#4353)
ref #1061
1 parent 3dae8aa commit 989f859

File tree

10 files changed: +528 −73 lines changed

downstreamadapter/sink/kafka/sink.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,9 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {
223223

224224
partitionGenerator := s.comp.eventRouter.GetPartitionGenerator(schema, table)
225225
selector := s.comp.columnSelector.Get(schema, table)
226-
rowsCount := uint64(event.Len())
227-
rowCallback := helper.NewTxnPostFlushRowCallback(event, rowsCount)
226+
rowsCount := event.Len()
227+
events := make([]*commonEvent.MQRowEvent, 0, rowsCount)
228+
rowCallback := helper.NewTxnPostFlushRowCallback(event, uint64(rowsCount))
228229

229230
for {
230231
row, ok := event.GetNextRow()
@@ -238,7 +239,7 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {
238239
return errors.Trace(err)
239240
}
240241

241-
mqEvent := &commonEvent.MQRowEvent{
242+
events = append(events, &commonEvent.MQRowEvent{
242243
Key: commonEvent.TopicPartitionKey{
243244
Topic: topic,
244245
Partition: index,
@@ -255,9 +256,9 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {
255256
ColumnSelector: selector,
256257
Checksum: row.Checksum,
257258
},
258-
}
259-
s.rowChan.Push(mqEvent)
259+
})
260260
}
261+
s.rowChan.Push(events...)
261262
}
262263
}
263264
}
@@ -333,6 +334,7 @@ func (s *sink) batch(ctx context.Context, buffer []*commonEvent.MQRowEvent) ([]*
333334
zap.String("changefeed", s.changefeedID.Name()))
334335
return nil, nil
335336
}
337+
buffer = buffer[:0]
336338
return msgs, nil
337339
}
338340
}

downstreamadapter/sink/pulsar/sink.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,9 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {
333333

334334
partitionGenerator := s.comp.eventRouter.GetPartitionGenerator(schema, table)
335335
selector := s.comp.columnSelector.Get(schema, table)
336-
rowsCount := uint64(event.Len())
337-
rowCallback := helper.NewTxnPostFlushRowCallback(event, rowsCount)
336+
rowsCount := event.Len()
337+
events := make([]*commonEvent.MQRowEvent, 0, rowsCount)
338+
rowCallback := helper.NewTxnPostFlushRowCallback(event, uint64(rowsCount))
338339

339340
for {
340341
row, ok := event.GetNextRow()
@@ -348,7 +349,7 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {
348349
return errors.Trace(err)
349350
}
350351

351-
mqEvent := &commonEvent.MQRowEvent{
352+
events = append(events, &commonEvent.MQRowEvent{
352353
Key: commonEvent.TopicPartitionKey{
353354
Topic: topic,
354355
Partition: index,
@@ -365,9 +366,9 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {
365366
ColumnSelector: selector,
366367
Checksum: row.Checksum,
367368
},
368-
}
369-
s.rowChan.Push(mqEvent)
369+
})
370370
}
371+
s.rowChan.Push(events...)
371372
}
372373
}
373374
}
@@ -436,6 +437,7 @@ func (s *sink) batch(ctx context.Context, buffer []*commonEvent.MQRowEvent, tick
436437
zap.String("changefeed", s.changefeedID.Name()))
437438
return nil, nil
438439
}
440+
buffer = buffer[:0]
439441
return msgs, nil
440442
}
441443
}

downstreamadapter/sink/redo/sink.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,16 +133,17 @@ func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error {
133133
}
134134

135135
func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) {
136-
rowsCount := uint64(event.Len())
137-
rowCallback := helper.NewTxnPostFlushRowCallback(event, rowsCount)
136+
rowsCount := event.Len()
137+
events := make([]writer.RedoEvent, 0, rowsCount)
138+
rowCallback := helper.NewTxnPostFlushRowCallback(event, uint64(rowsCount))
138139

139140
for {
140141
row, ok := event.GetNextRow()
141142
if !ok {
142143
event.Rewind()
143144
break
144145
}
145-
s.logBuffer.Push(&commonEvent.RedoRowEvent{
146+
events = append(events, &commonEvent.RedoRowEvent{
146147
StartTs: event.StartTs,
147148
CommitTs: event.CommitTs,
148149
Event: row,
@@ -151,6 +152,7 @@ func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) {
151152
Callback: rowCallback,
152153
})
153154
}
155+
s.logBuffer.Push(events...)
154156
}
155157

156158
func (s *Sink) IsNormal() bool {
@@ -195,20 +197,30 @@ func (s *Sink) Close(_ bool) {
195197
}
196198

197199
func (s *Sink) sendMessages(ctx context.Context) error {
200+
buffer := make([]writer.RedoEvent, 0, redo.DefaultFlushBatchSize)
198201
for {
199-
e, ok := s.logBuffer.Get()
202+
select {
203+
case <-ctx.Done():
204+
return errors.Trace(ctx.Err())
205+
default:
206+
}
207+
events, ok := s.logBuffer.GetMultipleNoGroup(buffer)
200208
if !ok {
201209
return nil
202210
}
211+
if len(events) == 0 {
212+
continue
213+
}
214+
buffer = events[:0]
203215

204216
start := time.Now()
205-
err := s.dmlWriter.WriteEvents(ctx, e)
217+
err := s.dmlWriter.WriteEvents(ctx, events...)
206218
if err != nil {
207219
return err
208220
}
209221

210222
if s.metric != nil {
211-
s.metric.observeRowWrite(1, time.Since(start))
223+
s.metric.observeRowWrite(len(events), time.Since(start))
212224
}
213225
}
214226
}

downstreamadapter/sink/redo/sink_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@ import (
2121
"testing"
2222
"time"
2323

24+
"github.com/golang/mock/gomock"
2425
"github.com/pingcap/ticdc/heartbeatpb"
2526
"github.com/pingcap/ticdc/pkg/common"
2627
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
2728
"github.com/pingcap/ticdc/pkg/config"
2829
"github.com/pingcap/ticdc/pkg/redo"
30+
"github.com/pingcap/ticdc/pkg/redo/writer"
2931
"github.com/pingcap/ticdc/pkg/util"
32+
"github.com/pingcap/ticdc/utils/chann"
3033
"github.com/stretchr/testify/require"
3134
"golang.org/x/sync/errgroup"
3235
)
@@ -329,3 +332,55 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
329332

330333
require.ErrorIs(b, eg.Wait(), context.Canceled)
331334
}
335+
336+
func TestRedoSinkSendMessagesInBatch(t *testing.T) {
337+
t.Parallel()
338+
339+
ctx, cancel := context.WithCancel(context.Background())
340+
defer cancel()
341+
342+
ctrl := gomock.NewController(t)
343+
defer ctrl.Finish()
344+
345+
mockWriter := writer.NewMockRedoLogWriter(ctrl)
346+
expectWriteBatch := func(batchSize int) *gomock.Call {
347+
args := make([]interface{}, 0, batchSize+1)
348+
args = append(args, gomock.Any()) // context
349+
for i := 0; i < batchSize; i++ {
350+
args = append(args, gomock.Any())
351+
}
352+
return mockWriter.EXPECT().
353+
WriteEvents(args[0], args[1:]...).
354+
DoAndReturn(func(_ context.Context, events ...writer.RedoEvent) error {
355+
require.Len(t, events, batchSize)
356+
return nil
357+
})
358+
}
359+
360+
gomock.InOrder(
361+
expectWriteBatch(redo.DefaultFlushBatchSize),
362+
expectWriteBatch(redo.DefaultFlushBatchSize),
363+
expectWriteBatch(17),
364+
)
365+
366+
s := &Sink{
367+
dmlWriter: mockWriter,
368+
logBuffer: chann.NewUnlimitedChannelDefault[writer.RedoEvent](),
369+
}
370+
371+
doneCh := make(chan error, 1)
372+
go func() {
373+
doneCh <- s.sendMessages(ctx)
374+
}()
375+
376+
totalEvents := redo.DefaultFlushBatchSize*2 + 17
377+
events := make([]writer.RedoEvent, 0, totalEvents)
378+
for i := 0; i < totalEvents; i++ {
379+
events = append(events, &commonEvent.RedoRowEvent{})
380+
}
381+
s.logBuffer.Push(events...)
382+
s.logBuffer.Close()
383+
384+
err := <-doneCh
385+
require.NoError(t, err)
386+
}

0 commit comments

Comments
 (0)