Skip to content

Commit 70613d2

Browse files
committed
Merge remote-tracking branch 'origin/master' into release-4.0
2 parents a5fad14 + d1149b7 commit 70613d2

File tree

2 files changed

+25
-7
lines changed

2 files changed

+25
-7
lines changed

cdc/sink/codec/canal.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated
216216
func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowData, error) {
217217
var columns []*canal.Column
218218
for _, column := range e.Columns {
219-
if e == nil {
219+
if column == nil {
220220
continue
221221
}
222222
c, err := b.buildColumn(column, column.Name, !e.IsDelete())
@@ -227,7 +227,7 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowDa
227227
}
228228
var preColumns []*canal.Column
229229
for _, column := range e.PreColumns {
230-
if e == nil {
230+
if column == nil {
231231
continue
232232
}
233233
c, err := b.buildColumn(column, column.Name, !e.IsDelete())
@@ -374,16 +374,22 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage,
374374

375375
// Build implements the EventBatchEncoder interface
376376
func (d *CanalEventBatchEncoder) Build() []*MQMessage {
377+
if len(d.messages.Messages) == 0 {
378+
return nil
379+
}
380+
377381
err := d.refreshPacketBody()
378382
if err != nil {
379383
log.Fatal("Error when generating Canal packet", zap.Error(err))
380384
}
385+
381386
value, err := proto.Marshal(d.packet)
382387
if err != nil {
383388
log.Fatal("Error when serializing Canal packet", zap.Error(err))
384389
}
385390
ret := NewMQMessage(nil, value, 0)
386391
d.messages.Reset()
392+
d.resetPacket()
387393
return []*MQMessage{ret}
388394
}
389395

@@ -414,24 +420,30 @@ func (d *CanalEventBatchEncoder) refreshPacketBody() error {
414420
if newSize > oldSize {
415421
// resize packet body slice
416422
d.packet.Body = append(d.packet.Body, make([]byte, newSize-oldSize)...)
423+
} else {
424+
d.packet.Body = d.packet.Body[:newSize]
417425
}
418-
_, err := d.messages.MarshalToSizedBuffer(d.packet.Body[:newSize])
426+
427+
_, err := d.messages.MarshalToSizedBuffer(d.packet.Body)
419428
return err
420429
}
421430

422-
// NewCanalEventBatchEncoder creates a new CanalEventBatchEncoder.
423-
func NewCanalEventBatchEncoder() EventBatchEncoder {
424-
p := &canal.Packet{
431+
func (d *CanalEventBatchEncoder) resetPacket() {
432+
d.packet = &canal.Packet{
425433
VersionPresent: &canal.Packet_Version{
426434
Version: CanalPacketVersion,
427435
},
428436
Type: canal.PacketType_MESSAGES,
429437
}
438+
}
430439

440+
// NewCanalEventBatchEncoder creates a new CanalEventBatchEncoder.
441+
func NewCanalEventBatchEncoder() EventBatchEncoder {
431442
encoder := &CanalEventBatchEncoder{
432443
messages: &canal.Messages{},
433-
packet: p,
434444
entryBuilder: NewCanalEntryBuilder(),
435445
}
446+
447+
encoder.resetPacket()
436448
return encoder
437449
}

cdc/sink/codec/canal_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@ func (s *canalBatchSuite) TestCanalEventBatchEncoder(c *check.C) {
9191
}
9292
size := encoder.Size()
9393
res := encoder.Build()
94+
95+
if len(cs) == 0 {
96+
c.Assert(res, check.IsNil)
97+
continue
98+
}
99+
94100
c.Assert(res, check.HasLen, 1)
95101
c.Assert(res[0].Key, check.IsNil)
96102
c.Assert(len(res[0].Value), check.Equals, size)

0 commit comments

Comments
 (0)