Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
393 changes: 393 additions & 0 deletions go/src/binlogsync/binlog_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,393 @@
package main

import (
"context"
"fmt"
"log"
"os"
"reflect"
"strconv"
"strings"
"sync"
"time"

"github.com/siddontang/go-mysql/client"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
)

type Connection struct {
Addr string // can be ip:port or a unix socket domain
Host string
Port string
User string
Password string
DBName string
}

type DBInfo struct {
Conn *client.Conn
Shard *DBShard
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1个connection只有1个shard?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是一条 sql 记录对应一个 DBInfo, 包括它所属的 shard 和 这个 shard 对应的connection

}

type DBShard struct {
From []string
Table string
DBPort string
}

type WriteEvent struct {
event *replication.BinlogEvent
before map[string]string
after map[string]string
dbInfo *DBInfo
}

type OutStatus struct {
err error
routineIndex int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

routine中文是什么意思?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

goroutine 类似线程

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

叫goroutine吧...routine的解释有点多

logPos uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里如果可以的话用有符号整数. 所有代码中避免使用无符号数, 一遇到减法就容易溢出

}

type Config struct {
ChCap int
ChCnt int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这2个变量不造是啥意思...配置项当做全局变量,尽量用完整单词.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好 是指的 channel 的数量和容量。这里改一下

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

所有的channel不放在一个slice里吗?如果放的话用slice的len维护数量就行了...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里想的在配置文件里 指定一下 需要创建多少个 channel


SourceConn Connection

DBAddrs map[string]Connection
Shards []DBShard

TableName string
TableField []string
TableShard []string
TableIndex []string

BinlogFile string
BinlogPos uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

咱们应该都是用的gtid来追踪binlog的. 文件+pos的方式在多个源上不一样.不太好处理.
不造你这里为啥是要加这2个东西

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

之前参考的python版 的同步用的是 文件+pos 的方式,这里也用这样来读binlog 的。我再看看 gtid 的方式

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

恩...好像读的话还必须对源还必须制定binlog的file+pos, 不造有没有通过gtid来指定的方式...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有的 增加了一个根据 gtid 读


TickCnt int64
}

var logFile *os.File

var mutex sync.Mutex

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

锁不用引用类型感觉怪怪得

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

噢 用引用吧


var (
FileLog *log.Logger
ShellLog *log.Logger
)

var dbPool = make(map[string]*client.Conn)
var conf = Config{}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var conf Config 应该就可以了

var logFileName = "binlog_sync.out"
var confName = "./config.json"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logFileName confName 还是作为命令行参数传进来吧,方便调试

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗷嗷 可以 改一下


func validRow(srcRow []interface{}) map[string]string {
// the first column `id` should not put in new rowValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不太明白, 都处理成string的好处是啥?

函数名不太合适, normalize 好点...valid是形容词

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

都变成 string 方便一个row放在一个map。保留原格式像 int []byte 好像也没特别用处..

rowValue := make([]string, len(srcRow)-1)
for i, v := range srcRow[1:] {
if v == nil {
continue
}

var tmp string
if reflect.TypeOf(v).String() == "[]uint8" {
// convert []byte to string
tmp = fmt.Sprintf("%s", v)
} else {
tmp = fmt.Sprintf("%v", v)
}

rowValue[i] = tmp
}

row := make(map[string]string)

for i, k := range conf.TableField {
row[k] = rowValue[i]
}

return row
}

func newBinlogReader(conn *Connection, binlogFile string, binlogPos uint32, serverID uint32) (*replication.BinlogStreamer, error) {

port, err := strconv.ParseInt(conn.Port, 10, 16)
if err != nil {
return nil, err
}

binlogCfg := replication.BinlogSyncerConfig{
ServerID: serverID,
Flavor: "mysql",
Host: conn.Host,
Port: uint16(port),
User: conn.User,
Password: conn.Password,
}

syncer := replication.NewBinlogSyncer(binlogCfg)

streamer, err := syncer.StartSync(mysql.Position{binlogFile, binlogPos})
if err != nil {
return nil, err
}

return streamer, nil
}

