From 93b2c11b3762b31bbace71327273a136a89881c9 Mon Sep 17 00:00:00 2001 From: wenbo Date: Tue, 30 Oct 2018 20:21:17 +0800 Subject: [PATCH 1/6] make sql statement --- go/src/binlogsync/sqlUtil.go | 77 ++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 go/src/binlogsync/sqlUtil.go diff --git a/go/src/binlogsync/sqlUtil.go b/go/src/binlogsync/sqlUtil.go new file mode 100644 index 0000000..ed851d6 --- /dev/null +++ b/go/src/binlogsync/sqlUtil.go @@ -0,0 +1,77 @@ +package main + +import ( + "fmt" + "strings" + + "github.com/siddontang/go-mysql/mysql" +) + +func makeUpdateSql(table string, idxField, idxValue, tbField, tbValue []string) string { + var setClause string + var whereClause string + var limitClause string + var tableClause string + + tableClause = quote(table, "`") + setClause = makeSqlCondition(tbField, tbValue, "=", ", ") + whereClause = makeWhereClause(idxField, idxValue) + limitClause = "LIMIT 1" + + return fmt.Sprintf("UPDATE %s SET %s%s%s;", tableClause, setClause, whereClause, limitClause) +} + +func makeInsertSql(table string, tbField, tbValue []string) string { + var fldClause string + var valClause string + var tableClause string + + tableClause = quote(table, "`") + + fld := make([]string, len(tbField)) + val := make([]string, len(tbValue)) + + for i := 0; i < len(tbField); i++ { + fld[i] = quote(tbField[i], "`") + val[i] = "\"" + mysql.Escape(tbValue[i]) + "\"" + } + + FileLog.Printf("table field: %v, value: %v\n", fld, val) + + fldClause = "(" + strings.Join(fld, ", ") + ")" + valClause = "(" + strings.Join(val, ", ") + ")" + + return fmt.Sprintf("INSERT INTO %s %s VALUES %s;", tableClause, fldClause, valClause) +} + +func makeDeleteSql(table string, idxField, idxValue []string) string { + var whereClause string + var limitClause string + var tableClause string + + tableClause = quote(table, "`") + + whereClause = makeWhereClause(idxField, idxValue) + limitClause = "LIMIT 1" + + return fmt.Sprintf("DELETE FROM %s %s %s;", tableClause, whereClause, limitClause) +} + +func makeWhereClause(fields, values []string) string { + return fmt.Sprintf("WHERE %s", makeSqlCondition(fields, values, "=", " AND ")) +} + +func makeSqlCondition(fields, values []string, operator, formatter string) string { + conds := make([]string, len(fields)) + + for i, k := range fields { + conds[i] = fmt.Sprintf("%s%s%s", quote(k, "`"), operator, "\""+mysql.Escape(values[i])+"\"") + } + + return strings.Join(conds, formatter) +} + +func quote(src, quote string) string { + rst := strings.Replace(src, quote, "\\"+quote, -1) + return quote + rst + quote +} From 9d7f2f5a7329376d662a4366bda0d045734f9af7 Mon Sep 17 00:00:00 2001 From: wenbo Date: Tue, 30 Oct 2018 20:21:43 +0800 Subject: [PATCH 2/6] add some utils --- go/src/binlogsync/util.go | 61 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 go/src/binlogsync/util.go diff --git a/go/src/binlogsync/util.go b/go/src/binlogsync/util.go new file mode 100644 index 0000000..53d1730 --- /dev/null +++ b/go/src/binlogsync/util.go @@ -0,0 +1,61 @@ +package main + +import ( + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "strconv" + "strings" +) + +func calcHashToInt64(src []byte) (int64, error) { + h := sha256.New() + h.Write(src) + + hex_str := fmt.Sprintf("%x", h.Sum(nil)) + + num, err := strconv.ParseInt(hex_str[:8], 16, 64) + if err != nil { + return 0, err + } + return num, nil +} + +func compareStringSlice(a, b []string) (int, error) { + if len(a) != len(b) { + return 0, errors.New("length of slices not equals") + } + + for i, v := range a { + rst := strings.Compare(v, b[i]) + if rst == 0 { + continue + } + return rst, nil + } + + return 0, nil +} + +type JsonStruct struct { +} + +func NewJsonStruct() *JsonStruct { + return &JsonStruct{} +} + +func (jst *JsonStruct) Load(filename string, v interface{}) error { + data, err := ioutil.ReadFile(filename) + if err != nil { + return err + } + + err = json.Unmarshal(data, v) + if err != nil { + return err + } + + return nil +} From dcc5d59a219410c8b6733ec4b3fb38d26c6ff682 Mon Sep 17 00:00:00 2001 From: wenbo Date: Tue, 30 Oct 2018 20:22:12 +0800 Subject: [PATCH 3/6] add binlog sync --- go/src/binlogsync/binlog_sync.go | 393 +++++++++++++++++++++++++++++++ go/src/binlogsync/config.json | 42 ++++ 2 files changed, 435 insertions(+) create mode 100644 go/src/binlogsync/binlog_sync.go create mode 100644 go/src/binlogsync/config.json diff --git a/go/src/binlogsync/binlog_sync.go b/go/src/binlogsync/binlog_sync.go new file mode 100644 index 0000000..4381962 --- /dev/null +++ b/go/src/binlogsync/binlog_sync.go @@ -0,0 +1,393 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/siddontang/go-mysql/client" + "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" +) + +type Connection struct { + Addr string // can be ip:port or a unix socket domain + Host string + Port string + User string + Password string + DBName string +} + +type DBInfo struct { + Conn *client.Conn + Shard *DBShard +} + +type DBShard struct { + From []string + Table string + DBPort string +} + +type WriteEvent struct { + event *replication.BinlogEvent + before map[string]string + after map[string]string + dbInfo *DBInfo +} + +type OutStatus struct { + err error + routineIndex int + logPos uint32 +} + +type Config struct { + ChCap int + ChCnt int + + SourceConn Connection + + DBAddrs map[string]Connection + Shards []DBShard + + TableName string + TableField []string + TableShard []string + TableIndex []string + + BinlogFile string + BinlogPos uint32 + + TickCnt int64 +} + +var logFile *os.File + +var mutex sync.Mutex + +var ( + FileLog *log.Logger + ShellLog *log.Logger +) + +var dbPool = make(map[string]*client.Conn) +var conf = Config{} +var logFileName = "binlog_sync.out" +var confName = "./config.json" + +func validRow(srcRow []interface{}) map[string]string { + // the first column `id` should not put in new rowValue + rowValue := make([]string, len(srcRow)-1) + for i, v := range srcRow[1:] { + if v == nil { + continue + } + + var tmp string + if reflect.TypeOf(v).String() == "[]uint8" { + // convert []byte to string + tmp = fmt.Sprintf("%s", v) + } else { + tmp = fmt.Sprintf("%v", v) + } + + rowValue[i] = tmp + } + + row := make(map[string]string) + + for i, k := range conf.TableField { + row[k] = rowValue[i] + } + + return row +} + +func newBinlogReader(conn *Connection, binlogFile string, binlogPos uint32, serverID uint32) (*replication.BinlogStreamer, error) { + + port, err := strconv.ParseInt(conn.Port, 10, 16) + if err != nil { + return nil, err + } + + binlogCfg := replication.BinlogSyncerConfig{ + ServerID: serverID, + Flavor: "mysql", + Host: conn.Host, + Port: uint16(port), + User: conn.User, + Password: conn.Password, + } + + syncer := replication.NewBinlogSyncer(binlogCfg) + + streamer, err := syncer.StartSync(mysql.Position{binlogFile, binlogPos}) + if err != nil { + return nil, err + } + + return streamer, nil +} + +func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) { + for ev := range inCh { + mutex.Lock() + ev.dbInfo = getEventConnection(ev.after) + mutex.Unlock() + + index := makeTableIndex(ev) + + value := make([]string, len(conf.TableField)) + for i, k := range conf.TableField { + value[i] = ev.after[k] + } + + var sql string + evType := ev.event.Header.EventType + + FileLog.Printf("routin index: %d, before: %v, after: %v, event type: %v\n", chIdx, ev.before, ev.after, evType) + + if evType == replication.UPDATE_ROWS_EVENTv2 || evType == replication.UPDATE_ROWS_EVENTv1 || evType == replication.UPDATE_ROWS_EVENTv0 { + sql = makeUpdateSql(ev.dbInfo.Shard.Table, conf.TableIndex, index, conf.TableField, value) + + } else if evType == replication.DELETE_ROWS_EVENTv2 || evType == replication.DELETE_ROWS_EVENTv1 || evType == replication.DELETE_ROWS_EVENTv0 { + + sql = makeDeleteSql(ev.dbInfo.Shard.Table, conf.TableIndex, index) + } else if evType == replication.WRITE_ROWS_EVENTv2 || evType == replication.WRITE_ROWS_EVENTv1 || evType == replication.WRITE_ROWS_EVENTv0 { + + sql = makeInsertSql(ev.dbInfo.Shard.Table, conf.TableField, value) + } else { + continue + } + + FileLog.Printf("routin index: %d, get sql statement: %v", chIdx, sql) + + mutex.Lock() + _, err := ev.dbInfo.Conn.Execute(sql) + mutex.Unlock() + if err != nil { + FileLog.Printf("Execute error: %v\n", err) + } + + stat := &OutStatus{ + err: err, + routineIndex: chIdx, + logPos: ev.event.Header.LogPos, + } + + FileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos) + outCh <- stat + } +} + +func collector(inCh chan *OutStatus) { + var rowCount int64 + var errCount int64 + var errTypes = make(map[string]int) + + var start = time.Now() + for outStat := range inCh { + if outStat.err != nil { + errCount += 1 + errTypes[outStat.err.Error()] += 1 + } + rowCount += 1 + + if rowCount%conf.TickCnt == 0 { + + ShellLog.Printf("========= sync stat =========\n") + + ShellLog.Printf("has synced: %d rows\n", rowCount) + ShellLog.Printf("has error: %d rows\n", errCount) + for k, v := range errTypes { + ShellLog.Printf("%s: %d rows\n", k, v) + } + + ShellLog.Printf("sync rate: %.3f rows per second\n", float64(conf.TickCnt)/time.Since(start).Seconds()) + ShellLog.Printf("has synced log position: %d\n", outStat.logPos) + + start = time.Now() + } + } +} + +func getEventConnection(row map[string]string) *DBInfo { + shardValues := make([]string, len(conf.TableShard)) + for i, k := range conf.TableShard { + shardValues[i] = row[k] + } + + shard := findShards(shardValues) + conn := dbPool[shard.DBPort] + if conn == nil { + addr := conf.DBAddrs[shard.DBPort] + conn, err := client.Connect(addr.Addr, addr.User, addr.Password, addr.DBName) + if err != nil { + FileLog.Panicf("get connection failed: %v\n", err) + } + dbPool[addr.Port] = conn + } + + return &DBInfo{ + Conn: conn, + Shard: shard, + } +} + +func findShards(tbShards []string) *DBShard { + le := 0 + ri := len(conf.Shards) + + for le < ri { + i := (le + ri) / 2 + shard := conf.Shards[i].From + + rst, err := compareStringSlice(shard, tbShards) + if err != nil { + FileLog.Printf("conf is not valid, shard length not equal: %v\n", err) + return nil + } + + if rst == 0 { + return &conf.Shards[i] + } + + if rst < 0 { + le = i + 1 + } else { + ri = i + } + } + + if ri >= 1 { + return &conf.Shards[ri-1] + } + + return &conf.Shards[0] +} + +func main() { + + // set log + ShellLog = log.New(os.Stdout, "", 0) + + logFile, err := os.Create(logFileName) + if err != nil { + ShellLog.Panicf("create log file failed: %v\n", err) + } + defer logFile.Close() + FileLog = log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile) + + // read config + jsonParser := NewJsonStruct() + + err = jsonParser.Load(confName, &conf) + if err != nil { + ShellLog.Panicf("read config file failed: %v\n", err) + } + + var writeChs = make([]chan *WriteEvent, conf.ChCnt) + var countCh = make(chan *OutStatus, conf.ChCnt*conf.ChCap) + + for i := 0; i < conf.ChCnt; i++ { + writeCh := make(chan *WriteEvent, conf.ChCap) + writeChs[i] = writeCh + go writeToDB(i, writeCh, countCh) + } + + go collector(countCh) + + binlogReader, err := newBinlogReader(&conf.SourceConn, conf.BinlogFile, conf.BinlogPos, 9999) + if err != nil { + ShellLog.Panicf("make binlog reader failed: %v\n", err) + } + + for { + ev, err := binlogReader.GetEvent(context.Background()) + if err != nil { + ShellLog.Panicf("get event failed: %v\n", err) + } + + if ev.Header.EventType == replication.ROTATE_EVENT { + writeEV := &WriteEvent{ + event: ev, + } + writeChs[0] <- writeEV + continue + } + + writeEV := makeWriteEvent(ev) + if writeEV == nil { + continue + } + + indexValues := make([]string, len(conf.TableIndex)) + for i, k := range conf.TableIndex { + if writeEV.before != nil { + indexValues[i] = writeEV.before[k] + } else { + indexValues[i] = writeEV.after[k] + } + } + + rowSha1, err := calcHashToInt64([]byte(strings.Join(indexValues, ""))) + if err != nil { + ShellLog.Panicf("calculate hash failed: %v", err) + } + + chIdx := rowSha1 % int64(conf.ChCnt) + writeChs[chIdx] <- writeEV + } +} + +func makeWriteEvent(ev *replication.BinlogEvent) *WriteEvent { + var before map[string]string + var after map[string]string + + rowEv, ok := ev.Event.(*replication.RowsEvent) + if !ok { + FileLog.Printf("event is not a rows event\n") + return nil + } + + table := string(rowEv.Table.Table) + if table != conf.TableName { + FileLog.Printf("rows event is not the required table, get %v\n", table) + return nil + } + + if len(rowEv.Rows) == 2 { + before = validRow(rowEv.Rows[0]) + after = validRow(rowEv.Rows[1]) + } else { + before = nil + after = validRow(rowEv.Rows[0]) + } + + return &WriteEvent{ + event: ev, + before: before, + after: after, + } +} + +func makeTableIndex(ev *WriteEvent) []string { + index := make([]string, len(conf.TableIndex)) + for i, k := range conf.TableIndex { + if ev.before != nil { + index[i] = ev.before[k] + } else { + index[i] = ev.after[k] + } + } + + return index +} diff --git a/go/src/binlogsync/config.json b/go/src/binlogsync/config.json new file mode 100644 index 0000000..760a67a --- /dev/null +++ b/go/src/binlogsync/config.json @@ -0,0 +1,42 @@ +{ + "ChCap": 20, + "ChCnt": 2, + + "SourceConn": { + "Addr": "106.14.46.83:3308", + "Host": "106.14.46.83", + "Port": "3308", + "User": "admin", + "Password": "Password", + "DBName": "db" + }, + + "DBAddrs": { + "4491": { + "Addr": "139.224.229.252:4491", + "Host": "139.224.229.252", + "Port": "4491", + "User": "admin", + "Password": "Password", + "DBName": "db" + } + }, + + "Shards": [ + { + "From": ["1090000000000000096", "u", "1.jpg"], + "Table": "key_1090000000000000096_u_1.jpg", + "DBPort": "4491" + } + ], + + "TableName": "key_0000000000000000000__", + "TableField": ["bucket_id", "scope", "key", "ts", "is_del", "owner", "acl", "sha1", + "ver", "md5", "crc32", "size", "file_meta", "group_id", "origo", "expires", "multipart"], + "TableShard": [ "bucket_id" , "scope", "key"], + "TableIndex": [ "bucket_id" , "scope", "key"], + + "BinlogFile": "mysql-bin.000004", + "BinlogPos": 4, + "TickCnt": 40 +} From 84db4941762aef8f105ad1d52890dae76ceb7924 Mon Sep 17 00:00:00 2001 From: wenbo Date: Wed, 31 Oct 2018 12:00:18 +0800 Subject: [PATCH 4/6] add read binlog by GTID --- go/src/binlogsync/binlog_sync.go | 67 ++++++++++++++++++++++++-------- go/src/binlogsync/config.json | 5 ++- 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/go/src/binlogsync/binlog_sync.go b/go/src/binlogsync/binlog_sync.go index 4381962..b29b1d6 100644 --- a/go/src/binlogsync/binlog_sync.go +++ b/go/src/binlogsync/binlog_sync.go @@ -46,12 +46,12 @@ type WriteEvent struct { type OutStatus struct { err error routineIndex int - logPos uint32 + logPos int32 } type Config struct { - ChCap int - ChCnt int + ChannelCapacity int + WriteThreadCount int SourceConn Connection @@ -63,8 +63,11 @@ type Config struct { TableShard []string TableIndex []string + // if set GTID, binlog file and pos will be ignored + GTID string + BinlogFile string - BinlogPos uint32 + BinlogPos int32 TickCnt int64 } @@ -111,15 +114,14 @@ func validRow(srcRow []interface{}) map[string]string { return row } -func newBinlogReader(conn *Connection, binlogFile string, binlogPos uint32, serverID uint32) (*replication.BinlogStreamer, error) { - +func newBinlogSyncer(conn *Connection, serverID int32) (*replication.BinlogSyncer, error) { port, err := strconv.ParseInt(conn.Port, 10, 16) if err != nil { return nil, err } binlogCfg := replication.BinlogSyncerConfig{ - ServerID: serverID, + ServerID: uint32(serverID), Flavor: "mysql", Host: conn.Host, Port: uint16(port), @@ -127,9 +129,36 @@ func newBinlogReader(conn *Connection, binlogFile string, binlogPos uint32, serv Password: conn.Password, } - syncer := replication.NewBinlogSyncer(binlogCfg) + return replication.NewBinlogSyncer(binlogCfg), nil +} + +func newBinlogReaderByPosition(conn *Connection, binlogFile string, binlogPos int32, serverID int32) (*replication.BinlogStreamer, error) { + + syncer, err := newBinlogSyncer(conn, serverID) + if err != nil { + return nil, err + } + + streamer, err := syncer.StartSync(mysql.Position{binlogFile, uint32(binlogPos)}) + if err != nil { + return nil, err + } + + return streamer, nil +} - streamer, err := syncer.StartSync(mysql.Position{binlogFile, binlogPos}) +func newBinlogReaderByGTID(conn *Connection, GTID string, serverID int32) (*replication.BinlogStreamer, error) { + syncer, err := newBinlogSyncer(conn, serverID) + if err != nil { + return nil, err + } + + gtidSet, err := mysql.ParseMysqlGTIDSet(GTID) + if err != nil { + return nil, err + } + + streamer, err := syncer.StartSyncGTID(gtidSet) if err != nil { return nil, err } @@ -180,7 +209,7 @@ func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) { stat := &OutStatus{ err: err, routineIndex: chIdx, - logPos: ev.event.Header.LogPos, + logPos: int32(ev.event.Header.LogPos), } FileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos) @@ -294,18 +323,24 @@ func main() { ShellLog.Panicf("read config file failed: %v\n", err) } - var writeChs = make([]chan *WriteEvent, conf.ChCnt) - var countCh = make(chan *OutStatus, conf.ChCnt*conf.ChCap) + var writeChs = make([]chan *WriteEvent, conf.WriteThreadCount) + var countCh = make(chan *OutStatus, conf.WriteThreadCount*conf.ChannelCapacity) - for i := 0; i < conf.ChCnt; i++ { - writeCh := make(chan *WriteEvent, conf.ChCap) + for i := 0; i < conf.WriteThreadCount; i++ { + writeCh := make(chan *WriteEvent, conf.ChannelCapacity) writeChs[i] = writeCh go writeToDB(i, writeCh, countCh) } go collector(countCh) - binlogReader, err := newBinlogReader(&conf.SourceConn, conf.BinlogFile, conf.BinlogPos, 9999) + var binlogReader *replication.BinlogStreamer + if conf.GTID != "" { + binlogReader, err = newBinlogReaderByGTID(&conf.SourceConn, conf.GTID, 9999) + } else { + binlogReader, err = newBinlogReaderByPosition(&conf.SourceConn, conf.BinlogFile, conf.BinlogPos, 9999) + } + if err != nil { ShellLog.Panicf("make binlog reader failed: %v\n", err) } @@ -343,7 +378,7 @@ func main() { ShellLog.Panicf("calculate hash failed: %v", err) } - chIdx := rowSha1 % int64(conf.ChCnt) + chIdx := rowSha1 % int64(conf.WriteThreadCount) writeChs[chIdx] <- writeEV } } diff --git a/go/src/binlogsync/config.json b/go/src/binlogsync/config.json index 760a67a..c634fc5 100644 --- a/go/src/binlogsync/config.json +++ b/go/src/binlogsync/config.json @@ -1,6 +1,6 @@ { - "ChCap": 20, - "ChCnt": 2, + "ChannelCapacity": 20, + "WriteThreadCount": 2, "SourceConn": { "Addr": "106.14.46.83:3308", @@ -36,6 +36,7 @@ "TableShard": [ "bucket_id" , "scope", "key"], "TableIndex": [ "bucket_id" , "scope", "key"], + "GTID": "", "BinlogFile": "mysql-bin.000004", "BinlogPos": 4, "TickCnt": 40 From 5552286b627a42bd211ea1f21169801b0250cc3f Mon Sep 17 00:00:00 2001 From: wenbo Date: Wed, 31 Oct 2018 15:38:43 +0800 Subject: [PATCH 5/6] add some commends --- go/src/binlogsync/binlog_sync.go | 41 +++++++++++++++++++------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/go/src/binlogsync/binlog_sync.go b/go/src/binlogsync/binlog_sync.go index b29b1d6..bf516e8 100644 --- a/go/src/binlogsync/binlog_sync.go +++ b/go/src/binlogsync/binlog_sync.go @@ -44,14 +44,20 @@ type WriteEvent struct { } type OutStatus struct { - err error - routineIndex int - logPos int32 + err error + goroutineIndex int + logPos int32 } type Config struct { - ChannelCapacity int - WriteThreadCount int + // this config used to parse `config.json`, format detail is in the example config file + // `config.json` + + // WriteThreadCnt specifies how many goroutine used to execute sql statement + WriteThreadCnt int + + // ChannelCapacity specifies the event buffer capacity of one write goroutine + ChannelCapacity int SourceConn Connection @@ -69,14 +75,15 @@ type Config struct { BinlogFile string BinlogPos int32 + // TickCnt specifies every `TickCnt` rows synced got 1 status report TickCnt int64 } -var logFile *os.File +var ( + logFile *os.File -var mutex sync.Mutex + mutex sync.Mutex -var ( FileLog *log.Logger ShellLog *log.Logger ) @@ -207,9 +214,9 @@ func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) { } stat := &OutStatus{ - err: err, - routineIndex: chIdx, - logPos: int32(ev.event.Header.LogPos), + err: err, + goroutineIndex: chIdx, + logPos: int32(ev.event.Header.LogPos), } FileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos) @@ -323,10 +330,10 @@ func main() { ShellLog.Panicf("read config file failed: %v\n", err) } - var writeChs = make([]chan *WriteEvent, conf.WriteThreadCount) - var countCh = make(chan *OutStatus, conf.WriteThreadCount*conf.ChannelCapacity) + var writeChs = make([]chan *WriteEvent, conf.WriteThreadCnt) + var countCh = make(chan *OutStatus, conf.WriteThreadCnt*conf.ChannelCapacity) - for i := 0; i < conf.WriteThreadCount; i++ { + for i := 0; i < conf.WriteThreadCnt; i++ { writeCh := make(chan *WriteEvent, conf.ChannelCapacity) writeChs[i] = writeCh go writeToDB(i, writeCh, countCh) @@ -373,12 +380,12 @@ func main() { } } - rowSha1, err := calcHashToInt64([]byte(strings.Join(indexValues, ""))) + rowHash, err := calcHashToInt64([]byte(strings.Join(indexValues, ""))) if err != nil { ShellLog.Panicf("calculate hash failed: %v", err) } - chIdx := rowSha1 % int64(conf.WriteThreadCount) + chIdx := rowHash % int64(conf.WriteThreadCnt) writeChs[chIdx] <- writeEV } } @@ -389,7 +396,7 @@ func makeWriteEvent(ev *replication.BinlogEvent) *WriteEvent { rowEv, ok := ev.Event.(*replication.RowsEvent) if !ok { - FileLog.Printf("event is not a rows event\n") + FileLog.Printf("event is not a rows event, got: %v\n", ev.Header.EventType) return nil } From 1fc8d2e52035ede2974348f806e62aeecc623ec0 Mon Sep 17 00:00:00 2001 From: wenbo Date: Wed, 31 Oct 2018 20:38:25 +0800 Subject: [PATCH 6/6] refine after review --- go/src/binlogsync/binlog_sync.go | 117 ++++++++++++++++--------------- go/src/binlogsync/sqlUtil.go | 17 ++--- go/src/binlogsync/util.go | 23 +++--- 3 files changed, 78 insertions(+), 79 deletions(-) diff --git a/go/src/binlogsync/binlog_sync.go b/go/src/binlogsync/binlog_sync.go index bf516e8..d63e16e 100644 --- a/go/src/binlogsync/binlog_sync.go +++ b/go/src/binlogsync/binlog_sync.go @@ -6,8 +6,8 @@ import ( "log" "os" "reflect" + "sort" "strconv" - "strings" "sync" "time" @@ -54,10 +54,7 @@ type Config struct { // `config.json` // WriteThreadCnt specifies how many goroutine used to execute sql statement - WriteThreadCnt int - - // ChannelCapacity specifies the event buffer capacity of one write goroutine - ChannelCapacity int + WriteWorkerCnt int SourceConn Connection @@ -80,18 +77,21 @@ type Config struct { } var ( - logFile *os.File + mutex *sync.Mutex - mutex sync.Mutex + wg sync.WaitGroup - FileLog *log.Logger - ShellLog *log.Logger + fileLog *log.Logger + shellLog *log.Logger ) -var dbPool = make(map[string]*client.Conn) -var conf = Config{} -var logFileName = "binlog_sync.out" -var confName = "./config.json" +var ( + dbPool = make(map[string]*client.Conn) + conf = Config{} + logFileName = "binlog_sync.out" + confName = "./config.json" + channelCapacity = 10240 +) func validRow(srcRow []interface{}) map[string]string { // the first column `id` should not put in new rowValue @@ -175,6 +175,11 @@ func newBinlogReaderByGTID(conn *Connection, GTID string, serverID int32) (*repl func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) { for ev := range inCh { + + if ev.event.Header.EventType == replication.ROTATE_EVENT { + continue + } + mutex.Lock() ev.dbInfo = getEventConnection(ev.after) mutex.Unlock() @@ -189,7 +194,7 @@ func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) { var sql string evType := ev.event.Header.EventType - FileLog.Printf("routin index: %d, before: %v, after: %v, event type: %v\n", chIdx, ev.before, ev.after, evType) + fileLog.Printf("routin index: %d, before: %v, after: %v, event type: %v\n", chIdx, ev.before, ev.after, evType) if evType == replication.UPDATE_ROWS_EVENTv2 || evType == replication.UPDATE_ROWS_EVENTv1 || evType == replication.UPDATE_ROWS_EVENTv0 { sql = makeUpdateSql(ev.dbInfo.Shard.Table, conf.TableIndex, index, conf.TableField, value) @@ -204,13 +209,13 @@ func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) { continue } - FileLog.Printf("routin index: %d, get sql statement: %v", chIdx, sql) + fileLog.Printf("routin index: %d, get sql statement: %v", chIdx, sql) mutex.Lock() _, err := ev.dbInfo.Conn.Execute(sql) mutex.Unlock() if err != nil { - FileLog.Printf("Execute error: %v\n", err) + fileLog.Printf("Execute error: %v\n", err) } stat := &OutStatus{ @@ -219,7 +224,7 @@ func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) { logPos: int32(ev.event.Header.LogPos), } - FileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos) + fileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos) outCh <- stat } } @@ -239,16 +244,16 @@ func collector(inCh chan *OutStatus) { if rowCount%conf.TickCnt == 0 { - ShellLog.Printf("========= sync stat =========\n") + shellLog.Printf("========= sync stat =========\n") - ShellLog.Printf("has synced: %d rows\n", rowCount) - ShellLog.Printf("has error: %d rows\n", errCount) + shellLog.Printf("has synced: %d rows\n", rowCount) + shellLog.Printf("has error: %d rows\n", errCount) for k, v := range errTypes { - ShellLog.Printf("%s: %d rows\n", k, v) + shellLog.Printf("%s: %d rows\n", k, v) } - ShellLog.Printf("sync rate: %.3f rows per second\n", float64(conf.TickCnt)/time.Since(start).Seconds()) - ShellLog.Printf("has synced log position: %d\n", outStat.logPos) + shellLog.Printf("sync rate: %.3f rows per second\n", float64(conf.TickCnt)/time.Since(start).Seconds()) + shellLog.Printf("has synced log position: %d\n", outStat.logPos) start = time.Now() } @@ -267,7 +272,7 @@ func getEventConnection(row map[string]string) *DBInfo { addr := conf.DBAddrs[shard.DBPort] conn, err := client.Connect(addr.Addr, addr.User, addr.Password, addr.DBName) if err != nil { - FileLog.Panicf("get connection failed: %v\n", err) + fileLog.Panicf("get connection failed: %v\n", err) } dbPool[addr.Port] = conn } @@ -279,63 +284,57 @@ func getEventConnection(row map[string]string) *DBInfo { } func findShards(tbShards []string) *DBShard { - le := 0 - ri := len(conf.Shards) - for le < ri { - i := (le + ri) / 2 + //conf.Shards should be descending + i := sort.Search(len(conf.Shards), func(i int) bool { shard := conf.Shards[i].From - rst, err := compareStringSlice(shard, tbShards) if err != nil { - FileLog.Printf("conf is not valid, shard length not equal: %v\n", err) - return nil + fileLog.Panicf("conf is not valid, shard length not equal: %v\n", err) } - if rst == 0 { - return &conf.Shards[i] + if rst <= 0 { + return true } + return false + }) - if rst < 0 { - le = i + 1 - } else { - ri = i - } + if i >= 0 && i < len(conf.Shards) { + return &conf.Shards[i] } - if ri >= 1 { - return &conf.Shards[ri-1] - } - - return &conf.Shards[0] + fileLog.Panicf("can not find shard: index out of bound") + return nil } func main() { // set log - ShellLog = log.New(os.Stdout, "", 0) + shellLog = log.New(os.Stdout, "", 0) logFile, err := os.Create(logFileName) if err != nil { - ShellLog.Panicf("create log file failed: %v\n", err) + shellLog.Panicf("create log file failed: %v\n", err) } defer logFile.Close() - FileLog = log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile) + fileLog = log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile) // read config jsonParser := NewJsonStruct() err = jsonParser.Load(confName, &conf) if err != nil { - ShellLog.Panicf("read config file failed: %v\n", err) + shellLog.Panicf("read config file failed: %v\n", err) } - var writeChs = make([]chan *WriteEvent, conf.WriteThreadCnt) - var countCh = make(chan *OutStatus, conf.WriteThreadCnt*conf.ChannelCapacity) + var writeChs = make([]chan *WriteEvent, conf.WriteWorkerCnt) + var countCh = make(chan *OutStatus, conf.WriteWorkerCnt*channelCapacity) - for i := 0; i < conf.WriteThreadCnt; i++ { - writeCh := make(chan *WriteEvent, conf.ChannelCapacity) + for i := 0; i < conf.WriteWorkerCnt; i++ { + writeCh := make(chan *WriteEvent, channelCapacity) writeChs[i] = writeCh + + wg.Add(1) go writeToDB(i, writeCh, countCh) } @@ -349,13 +348,13 @@ func main() { } if err != nil { - ShellLog.Panicf("make binlog reader failed: %v\n", err) + shellLog.Panicf("make binlog reader failed: %v\n", err) } for { ev, err := binlogReader.GetEvent(context.Background()) if err != nil { - ShellLog.Panicf("get event failed: %v\n", err) + shellLog.Panicf("get event failed: %v\n", err) } if ev.Header.EventType == replication.ROTATE_EVENT { @@ -380,14 +379,16 @@ func main() { } } - rowHash, err := calcHashToInt64([]byte(strings.Join(indexValues, ""))) + rowHash, err := hashStringSliceToInt32(indexValues) if err != nil { - ShellLog.Panicf("calculate hash failed: %v", err) + shellLog.Panicf("calculate hash failed: %v", err) } - chIdx := rowHash % int64(conf.WriteThreadCnt) + chIdx := rowHash % int64(conf.WriteWorkerCnt) writeChs[chIdx] <- writeEV } + + wg.Wait() } func makeWriteEvent(ev *replication.BinlogEvent) *WriteEvent { @@ -396,13 +397,13 @@ func makeWriteEvent(ev *replication.BinlogEvent) *WriteEvent { rowEv, ok := ev.Event.(*replication.RowsEvent) if !ok { - FileLog.Printf("event is not a rows event, got: %v\n", ev.Header.EventType) + fileLog.Printf("event is not a rows event, got: %v\n", ev.Header.EventType) return nil } table := string(rowEv.Table.Table) if table != conf.TableName { - FileLog.Printf("rows event is not the required table, get %v\n", table) + fileLog.Printf("rows event is not the required table, get %v\n", table) return nil } diff --git a/go/src/binlogsync/sqlUtil.go b/go/src/binlogsync/sqlUtil.go index ed851d6..651f45f 100644 --- a/go/src/binlogsync/sqlUtil.go +++ b/go/src/binlogsync/sqlUtil.go @@ -13,7 +13,7 @@ func makeUpdateSql(table string, idxField, idxValue, tbField, tbValue []string) var limitClause string var tableClause string - tableClause = quote(table, "`") + tableClause = quoteString(table, "`") setClause = makeSqlCondition(tbField, tbValue, "=", ", ") whereClause = makeWhereClause(idxField, idxValue) limitClause = "LIMIT 1" @@ -26,17 +26,17 @@ func makeInsertSql(table string, tbField, tbValue []string) string { var valClause string var tableClause string - tableClause = quote(table, "`") + tableClause = quoteString(table, "`") fld := make([]string, len(tbField)) val := make([]string, len(tbValue)) for i := 0; i < len(tbField); i++ { - fld[i] = quote(tbField[i], "`") + fld[i] = quoteString(tbField[i], "`") val[i] = "\"" + mysql.Escape(tbValue[i]) + "\"" } - FileLog.Printf("table field: %v, value: %v\n", fld, val) + fileLog.Printf("table field: %v, value: %v\n", fld, val) fldClause = "(" + strings.Join(fld, ", ") + ")" valClause = "(" + strings.Join(val, ", ") + ")" @@ -49,7 +49,7 @@ func makeDeleteSql(table string, idxField, idxValue []string) string { var limitClause string var tableClause string - tableClause = quote(table, "`") + tableClause = quoteString(table, "`") whereClause = makeWhereClause(idxField, idxValue) limitClause = "LIMIT 1" @@ -65,13 +65,14 @@ func makeSqlCondition(fields, values []string, operator, formatter string) strin conds := make([]string, len(fields)) for i, k := range fields { - conds[i] = fmt.Sprintf("%s%s%s", quote(k, "`"), operator, "\""+mysql.Escape(values[i])+"\"") + conds[i] = fmt.Sprintf("%s%s%s", quoteString(k, "`"), operator, "\""+mysql.Escape(values[i])+"\"") } return strings.Join(conds, formatter) } -func quote(src, quote string) string { - rst := strings.Replace(src, quote, "\\"+quote, -1) +func quoteString(src, quote string) string { + safeQuote := strings.Replace(quote, "\\", "\\\\", -1) + rst := strings.Replace(src, quote, "\\"+safeQuote, -1) return quote + rst + quote } diff --git a/go/src/binlogsync/util.go b/go/src/binlogsync/util.go index 53d1730..a4215b0 100644 --- a/go/src/binlogsync/util.go +++ b/go/src/binlogsync/util.go @@ -1,26 +1,23 @@ package main import ( - "crypto/sha256" "encoding/json" "errors" - "fmt" + "hash/fnv" "io/ioutil" - "strconv" "strings" ) -func calcHashToInt64(src []byte) (int64, error) { - h := sha256.New() - h.Write(src) - - hex_str := fmt.Sprintf("%x", h.Sum(nil)) - - num, err := strconv.ParseInt(hex_str[:8], 16, 64) - if err != nil { - return 0, err +func hashStringSliceToInt32(src []string) (int64, error) { + h := fnv.New32() + for _, s := range src { + _, err := h.Write([]byte(s)) + if err != nil { + return 0, err + } } - return num, nil + + return int64(h.Sum32()), nil } func compareStringSlice(a, b []string) (int, error) {