From 31a9eba503254c5600c26e84e93196d013b1a168 Mon Sep 17 00:00:00 2001 From: wenbo Date: Tue, 30 Oct 2018 20:21:17 +0800 Subject: [PATCH 1/3] add a binlog syncer: read binlog and write to db --- go/src/app/binlogsync/binlog_sync.go | 428 +++++++++++++++++++++++++++ go/src/app/binlogsync/config.yaml | 36 +++ go/src/app/binlogsync/main.go | 87 ++++++ go/src/app/binlogsync/sqlUtil.go | 131 ++++++++ go/src/app/binlogsync/util.go | 90 ++++++ 5 files changed, 772 insertions(+) create mode 100644 go/src/app/binlogsync/binlog_sync.go create mode 100644 go/src/app/binlogsync/config.yaml create mode 100644 go/src/app/binlogsync/main.go create mode 100644 go/src/app/binlogsync/sqlUtil.go create mode 100644 go/src/app/binlogsync/util.go diff --git a/go/src/app/binlogsync/binlog_sync.go b/go/src/app/binlogsync/binlog_sync.go new file mode 100644 index 0000000..32de05a --- /dev/null +++ b/go/src/app/binlogsync/binlog_sync.go @@ -0,0 +1,428 @@ +package main + +import ( + "context" + "fmt" + "log" + "reflect" + "sort" + "sync" + "time" + + uuid "github.com/satori/go.uuid" + "github.com/siddontang/go-mysql/client" + "github.com/siddontang/go-mysql/replication" +) + +type Connection struct { + Addr string `yaml:"Addr"` // can be ip:port or a unix socket domain + Host string `yaml:"Host"` + Port string `yaml:"Port"` + User string `yaml:"User"` + Password string `yaml:"Password"` + DBName string `yaml:"DBName"` +} + +type DBInfo struct { + Conn *client.Conn + Shard *DBShard +} + +type DBShard struct { + From []interface{} `yaml:"From"` + Table string `yaml:"Table"` + DBPort string `yaml:"DBPort"` +} + +type WriteEvent struct { + event *replication.BinlogEvent + before map[string]interface{} + after map[string]interface{} + dbInfo *DBInfo + nextGTID string +} + +type BinlogSyncer struct { + Config + + DBPool map[string]*client.Conn + WriteChs []chan *WriteEvent + CountCh chan *Status + + mutex *sync.Mutex + wg *sync.WaitGroup + shellLog *log.Logger + fileLog *log.Logger +} + +type BinlogPosition struct { + BinlogFile string `yaml:"BinlogFile"` + BinlogPos int64 `yaml:"BinlogPos"` + NextGTID string `yaml:"GTID"` +} + +type OutMessage struct { + Synced int64 `yaml:"Synced"` + Faild int64 `yaml:"Faild"` + Rate float64 `yaml:"Rate(rows/s)"` + Position BinlogPosition `yaml:"Position"` + Error map[string]int `yaml:"Error"` +} + +type Status struct { + err error + goroutineIndex int + + position BinlogPosition +} + +func NewBinlogSyncer(conf Config) *BinlogSyncer { + fileLog := log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile) + fileLog.SetPrefix("[" + conf.SourceConn.Addr + "] ") + + bs := &BinlogSyncer{ + Config: conf, + + DBPool: make(map[string]*client.Conn), + WriteChs: make([]chan *WriteEvent, conf.WorkerCnt), + CountCh: make(chan *Status, channelCapacity*conf.WorkerCnt), + + mutex: &sync.Mutex{}, + wg: &sync.WaitGroup{}, + shellLog: shellLog, + fileLog: fileLog, + } + + return bs +} + +func (bs *BinlogSyncer) Sync() { + defer mainWG.Done() + + var binlogReader *replication.BinlogStreamer + var err error + + if bs.GTIDSet != "" { + binlogReader, err = newBinlogReaderByGTID(&bs.SourceConn, bs.GTIDSet, 9999) + } else { + binlogReader, err = newBinlogReaderByPosition(&bs.SourceConn, bs.BinlogFile, bs.BinlogPos, 9999) + } + + if err != nil { + bs.shellLog.Printf("[%s] make binlog reader failed: %v\n", bs.SourceConn.Addr, err) + return + } + + for i := 0; i < bs.WorkerCnt; i++ { + writeCh := make(chan *WriteEvent, channelCapacity) + bs.WriteChs[i] = writeCh + bs.wg.Add(1) + go bs.writeToDB(i, writeCh) + } + + go bs.readBinlog(binlogReader) + + go func() { + bs.wg.Wait() + close(bs.CountCh) + }() + + bs.collector() +} + +func (bs *BinlogSyncer) formatRow(srcRow []interface{}) map[string]interface{} { + // the first column `id` should not put in new rowValue + rowValue := make([]interface{}, len(srcRow)-1) + for i, v := range srcRow[1:] { + if v == nil { + continue + } + + var tmp interface{} + if reflect.TypeOf(v).String() == "[]uint8" { + // convert []byte to string + tmp = fmt.Sprintf("%s", v) + } else { + tmp = v + } + + rowValue[i] = tmp + } + + row := make(map[string]interface{}) + + for i, k := range bs.TableField { + row[k] = rowValue[i] + } + + return row +} + +func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) { + defer bs.wg.Done() + + for ev := range inCh { + if ev.event.Header.EventType == replication.ROTATE_EVENT { + rotateEv, ok := ev.event.Event.(*replication.RotateEvent) + if !ok { + bs.fileLog.Printf("event is RotateEvent, but cannot convert to a RotateEvent") + } + stat := &Status{ + goroutineIndex: chIdx, + position: BinlogPosition{ + BinlogPos: int64(rotateEv.Position), + BinlogFile: string(rotateEv.NextLogName), + }, + } + bs.CountCh <- stat + continue + } + + var err error + ev.dbInfo, err = bs.getEventConnection(ev.after) + if err != nil { + bs.shellLog.Printf("[%s] get connection failed: %v\n", bs.SourceConn.Addr, err) + return + } + + index := bs.makeTableIndex(ev) + + value := make([]string, len(bs.TableField)) + for i, k := range bs.TableField { + value[i] = fmt.Sprintf("%v", ev.after[k]) + } + + var sql string + evType := ev.event.Header.EventType + + bs.fileLog.Printf("routin index: %d, before: %v, after: %v, event type: %v\n", chIdx, ev.before, ev.after, evType) + + if evType == replication.UPDATE_ROWS_EVENTv2 || evType == replication.UPDATE_ROWS_EVENTv1 || evType == replication.UPDATE_ROWS_EVENTv0 { + sql = makeUpdateSql(ev.dbInfo.Shard.Table, bs.TableIndex, bs.TableField, index, value) + + } else if evType == replication.DELETE_ROWS_EVENTv2 || evType == replication.DELETE_ROWS_EVENTv1 || evType == replication.DELETE_ROWS_EVENTv0 { + + sql = makeDeleteSql(ev.dbInfo.Shard.Table, bs.TableIndex, index) + } else if evType == replication.WRITE_ROWS_EVENTv2 || evType == replication.WRITE_ROWS_EVENTv1 || evType == replication.WRITE_ROWS_EVENTv0 { + + sql = makeInsertSql(ev.dbInfo.Shard.Table, bs.TableField, value) + } else { + continue + } + + bs.fileLog.Printf("routin index: %d, get sql statement: %v", chIdx, sql) + + bs.mutex.Lock() + _, err = ev.dbInfo.Conn.Execute(sql) + bs.mutex.Unlock() + if err != nil { + bs.fileLog.Printf("Execute error: %v\n", err) + } + + stat := &Status{ + err: err, + goroutineIndex: chIdx, + position: BinlogPosition{ + BinlogPos: int64(ev.event.Header.LogPos), + BinlogFile: bs.BinlogFile, + NextGTID: ev.nextGTID, + }, + } + + bs.fileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos) + bs.CountCh <- stat + } +} + +func (bs *BinlogSyncer) readBinlog(binlogReader *replication.BinlogStreamer) { + var gtidNext string + for { + ctx, _ := context.WithTimeout(context.Background(), 30*time.Second) + ev, err := binlogReader.GetEvent(ctx) + if err != nil { + bs.fileLog.Printf("get event failed: %v\n", err) + break + } + + if ev.Header.EventType == replication.ROTATE_EVENT { + writeEV := &WriteEvent{ + event: ev, + } + // RotateEvent has no row to hash so put it to channal 0 + bs.WriteChs[0] <- writeEV + continue + } + + if ev.Header.EventType == replication.GTID_EVENT { + gtidEv, ok := ev.Event.(*replication.GTIDEvent) + if !ok { + bs.fileLog.Printf("event is GTIDEvent, but cannot convert to a GTIDEvent") + continue + } + + u, _ := uuid.FromBytes(gtidEv.SID) + gtidNext = fmt.Sprintf("%s:%d", u.String(), gtidEv.GNO) + + bs.fileLog.Printf("next gtid: %s\n", gtidNext) + continue + } + + writeEV := bs.makeWriteEvent(ev) + if writeEV == nil { + // not a RowsEvent + continue + } + writeEV.nextGTID = gtidNext + + indexValues := bs.makeTableIndex(writeEV) + + rowHash, err := hashStringSliceToInt32(indexValues) + if err != nil { + bs.shellLog.Printf("[%s] calculate hash failed: %v", bs.SourceConn.Addr, err) + } + + chIdx := rowHash % int64(bs.WorkerCnt) + bs.WriteChs[chIdx] <- writeEV + } + + for _, ch := range bs.WriteChs { + close(ch) + } +} + +func (bs *BinlogSyncer) collector() { + var rowCnt int64 + var errCnt int64 + var errTypes = make(map[string]int) + + var start = time.Now() + for outStat := range bs.CountCh { + if outStat.err != nil { + errCnt += 1 + errTypes[outStat.err.Error()] += 1 + } + rowCnt += 1 + + if rowCnt%bs.TickCnt == 0 { + + rate := float64(bs.TickCnt) / time.Since(start).Seconds() + + om := &OutMessage{ + Synced: rowCnt, + Faild: errCnt, + Rate: rate, + Position: outStat.position, + Error: errTypes, + } + + outMessage, err := dumpYAML(om) + if err != nil { + bs.fileLog.Printf("[%s] dump out message to YAML failed: %v\n", bs.SourceConn.Addr, outMessage) + return + } + + outMessage = " ====== [" + bs.SourceConn.Addr + "] status ======\n" + outMessage + bs.shellLog.Printf(outMessage) + + start = time.Now() + } + } +} + +func (bs *BinlogSyncer) getEventConnection(row map[string]interface{}) (*DBInfo, error) { + shardValues := make([]interface{}, len(bs.TableShard)) + for i, k := range bs.TableShard { + shardValues[i] = row[k] + } + + shard := bs.findShards(shardValues) + + bs.mutex.Lock() + conn := bs.DBPool[shard.DBPort] + if conn == nil { + addr := bs.DBConfig[shard.DBPort] + + var err error + conn, err = client.Connect(addr.Addr, addr.User, addr.Password, addr.DBName) + if err != nil { + bs.shellLog.Printf("[%s] get connection failed: %v\n", bs.SourceConn, err) + return nil, err + } + bs.DBPool[addr.Port] = conn + } + bs.mutex.Unlock() + + dbInfo := &DBInfo{ + Conn: conn, + Shard: shard, + } + return dbInfo, nil +} + +func (bs *BinlogSyncer) findShards(tbShards []interface{}) *DBShard { + + //conf.Shards should be descending + i := sort.Search(len(bs.Shards), func(i int) bool { + shard := bs.Shards[i].From + rst, err := compareSlice(shard, tbShards) + if err != nil { + bs.shellLog.Printf("[%s] compare table shards failed: %v\n", bs.SourceConn.Addr, err) + } + + if rst <= 0 { + return true + } + return false + }) + + if i >= 0 && i < len(bs.Shards) { + return &bs.Shards[i] + } + + bs.shellLog.Printf("[%s] can not find shard: index out of bound", bs.SourceConn.Addr) + return nil +} + +func (bs *BinlogSyncer) makeWriteEvent(ev *replication.BinlogEvent) *WriteEvent { + var before map[string]interface{} + var after map[string]interface{} + + rowEv, ok := ev.Event.(*replication.RowsEvent) + if !ok { + bs.fileLog.Printf("event is not a rows event, got: %v\n", ev.Header.EventType) + return nil + } + + table := string(rowEv.Table.Table) + if table != bs.TableName { + bs.fileLog.Printf("rows event is not the required table, get %v\n", table) + return nil + } + + if len(rowEv.Rows) == 2 { + before = bs.formatRow(rowEv.Rows[0]) + after = bs.formatRow(rowEv.Rows[1]) + } else { + before = nil + after = bs.formatRow(rowEv.Rows[0]) + } + + return &WriteEvent{ + event: ev, + before: before, + after: after, + } +} + +func (bs *BinlogSyncer) makeTableIndex(ev *WriteEvent) []string { + index := make([]string, len(bs.TableIndex)) + for i, k := range bs.TableIndex { + if ev.before != nil { + index[i] = fmt.Sprintf("%v", ev.before[k]) + } else { + index[i] = fmt.Sprintf("%v", ev.after[k]) + } + } + + return index +} diff --git a/go/src/app/binlogsync/config.yaml b/go/src/app/binlogsync/config.yaml new file mode 100644 index 0000000..86df517 --- /dev/null +++ b/go/src/app/binlogsync/config.yaml @@ -0,0 +1,36 @@ +Confs: + - + WriteWorkerCnt: 2 + + SourceConn: + Addr: 106.14.46.83:3308 + Host: 106.14.46.83 + Port: "3308" + User: root + Password: Password + DBName: db + + DBConfig: + 4491: + Addr: 139.224.229.252:4491 + Host: 139.224.229.252 + Port: "4491" + User: root + Password: Password + DBName: db + + Shards: + - + From: [1090000000000000096, u, 1.jpg, 0] + Table: key_1090000000000000096_u_1.jpg + DBPort: "4491" + + TableName: key_0000000000000000000__ + TableField: ["bucket_id", "scope", "key", "ts", "is_del", "owner", "acl", "sha1", "ver", "md5", "crc32", "size", "file_meta", "group_id", "origo", "expires", "multipart"] + TableShard: ["bucket_id" , "scope", "key", "ts"] + TableIndex: ["bucket_id" , "scope", "key", "ts"] + + GTIDSet: bad0552d-cb43-11e6-a6ef-00163e0e31a0:1-20482 + BinlogFile: mysql-bin.000004 + BinlogPos: 4 + TickCnt: 40 diff --git a/go/src/app/binlogsync/main.go b/go/src/app/binlogsync/main.go new file mode 100644 index 0000000..892e783 --- /dev/null +++ b/go/src/app/binlogsync/main.go @@ -0,0 +1,87 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "sync" +) + +var ( + logFileName = "binlog_sync.out" + confName = "./config.json" + channelCapacity = 10240 + + logFile *os.File + shellLog *log.Logger + + mainWG *sync.WaitGroup +) + +type Config struct { + + // WorkerCnt specifies how many goroutine used to execute sql statement + WorkerCnt int `yaml:"WriteWorkerCnt"` + + SourceConn Connection `yaml:"SourceConn"` + + DBConfig map[string]Connection `yaml:"DBConfig"` + Shards []DBShard `yaml:"Shards"` + + TableName string `yaml:"TableName"` + TableField []string `yaml:"TableField"` + TableShard []string `yaml:"TableShard"` + TableIndex []string `yaml:"TableIndex"` + + // if specifies GTIDSet, binlog file and pos will be ignored + GTIDSet string `yaml:"GTIDSet"` + + BinlogFile string `yaml:"BinlogFile"` + BinlogPos int32 `yaml:"BinlogPos"` + + // TickCnt specifies every `TickCnt` rows synced got 1 status report + TickCnt int64 `yaml:"TickCnt"` +} + +type ConfigList struct { + Confs []Config `yaml:"Confs"` +} + +func main() { + + // get arguments + getInput() + + // set log + var err error + shellLog = log.New(os.Stdout, "", 0) + logFile, err = os.Create(logFileName) + if err != nil { + shellLog.Panicf("create log file failed: %v\n", err) + } + defer logFile.Close() + + // read config + confList := &ConfigList{} + err = parseYAML(confName, &confList) + if err != nil { + shellLog.Panicf("read config file failed: %v\n", err) + } + + mainWG = &sync.WaitGroup{} + for _, conf := range confList.Confs { + syncer := NewBinlogSyncer(conf) + mainWG.Add(1) + syncer.Sync() + } + mainWG.Wait() +} + +func getInput() { + flag.StringVar(&logFileName, "log", logFileName, "file name to output error log") + flag.StringVar(&confName, "config", confName, "configration file path") + + flag.Parse() + fmt.Printf("log: %v, conf: %v\n", logFileName, confName) +} diff --git a/go/src/app/binlogsync/sqlUtil.go b/go/src/app/binlogsync/sqlUtil.go new file mode 100644 index 0000000..1c6612a --- /dev/null +++ b/go/src/app/binlogsync/sqlUtil.go @@ -0,0 +1,131 @@ +package main + +import ( + "fmt" + "strconv" + "strings" + + "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" +) + +func makeUpdateSql(table string, idxField, tbField, idxValue, tbValue []string) string { + var setClause string + var whereClause string + var limitClause string + var tableClause string + + tableClause = quote(table) + setClause = makeSqlCondition(tbField, tbValue, "=", ", ") + whereClause = makeWhereClause(idxField, idxValue) + limitClause = "LIMIT 1" + + return fmt.Sprintf("UPDATE %s SET %s%s%s;", tableClause, setClause, whereClause, limitClause) +} + +func makeInsertSql(table string, tbField, tbValue []string) string { + var fldClause string + var valClause string + var tableClause string + + tableClause = quote(table) + + fld := make([]string, len(tbField)) + val := make([]string, len(tbValue)) + + for i := 0; i < len(tbField); i++ { + fld[i] = quote(tbField[i]) + val[i] = "\"" + mysql.Escape(tbValue[i]) + "\"" + } + + fldClause = "(" + strings.Join(fld, ", ") + ")" + valClause = "(" + strings.Join(val, ", ") + ")" + + return fmt.Sprintf("INSERT INTO %s %s VALUES %s;", tableClause, fldClause, valClause) +} + +func makeDeleteSql(table string, idxField, idxValue []string) string { + var whereClause string + var limitClause string + var tableClause string + + tableClause = quote(table) + + whereClause = makeWhereClause(idxField, idxValue) + limitClause = "LIMIT 1" + + return fmt.Sprintf("DELETE FROM %s %s %s;", tableClause, whereClause, limitClause) +} + +func makeWhereClause(fields, values []string) string { + return fmt.Sprintf("WHERE %s", makeSqlCondition(fields, values, "=", " AND ")) +} + +func makeSqlCondition(fields, values []string, operator, formatter string) string { + conds := make([]string, len(fields)) + + for i, k := range fields { + conds[i] = fmt.Sprintf("%s%s%s", quote(k), operator, "\""+mysql.Escape(values[i])+"\"") + } + + return strings.Join(conds, formatter) +} + +func quote(src string) string { + + rst := strings.Replace(src, "`", "\\`", -1) + + return "`" + rst + "`" +} + +func newBinlogReader(conn *Connection, serverID int32) (*replication.BinlogSyncer, error) { + port, err := strconv.ParseInt(conn.Port, 10, 16) + if err != nil { + return nil, err + } + + binlogCfg := replication.BinlogSyncerConfig{ + ServerID: uint32(serverID), + Flavor: "mysql", + Host: conn.Host, + Port: uint16(port), + User: conn.User, + Password: conn.Password, + } + + return replication.NewBinlogSyncer(binlogCfg), nil +} + +func newBinlogReaderByPosition(conn *Connection, binlogFile string, binlogPos int32, serverID int32) (*replication.BinlogStreamer, error) { + + reader, err := newBinlogReader(conn, serverID) + if err != nil { + return nil, err + } + + streamer, err := reader.StartSync(mysql.Position{binlogFile, uint32(binlogPos)}) + if err != nil { + return nil, err + } + + return streamer, nil +} + +func newBinlogReaderByGTID(conn *Connection, GTID string, serverID int32) (*replication.BinlogStreamer, error) { + reader, err := newBinlogReader(conn, serverID) + if err != nil { + return nil, err + } + + gtidSet, err := mysql.ParseMysqlGTIDSet(GTID) + if err != nil { + return nil, err + } + + streamer, err := reader.StartSyncGTID(gtidSet) + if err != nil { + return nil, err + } + + return streamer, nil +} diff --git a/go/src/app/binlogsync/util.go b/go/src/app/binlogsync/util.go new file mode 100644 index 0000000..cf99d35 --- /dev/null +++ b/go/src/app/binlogsync/util.go @@ -0,0 +1,90 @@ +package main + +import ( + "errors" + "fmt" + "hash/fnv" + "io/ioutil" + "reflect" + "strings" + + yaml "gopkg.in/yaml.v2" +) + +func hashStringSliceToInt32(src []string) (int64, error) { + h := fnv.New32() + for _, s := range src { + _, err := h.Write([]byte(s)) + if err != nil { + return 0, err + } + } + + return int64(h.Sum32()), nil +} + +func compareSlice(a, b []interface{}) (int, error) { + lenA := len(a) + lenB := len(b) + if lenA != lenB { + return 0, errors.New(fmt.Sprintf("length of slices not equals: %d != %d", lenA, lenB)) + } + + for i, v := range a { + var rst int + switch v.(type) { + case int: + ai := reflect.ValueOf(v).Int() + bi := reflect.ValueOf(b[i]).Int() + rst = compareInt(ai, bi) + case string: + rst = strings.Compare(v.(string), b[i].(string)) + default: + return 0, errors.New(fmt.Sprintf("unknow type of element: %v", reflect.TypeOf(v))) + } + + if rst == 0 { + continue + } + + return rst, nil + } + + return 0, nil +} + +func compareInt(a, b int64) int { + if a == b { + return 0 + } + + if a > b { + return 1 + } else { + return -1 + } +} + +func parseYAML(filename string, v interface{}) error { + data, err := ioutil.ReadFile(filename) + if err != nil { + return err + } + + err = yaml.Unmarshal(data, v) + if err != nil { + return err + } + + return nil +} + +func dumpYAML(v interface{}) (string, error) { + + data, err := yaml.Marshal(v) + if err != nil { + return "", err + } + + return string(data), nil +} From e9b65a0300c20c2b8c92f696ffa2153c21fe03be Mon Sep 17 00:00:00 2001 From: wenbo Date: Fri, 9 Nov 2018 15:58:59 +0800 Subject: [PATCH 2/3] fix database connection pool --- go/src/app/binlogsync/binlog_sync.go | 102 ++++++++++++++------------- go/src/app/binlogsync/main.go | 2 +- go/src/app/binlogsync/util.go | 40 ++++++++--- 3 files changed, 87 insertions(+), 57 deletions(-) diff --git a/go/src/app/binlogsync/binlog_sync.go b/go/src/app/binlogsync/binlog_sync.go index 32de05a..f9ed172 100644 --- a/go/src/app/binlogsync/binlog_sync.go +++ b/go/src/app/binlogsync/binlog_sync.go @@ -2,6 +2,7 @@ package main import ( "context" + "database/sql" "fmt" "log" "reflect" @@ -9,8 +10,8 @@ import ( "sync" "time" + "github.com/go-sql-driver/mysql" uuid "github.com/satori/go.uuid" - "github.com/siddontang/go-mysql/client" "github.com/siddontang/go-mysql/replication" ) @@ -24,7 +25,7 @@ type Connection struct { } type DBInfo struct { - Conn *client.Conn + Conn *sql.DB Shard *DBShard } @@ -38,14 +39,13 @@ type WriteEvent struct { event *replication.BinlogEvent before map[string]interface{} after map[string]interface{} - dbInfo *DBInfo nextGTID string } type BinlogSyncer struct { Config - DBPool map[string]*client.Conn + DBPool map[string]*sql.DB WriteChs []chan *WriteEvent CountCh chan *Status @@ -53,6 +53,8 @@ type BinlogSyncer struct { wg *sync.WaitGroup shellLog *log.Logger fileLog *log.Logger + + stop bool } type BinlogPosition struct { @@ -62,11 +64,11 @@ type BinlogPosition struct { } type OutMessage struct { - Synced int64 `yaml:"Synced"` - Faild int64 `yaml:"Faild"` - Rate float64 `yaml:"Rate(rows/s)"` - Position BinlogPosition `yaml:"Position"` - Error map[string]int `yaml:"Error"` + Synced int64 `yaml:"Synced"` + Faild int64 `yaml:"Faild"` + RowsPerSec float64 `yaml:"RowsPerSec"` + Position BinlogPosition `yaml:"Position"` + Error map[string]int `yaml:"Error"` } type Status struct { @@ -83,7 +85,7 @@ func NewBinlogSyncer(conf Config) *BinlogSyncer { bs := &BinlogSyncer{ Config: conf, - DBPool: make(map[string]*client.Conn), + DBPool: make(map[string]*sql.DB), WriteChs: make([]chan *WriteEvent, conf.WorkerCnt), CountCh: make(chan *Status, channelCapacity*conf.WorkerCnt), @@ -91,6 +93,7 @@ func NewBinlogSyncer(conf Config) *BinlogSyncer { wg: &sync.WaitGroup{}, shellLog: shellLog, fileLog: fileLog, + stop: false, } return bs @@ -131,9 +134,8 @@ func (bs *BinlogSyncer) Sync() { } func (bs *BinlogSyncer) formatRow(srcRow []interface{}) map[string]interface{} { - // the first column `id` should not put in new rowValue - rowValue := make([]interface{}, len(srcRow)-1) - for i, v := range srcRow[1:] { + rowValue := make([]interface{}, len(srcRow)) + for i, v := range srcRow { if v == nil { continue } @@ -163,10 +165,8 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) { for ev := range inCh { if ev.event.Header.EventType == replication.ROTATE_EVENT { - rotateEv, ok := ev.event.Event.(*replication.RotateEvent) - if !ok { - bs.fileLog.Printf("event is RotateEvent, but cannot convert to a RotateEvent") - } + rotateEv, _ := ev.event.Event.(*replication.RotateEvent) + stat := &Status{ goroutineIndex: chIdx, position: BinlogPosition{ @@ -178,10 +178,12 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) { continue } + var dbInfo *DBInfo var err error - ev.dbInfo, err = bs.getEventConnection(ev.after) + dbInfo, err = bs.getWriteConnection(ev.after) if err != nil { bs.shellLog.Printf("[%s] get connection failed: %v\n", bs.SourceConn.Addr, err) + bs.stop = true return } @@ -198,25 +200,28 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) { bs.fileLog.Printf("routin index: %d, before: %v, after: %v, event type: %v\n", chIdx, ev.before, ev.after, evType) if evType == replication.UPDATE_ROWS_EVENTv2 || evType == replication.UPDATE_ROWS_EVENTv1 || evType == replication.UPDATE_ROWS_EVENTv0 { - sql = makeUpdateSql(ev.dbInfo.Shard.Table, bs.TableIndex, bs.TableField, index, value) + sql = makeUpdateSql(dbInfo.Shard.Table, bs.TableField, bs.TableField, value, value) } else if evType == replication.DELETE_ROWS_EVENTv2 || evType == replication.DELETE_ROWS_EVENTv1 || evType == replication.DELETE_ROWS_EVENTv0 { - sql = makeDeleteSql(ev.dbInfo.Shard.Table, bs.TableIndex, index) + sql = makeDeleteSql(dbInfo.Shard.Table, bs.TableIndex, index) } else if evType == replication.WRITE_ROWS_EVENTv2 || evType == replication.WRITE_ROWS_EVENTv1 || evType == replication.WRITE_ROWS_EVENTv0 { - sql = makeInsertSql(ev.dbInfo.Shard.Table, bs.TableField, value) + sql = makeInsertSql(dbInfo.Shard.Table, bs.TableField, value) } else { continue } bs.fileLog.Printf("routin index: %d, get sql statement: %v", chIdx, sql) - bs.mutex.Lock() - _, err = ev.dbInfo.Conn.Execute(sql) - bs.mutex.Unlock() + _, err = dbInfo.Conn.Exec(sql) if err != nil { bs.fileLog.Printf("Execute error: %v\n", err) + + sqlErr := err.(*mysql.MySQLError) + if sqlErr.Number != 1062 { + bs.stop = true + } } stat := &Status{ @@ -254,11 +259,7 @@ func (bs *BinlogSyncer) readBinlog(binlogReader *replication.BinlogStreamer) { } if ev.Header.EventType == replication.GTID_EVENT { - gtidEv, ok := ev.Event.(*replication.GTIDEvent) - if !ok { - bs.fileLog.Printf("event is GTIDEvent, but cannot convert to a GTIDEvent") - continue - } + gtidEv, _ := ev.Event.(*replication.GTIDEvent) u, _ := uuid.FromBytes(gtidEv.SID) gtidNext = fmt.Sprintf("%s:%d", u.String(), gtidEv.GNO) @@ -278,10 +279,13 @@ func (bs *BinlogSyncer) readBinlog(binlogReader *replication.BinlogStreamer) { rowHash, err := hashStringSliceToInt32(indexValues) if err != nil { - bs.shellLog.Printf("[%s] calculate hash failed: %v", bs.SourceConn.Addr, err) + bs.shellLog.Panicf("[%s] calculate hash failed: %v", bs.SourceConn.Addr, err) } chIdx := rowHash % int64(bs.WorkerCnt) + if bs.stop { + break + } bs.WriteChs[chIdx] <- writeEV } @@ -296,29 +300,30 @@ func (bs *BinlogSyncer) collector() { var errTypes = make(map[string]int) var start = time.Now() + for outStat := range bs.CountCh { if outStat.err != nil { errCnt += 1 errTypes[outStat.err.Error()] += 1 } + rowCnt += 1 if rowCnt%bs.TickCnt == 0 { - rate := float64(bs.TickCnt) / time.Since(start).Seconds() + rowsPerSec := float64(bs.TickCnt) / time.Since(start).Seconds() om := &OutMessage{ - Synced: rowCnt, - Faild: errCnt, - Rate: rate, - Position: outStat.position, - Error: errTypes, + Synced: rowCnt, + Faild: errCnt, + RowsPerSec: rowsPerSec, + Position: outStat.position, + Error: errTypes, } - outMessage, err := dumpYAML(om) + outMessage, err := marshalYAML(om) if err != nil { bs.fileLog.Printf("[%s] dump out message to YAML failed: %v\n", bs.SourceConn.Addr, outMessage) - return } outMessage = " ====== [" + bs.SourceConn.Addr + "] status ======\n" + outMessage @@ -329,7 +334,7 @@ func (bs *BinlogSyncer) collector() { } } -func (bs *BinlogSyncer) getEventConnection(row map[string]interface{}) (*DBInfo, error) { +func (bs *BinlogSyncer) getWriteConnection(row map[string]interface{}) (*DBInfo, error) { shardValues := make([]interface{}, len(bs.TableShard)) for i, k := range bs.TableShard { shardValues[i] = row[k] @@ -342,8 +347,11 @@ func (bs *BinlogSyncer) getEventConnection(row map[string]interface{}) (*DBInfo, if conn == nil { addr := bs.DBConfig[shard.DBPort] + // DSN(Data Source Name) in go-sql-driver + dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s", addr.User, addr.Password, addr.Addr, addr.DBName) + var err error - conn, err = client.Connect(addr.Addr, addr.User, addr.Password, addr.DBName) + conn, err = sql.Open("mysql", dsn) if err != nil { bs.shellLog.Printf("[%s] get connection failed: %v\n", bs.SourceConn, err) return nil, err @@ -361,25 +369,23 @@ func (bs *BinlogSyncer) getEventConnection(row map[string]interface{}) (*DBInfo, func (bs *BinlogSyncer) findShards(tbShards []interface{}) *DBShard { + lenShards := len(bs.Shards) //conf.Shards should be descending - i := sort.Search(len(bs.Shards), func(i int) bool { + i := sort.Search(lenShards, func(i int) bool { shard := bs.Shards[i].From rst, err := compareSlice(shard, tbShards) if err != nil { - bs.shellLog.Printf("[%s] compare table shards failed: %v\n", bs.SourceConn.Addr, err) + bs.shellLog.Panicf("[%s] compare table shards failed: %v\n", bs.SourceConn.Addr, err) } - if rst <= 0 { - return true - } - return false + return rst <= 0 }) - if i >= 0 && i < len(bs.Shards) { + if i >= 0 && i < lenShards { return &bs.Shards[i] } - bs.shellLog.Printf("[%s] can not find shard: index out of bound", bs.SourceConn.Addr) + bs.shellLog.Printf("[%s] can not find shard: %v, index out of bound, got index: %d, shards len: %d", bs.SourceConn.Addr, tbShards, i, lenShards) return nil } diff --git a/go/src/app/binlogsync/main.go b/go/src/app/binlogsync/main.go index 892e783..0c7f101 100644 --- a/go/src/app/binlogsync/main.go +++ b/go/src/app/binlogsync/main.go @@ -64,7 +64,7 @@ func main() { // read config confList := &ConfigList{} - err = parseYAML(confName, &confList) + err = unmarshalYAML(confName, &confList) if err != nil { shellLog.Panicf("read config file failed: %v\n", err) } diff --git a/go/src/app/binlogsync/util.go b/go/src/app/binlogsync/util.go index cf99d35..8404437 100644 --- a/go/src/app/binlogsync/util.go +++ b/go/src/app/binlogsync/util.go @@ -26,16 +26,19 @@ func hashStringSliceToInt32(src []string) (int64, error) { func compareSlice(a, b []interface{}) (int, error) { lenA := len(a) lenB := len(b) - if lenA != lenB { - return 0, errors.New(fmt.Sprintf("length of slices not equals: %d != %d", lenA, lenB)) - } for i, v := range a { + + if i >= lenB { + return 1, nil + } + var rst int + switch v.(type) { - case int: - ai := reflect.ValueOf(v).Int() - bi := reflect.ValueOf(b[i]).Int() + case int, int8, int16, int32, int64: + ai, _ := interfaceToInt64(v) + bi, _ := interfaceToInt64(b[i]) rst = compareInt(ai, bi) case string: rst = strings.Compare(v.(string), b[i].(string)) @@ -50,6 +53,10 @@ func compareSlice(a, b []interface{}) (int, error) { return rst, nil } + if lenA < lenB { + return -1, nil + } + return 0, nil } @@ -65,7 +72,24 @@ func compareInt(a, b int64) int { } } -func parseYAML(filename string, v interface{}) error { +func interfaceToInt64(v interface{}) (int64, error) { + switch v.(type) { + case int: + return int64(v.(int)), nil + case int8: + return int64(v.(int8)), nil + case int16: + return int64(v.(int16)), nil + case int32: + return int64(v.(int32)), nil + case int64: + return v.(int64), nil + default: + return 0, errors.New(fmt.Sprintf("unknow type of value:%v, type: %T", v, v)) + } +} + +func unmarshalYAML(filename string, v interface{}) error { data, err := ioutil.ReadFile(filename) if err != nil { return err @@ -79,7 +103,7 @@ func parseYAML(filename string, v interface{}) error { return nil } -func dumpYAML(v interface{}) (string, error) { +func marshalYAML(v interface{}) (string, error) { data, err := yaml.Marshal(v) if err != nil { From 1f060317223dd3056b6d72b0faebd145bbb351d8 Mon Sep 17 00:00:00 2001 From: wenbo Date: Tue, 20 Nov 2018 11:38:15 +0800 Subject: [PATCH 3/3] add controller --- go/src/app/binlogsync/binlog_sync.go | 86 +++++++++++++--------------- go/src/app/binlogsync/config.yaml | 83 +++++++++++++++++++-------- go/src/app/binlogsync/main.go | 41 ++++++------- 3 files changed, 118 insertions(+), 92 deletions(-) diff --git a/go/src/app/binlogsync/binlog_sync.go b/go/src/app/binlogsync/binlog_sync.go index f9ed172..0e5fe65 100644 --- a/go/src/app/binlogsync/binlog_sync.go +++ b/go/src/app/binlogsync/binlog_sync.go @@ -47,13 +47,15 @@ type BinlogSyncer struct { DBPool map[string]*sql.DB WriteChs []chan *WriteEvent - CountCh chan *Status + CountCh chan *Result mutex *sync.Mutex wg *sync.WaitGroup shellLog *log.Logger fileLog *log.Logger + stat *Status + stop bool } @@ -63,44 +65,47 @@ type BinlogPosition struct { NextGTID string `yaml:"GTID"` } -type OutMessage struct { - Synced int64 `yaml:"Synced"` - Faild int64 `yaml:"Faild"` +type Status struct { + Finished int64 `yaml:"Finished"` + Failed int64 `yaml:"Failed"` + Errors map[string]int `yaml:"Errors"` RowsPerSec float64 `yaml:"RowsPerSec"` - Position BinlogPosition `yaml:"Position"` - Error map[string]int `yaml:"Error"` } -type Status struct { +type Result struct { err error goroutineIndex int + gtid string + position BinlogPosition } -func NewBinlogSyncer(conf Config) *BinlogSyncer { +func NewBinlogSyncer(conf *Config) *BinlogSyncer { fileLog := log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile) fileLog.SetPrefix("[" + conf.SourceConn.Addr + "] ") bs := &BinlogSyncer{ - Config: conf, + Config: *conf, DBPool: make(map[string]*sql.DB), WriteChs: make([]chan *WriteEvent, conf.WorkerCnt), - CountCh: make(chan *Status, channelCapacity*conf.WorkerCnt), + CountCh: make(chan *Result, channelCapacity*conf.WorkerCnt), mutex: &sync.Mutex{}, wg: &sync.WaitGroup{}, shellLog: shellLog, fileLog: fileLog, - stop: false, + stat: &Status{ + Errors: make(map[string]int), + }, + stop: false, } return bs } func (bs *BinlogSyncer) Sync() { - defer mainWG.Done() var binlogReader *replication.BinlogStreamer var err error @@ -167,14 +172,14 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) { if ev.event.Header.EventType == replication.ROTATE_EVENT { rotateEv, _ := ev.event.Event.(*replication.RotateEvent) - stat := &Status{ + rst := &Result{ goroutineIndex: chIdx, position: BinlogPosition{ BinlogPos: int64(rotateEv.Position), BinlogFile: string(rotateEv.NextLogName), }, } - bs.CountCh <- stat + bs.CountCh <- rst continue } @@ -224,7 +229,7 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) { } } - stat := &Status{ + rst := &Result{ err: err, goroutineIndex: chIdx, position: BinlogPosition{ @@ -235,7 +240,7 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) { } bs.fileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos) - bs.CountCh <- stat + bs.CountCh <- rst } } @@ -295,45 +300,36 @@ func (bs *BinlogSyncer) readBinlog(binlogReader *replication.BinlogStreamer) { } func (bs *BinlogSyncer) collector() { - var rowCnt int64 - var errCnt int64 - var errTypes = make(map[string]int) var start = time.Now() + var tickCnt = int64(0) - for outStat := range bs.CountCh { - if outStat.err != nil { - errCnt += 1 - errTypes[outStat.err.Error()] += 1 - } - - rowCnt += 1 - - if rowCnt%bs.TickCnt == 0 { - - rowsPerSec := float64(bs.TickCnt) / time.Since(start).Seconds() - - om := &OutMessage{ - Synced: rowCnt, - Faild: errCnt, - RowsPerSec: rowsPerSec, - Position: outStat.position, - Error: errTypes, - } + ticker := time.NewTicker(time.Duration(time.Minute * 1)) + defer ticker.Stop() - outMessage, err := marshalYAML(om) - if err != nil { - bs.fileLog.Printf("[%s] dump out message to YAML failed: %v\n", bs.SourceConn.Addr, outMessage) - } + go func() { + for _ = range ticker.C { + bs.stat.RowsPerSec = float64(tickCnt) / time.Since(start).Seconds() + tickCnt = 0 + start = time.Now() + } + }() - outMessage = " ====== [" + bs.SourceConn.Addr + "] status ======\n" + outMessage - bs.shellLog.Printf(outMessage) + for rst := range bs.CountCh { + bs.stat.Finished += 1 + tickCnt += 1 - start = time.Now() + if rst.err != nil { + bs.stat.Errors[rst.err.Error()] += 1 + bs.stat.Failed += 1 } } } +func (bs *BinlogSyncer) GetStat() *Status { + return bs.stat +} + func (bs *BinlogSyncer) getWriteConnection(row map[string]interface{}) (*DBInfo, error) { shardValues := make([]interface{}, len(bs.TableShard)) for i, k := range bs.TableShard { diff --git a/go/src/app/binlogsync/config.yaml b/go/src/app/binlogsync/config.yaml index 86df517..09f3234 100644 --- a/go/src/app/binlogsync/config.yaml +++ b/go/src/app/binlogsync/config.yaml @@ -1,36 +1,69 @@ -Confs: - - +worker1: WriteWorkerCnt: 2 SourceConn: - Addr: 106.14.46.83:3308 - Host: 106.14.46.83 - Port: "3308" - User: root - Password: Password - DBName: db + Addr: 127.0.0.1:3333 + Host: 127.0.0.1 + Port: "3333" + User: admin + Password: password + DBName: mysql DBConfig: - 4491: - Addr: 139.224.229.252:4491 - Host: 139.224.229.252 - Port: "4491" - User: root - Password: Password - DBName: db + 4444: + Addr: 127.0.0.2:4444 + Host: 127.0.0.2 + Port: "4444" + User: admin + Password: password + DBName: mysql Shards: - - From: [1090000000000000096, u, 1.jpg, 0] - Table: key_1090000000000000096_u_1.jpg - DBPort: "4491" + From: [field0, field1, field2, ...] + Table: destination table + DBPort: "4444" - TableName: key_0000000000000000000__ - TableField: ["bucket_id", "scope", "key", "ts", "is_del", "owner", "acl", "sha1", "ver", "md5", "crc32", "size", "file_meta", "group_id", "origo", "expires", "multipart"] - TableShard: ["bucket_id" , "scope", "key", "ts"] - TableIndex: ["bucket_id" , "scope", "key", "ts"] + TableName: sorce table + TableField: [] + TableShard: [] + TableIndex: [] - GTIDSet: bad0552d-cb43-11e6-a6ef-00163e0e31a0:1-20482 - BinlogFile: mysql-bin.000004 + GTIDSet: string + BinlogFile: mysql-bin.000001 + BinlogPos: 4 + +worker2: + WriteWorkerCnt: 2 + + SourceConn: + Addr: 127.0.0.1:5555 + Host: 127.0.0.1 + Port: "5555" + User: admin + Password: password + DBName: mysql + + DBConfig: + 6666: + Addr: 127.0.0.2:6666 + Host: 127.0.0.2 + Port: "6666" + User: admin + Password: password + DBName: mysql + + Shards: + - + From: [field0, field1, field2, ...] + Table: destination table + DBPort: "6666" + + TableName: source table + TableField: [] + TableShard: [] + TableIndex: [] + + GTIDSet: string + BinlogFile: mysql-bin.000001 BinlogPos: 4 - TickCnt: 40 diff --git a/go/src/app/binlogsync/main.go b/go/src/app/binlogsync/main.go index 0c7f101..a228017 100644 --- a/go/src/app/binlogsync/main.go +++ b/go/src/app/binlogsync/main.go @@ -39,49 +39,46 @@ type Config struct { BinlogFile string `yaml:"BinlogFile"` BinlogPos int32 `yaml:"BinlogPos"` - - // TickCnt specifies every `TickCnt` rows synced got 1 status report - TickCnt int64 `yaml:"TickCnt"` -} - -type ConfigList struct { - Confs []Config `yaml:"Confs"` } func main() { - // get arguments - getInput() - // set log var err error - shellLog = log.New(os.Stdout, "", 0) logFile, err = os.Create(logFileName) if err != nil { - shellLog.Panicf("create log file failed: %v\n", err) + panic(fmt.Sprintf("create log file failed: %v\n", err)) } defer logFile.Close() + shellLog = log.New(os.Stdout, "", 0) + // read config - confList := &ConfigList{} - err = unmarshalYAML(confName, &confList) + var confList = make(map[string]*Config) + err = unmarshalYAML(confName, confList) if err != nil { - shellLog.Panicf("read config file failed: %v\n", err) + fmt.Printf("read config file failed: %v\n", err) + return } - mainWG = &sync.WaitGroup{} - for _, conf := range confList.Confs { + controller := NewController() + + for w, conf := range confList { syncer := NewBinlogSyncer(conf) - mainWG.Add(1) - syncer.Sync() + controller.AddWorker(&Worker{ + name: w, + syncer: syncer, + config: conf, + }) + go syncer.Sync() } - mainWG.Wait() + + controller.Listen("8888") } -func getInput() { +func init() { flag.StringVar(&logFileName, "log", logFileName, "file name to output error log") flag.StringVar(&confName, "config", confName, "configration file path") flag.Parse() - fmt.Printf("log: %v, conf: %v\n", logFileName, confName) }