func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) {
for ev := range inCh {
mutex.Lock()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里需要lock吗?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getEventConnection 里面查找 shard 时有一段 二分查找的逻辑,会有值的修改和比较 需要锁上

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

二分查找为啥会有值的修改?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是根据下标 找 slice 里的 shard ,这个 shard 是全局的,过程中下标的修改可能导致找到错误的 shard 吧

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是不用加锁

ev.dbInfo = getEventConnection(ev.after)
mutex.Unlock()

index := makeTableIndex(ev)

value := make([]string, len(conf.TableField))
for i, k := range conf.TableField {
value[i] = ev.after[k]
}

var sql string
evType := ev.event.Header.EventType

FileLog.Printf("routin index: %d, before: %v, after: %v, event type: %v\n", chIdx, ev.before, ev.after, evType)

if evType == replication.UPDATE_ROWS_EVENTv2 || evType == replication.UPDATE_ROWS_EVENTv1 || evType == replication.UPDATE_ROWS_EVENTv0 {
sql = makeUpdateSql(ev.dbInfo.Shard.Table, conf.TableIndex, index, conf.TableField, value)

} else if evType == replication.DELETE_ROWS_EVENTv2 || evType == replication.DELETE_ROWS_EVENTv1 || evType == replication.DELETE_ROWS_EVENTv0 {

sql = makeDeleteSql(ev.dbInfo.Shard.Table, conf.TableIndex, index)
} else if evType == replication.WRITE_ROWS_EVENTv2 || evType == replication.WRITE_ROWS_EVENTv1 || evType == replication.WRITE_ROWS_EVENTv0 {

sql = makeInsertSql(ev.dbInfo.Shard.Table, conf.TableField, value)
} else {
continue
}

FileLog.Printf("routin index: %d, get sql statement: %v", chIdx, sql)

mutex.Lock()
_, err := ev.dbInfo.Conn.Execute(sql)
mutex.Unlock()
if err != nil {
FileLog.Printf("Execute error: %v\n", err)
}

stat := &OutStatus{
err: err,
routineIndex: chIdx,
logPos: ev.event.Header.LogPos,
}

FileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos)
outCh <- stat
}
}

func collector(inCh chan *OutStatus) {
var rowCount int64
var errCount int64
var errTypes = make(map[string]int)

var start = time.Now()
for outStat := range inCh {
if outStat.err != nil {
errCount += 1
errTypes[outStat.err.Error()] += 1
}
rowCount += 1

if rowCount%conf.TickCnt == 0 {

ShellLog.Printf("========= sync stat =========\n")

ShellLog.Printf("has synced: %d rows\n", rowCount)
ShellLog.Printf("has error: %d rows\n", errCount)
for k, v := range errTypes {
ShellLog.Printf("%s: %d rows\n", k, v)
}

ShellLog.Printf("sync rate: %.3f rows per second\n", float64(conf.TickCnt)/time.Since(start).Seconds())
ShellLog.Printf("has synced log position: %d\n", outStat.logPos)

start = time.Now()
}
}
}

func getEventConnection(row map[string]string) *DBInfo {
shardValues := make([]string, len(conf.TableShard))
for i, k := range conf.TableShard {
shardValues[i] = row[k]
}

shard := findShards(shardValues)
conn := dbPool[shard.DBPort]
if conn == nil {
addr := conf.DBAddrs[shard.DBPort]
conn, err := client.Connect(addr.Addr, addr.User, addr.Password, addr.DBName)
if err != nil {
FileLog.Panicf("get connection failed: %v\n", err)
}
dbPool[addr.Port] = conn
}

return &DBInfo{
Conn: conn,
Shard: shard,
}
}

