Skip to content

Commit d01a02e

Browse files
author
ffffwh
committed
workaround of #717
Following 40fae59. Fix incorrect base 64 conversion in kafka.
1 parent f81d863 commit d01a02e

File tree

3 files changed

+126
-3
lines changed

3 files changed

+126
-3
lines changed

drivers/mysql/kafka/kafka3.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -870,10 +870,10 @@ func (kr *KafkaRunner) kafkaTransformDMLEventQueries(dmlEntries []*common.Binlog
870870
}
871871
case mysqlconfig.VarbinaryColumnType:
872872
if beforeValue != nil {
873-
beforeValue = base64.StdEncoding.EncodeToString([]byte(beforeValue.(string)))
873+
beforeValue = encodeStringInterfaceToBase64String(beforeValue)
874874
}
875875
if afterValue != nil {
876-
afterValue = base64.StdEncoding.EncodeToString([]byte(afterValue.(string)))
876+
afterValue = encodeStringInterfaceToBase64String(afterValue)
877877
}
878878
case mysqlconfig.BinaryColumnType:
879879
if beforeValue != nil {
@@ -914,7 +914,28 @@ func (kr *KafkaRunner) kafkaTransformDMLEventQueries(dmlEntries []*common.Binlog
914914
afterValue = getSetValue(afterValue.(int64), columnType)
915915
}
916916
case mysqlconfig.BlobColumnType:
917-
// do nothing. just keep the string value.
917+
if strings.Contains(colList[i].ColumnType, "text") {
918+
// already string value
919+
} else {
920+
if beforeValue != nil {
921+
beforeValue = encodeStringInterfaceToBase64String(beforeValue)
922+
}
923+
if afterValue != nil {
924+
afterValue = encodeStringInterfaceToBase64String(afterValue)
925+
}
926+
}
927+
case mysqlconfig.VarcharColumnType:
928+
// workaround of #717
929+
if strings.Contains(colList[i].ColumnType, "binary") {
930+
if beforeValue != nil {
931+
beforeValue = encodeStringInterfaceToBase64String(beforeValue)
932+
}
933+
if afterValue != nil {
934+
afterValue = encodeStringInterfaceToBase64String(afterValue)
935+
}
936+
} else {
937+
// keep as is
938+
}
918939
case mysqlconfig.TextColumnType:
919940
if beforeValue != nil {
920941
beforeValue = castBytesOrStringToString(beforeValue)
@@ -1080,6 +1101,7 @@ func getSetValue(num int64, set string) string {
10801101
}
10811102

10821103
func getBinaryValue(binary string, value string) string {
1104+
// binary = "binary(64)"
10831105
binaryLen := binary[7 : len(binary)-1]
10841106
lens, err := strconv.Atoi(binaryLen)
10851107
if err != nil {
@@ -1225,3 +1247,7 @@ func castBytesOrStringToString(v interface{}) string {
12251247
panic("only []byte or string is allowed")
12261248
}
12271249
}
1250+
1251+
func encodeStringInterfaceToBase64String(v interface{}) string {
1252+
return base64.StdEncoding.EncodeToString([]byte(v.(string)))
1253+
}

drivers/mysql/kafka/kafka3_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package kafka
2+
3+
import "testing"
4+
5+
func Test_getBinaryValue(t *testing.T) {
6+
type args struct {
7+
binary string
8+
value string
9+
}
10+
tests := []struct {
11+
name string
12+
args args
13+
want string
14+
}{
15+
{
16+
name: "binary1",
17+
args: args{
18+
binary: "binary(16)",
19+
value: "",
20+
},
21+
want: "AAAAAAAAAAAAAAAAAAAAAA==",
22+
},
23+
}
24+
for _, tt := range tests {
25+
t.Run(tt.name, func(t *testing.T) {
26+
if got := getBinaryValue(tt.args.binary, tt.args.value); got != tt.want {
27+
t.Errorf("getBinaryValue() = %v, want %v", got, tt.want)
28+
}
29+
})
30+
}
31+
}

drivers/mysql/mysql/base/utils_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ package base
88

99
import (
1010
gosql "database/sql"
11+
"fmt"
12+
sqle "github.com/actiontech/dtle/drivers/mysql/mysql/sqle/inspector"
13+
"github.com/pingcap/parser"
1114
"reflect"
1215
"testing"
1316
"time"
@@ -264,3 +267,66 @@ func Test_stringInterval(t *testing.T) {
264267
})
265268
}
266269
}
270+
271+
func TestGetTableColumnsSqle(t *testing.T) {
272+
// Not a real test. Just printing intermediate result.
273+
sqleCtx := sqle.NewContext(nil)
274+
sqleCtx.AddSchema("a"); sqleCtx.LoadSchemas(nil); sqleCtx.LoadTables("a", nil)
275+
p := parser.New()
276+
sqls := []string{
277+
"create table a.text_columns(id int(11) not null primary key,c_text longtext)",
278+
"create table a.binary_columns(id int(11) not null primary key, c_binary varbinary(255))",
279+
}
280+
for i := range sqls {
281+
stmt, err := p.ParseOneStmt(sqls[i], "", "")
282+
if err != nil {
283+
t.Error(err)
284+
}
285+
sqleCtx.UpdateContext(stmt, "mysql")
286+
}
287+
288+
type args struct {
289+
sqleContext *sqle.Context
290+
schema string
291+
table string
292+
}
293+
tests := []struct {
294+
name string
295+
args args
296+
want *common.ColumnList
297+
wantErr bool
298+
}{
299+
{
300+
name: "column-text",
301+
args: args{
302+
sqleContext: sqleCtx,
303+
schema: "a",
304+
table: "text_columns",
305+
},
306+
want: nil,
307+
wantErr: false,
308+
}, {
309+
name: "column-binary",
310+
args: args{
311+
sqleContext: sqleCtx,
312+
schema: "a",
313+
table: "binary_columns",
314+
},
315+
want: nil,
316+
wantErr: false,
317+
},
318+
}
319+
for _, tt := range tests {
320+
t.Run(tt.name, func(t *testing.T) {
321+
got, err := GetTableColumnsSqle(tt.args.sqleContext, tt.args.schema, tt.args.table)
322+
if (err != nil) != tt.wantErr {
323+
t.Errorf("GetTableColumnsSqle() error = %v, wantErr %v", err, tt.wantErr)
324+
return
325+
}
326+
fmt.Printf("table %v\n", tt.args.table)
327+
for i := range got.Columns {
328+
println(got.Columns[i].Type)
329+
}
330+
})
331+
}
332+
}

0 commit comments

Comments
 (0)