func findShards(tbShards []string) *DBShard {
le := 0
ri := len(conf.Shards)

for le < ri {
i := (le + ri) / 2
shard := conf.Shards[i].From

rst, err := compareStringSlice(shard, tbShards)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

额...row里的所有field都是string? 没有整数吗?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

从binlog 读出来后,都统一变成了 string 处理

if err != nil {
FileLog.Printf("conf is not valid, shard length not equal: %v\n", err)
return nil
}

if rst == 0 {
return &conf.Shards[i]
}

if rst < 0 {
le = i + 1
} else {
ri = i
}
}

if ri >= 1 {
return &conf.Shards[ri-1]
}

return &conf.Shards[0]
}

func main() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用 waitGroup 保证一下执行完成

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

记得 close chanel

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

主线程收集处理结果,其他交由 goroutine 处理。


// set log
ShellLog = log.New(os.Stdout, "", 0)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ShellLog 为啥大写?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好像没啥特殊的,就用小写吧


logFile, err := os.Create(logFileName)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里覆盖了你定义的全局 logFile

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是.. 改一下..

if err != nil {
ShellLog.Panicf("create log file failed: %v\n", err)
}
defer logFile.Close()
FileLog = log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile)

// read config
jsonParser := NewJsonStruct()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

配置用yaml, rpc消息用json. 分别偏重可读性和解析的效率.

配置允许用json写, 但用yaml解析. 这样支持可读性更好的yaml配置.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的 这里修改一下


err = jsonParser.Load(confName, &conf)
if err != nil {
ShellLog.Panicf("read config file failed: %v\n", err)
}

var writeChs = make([]chan *WriteEvent, conf.ChCnt)
var countCh = make(chan *OutStatus, conf.ChCnt*conf.ChCap)

for i := 0; i < conf.ChCnt; i++ {
writeCh := make(chan *WriteEvent, conf.ChCap)
writeChs[i] = writeCh
go writeToDB(i, writeCh, countCh)
}

go collector(countCh)

binlogReader, err := newBinlogReader(&conf.SourceConn, conf.BinlogFile, conf.BinlogPos, 9999)
if err != nil {
ShellLog.Panicf("make binlog reader failed: %v\n", err)
}

for {
ev, err := binlogReader.GetEvent(context.Background())
if err != nil {
ShellLog.Panicf("get event failed: %v\n", err)
}

if ev.Header.EventType == replication.ROTATE_EVENT {
writeEV := &WriteEvent{
event: ev,
}
writeChs[0] <- writeEV
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writeToDB 这个函数应该处理不了 replication.ROTATE_EVENT 这个event

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

对的。处理 rotate 和记录已经同步到的位置和恢复执行要再考虑下,这部分后面更新一个提交点。

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

哦哦

continue
}

writeEV := makeWriteEvent(ev)
if writeEV == nil {
continue
}

indexValues := make([]string, len(conf.TableIndex))
for i, k := range conf.TableIndex {
if writeEV.before != nil {
indexValues[i] = writeEV.before[k]
} else {
indexValues[i] = writeEV.after[k]
}
}

rowSha1, err := calcHashToInt64([]byte(strings.Join(indexValues, "")))
if err != nil {
ShellLog.Panicf("calculate hash failed: %v", err)
}

chIdx := rowSha1 % int64(conf.ChCnt)
writeChs[chIdx] <- writeEV
}
}

func makeWriteEvent(ev *replication.BinlogEvent) *WriteEvent {
var before map[string]string
var after map[string]string

rowEv, ok := ev.Event.(*replication.RowsEvent)
if !ok {
FileLog.Printf("event is not a rows event\n")
return nil
}

table := string(rowEv.Table.Table)
if table != conf.TableName {
FileLog.Printf("rows event is not the required table, get %v\n", table)
return nil
}

if len(rowEv.Rows) == 2 {
before = validRow(rowEv.Rows[0])
after = validRow(rowEv.Rows[1])
} else {
before = nil
after = validRow(rowEv.Rows[0])
}

return &WriteEvent{
event: ev,
before: before,
after: after,
}
}

func makeTableIndex(ev *WriteEvent) []string {
index := make([]string, len(conf.TableIndex))
for i, k := range conf.TableIndex {
if ev.before != nil {
index[i] = ev.before[k]
} else {
index[i] = ev.after[k]
}
}

return index
}
Loading