Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 71 additions & 63 deletions go/src/binlogsync/binlog_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"log"
"os"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -44,14 +44,17 @@ type WriteEvent struct {
}

type OutStatus struct {
err error
routineIndex int
logPos int32
err error
goroutineIndex int
logPos int32
}

type Config struct {
ChannelCapacity int
WriteThreadCount int
// this config used to parse `config.json`, format detail is in the example config file
// `config.json`

// WriteThreadCnt specifies how many goroutine used to execute sql statement
WriteWorkerCnt int

SourceConn Connection

Expand All @@ -69,22 +72,26 @@ type Config struct {
BinlogFile string
BinlogPos int32

// TickCnt specifies every `TickCnt` rows synced got 1 status report
TickCnt int64
}

var logFile *os.File
var (
mutex *sync.Mutex

var mutex sync.Mutex
wg sync.WaitGroup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

以面向对象的方式来实现吧, 一个机器上肯定不止1个复制关系在跑, 可能一个进程里要处理多个复制.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没有明白 过去问问你


var (
FileLog *log.Logger
ShellLog *log.Logger
fileLog *log.Logger
shellLog *log.Logger
)

var dbPool = make(map[string]*client.Conn)
var conf = Config{}
var logFileName = "binlog_sync.out"
var confName = "./config.json"
var (
dbPool = make(map[string]*client.Conn)
conf = Config{}
logFileName = "binlog_sync.out"
confName = "./config.json"
channelCapacity = 10240
)

func validRow(srcRow []interface{}) map[string]string {
// the first column `id` should not put in new rowValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不太明白, 都处理成string的好处是啥?

函数名不太合适, normalize 好点...valid是形容词

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

都变成 string 方便一个row放在一个map。保留原格式像 int []byte 好像也没特别用处..

Expand Down Expand Up @@ -168,6 +175,11 @@ func newBinlogReaderByGTID(conn *Connection, GTID string, serverID int32) (*repl

func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) {
for ev := range inCh {

if ev.event.Header.EventType == replication.ROTATE_EVENT {
continue
}

mutex.Lock()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里需要lock吗?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getEventConnection 里面查找 shard 时有一段 二分查找的逻辑,会有值的修改和比较 需要锁上

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

二分查找为啥会有值的修改?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是根据下标 找 slice 里的 shard ,这个 shard 是全局的,过程中下标的修改可能导致找到错误的 shard 吧

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是不用加锁

ev.dbInfo = getEventConnection(ev.after)
mutex.Unlock()
Expand All @@ -182,7 +194,7 @@ func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) {
var sql string
evType := ev.event.Header.EventType

FileLog.Printf("routin index: %d, before: %v, after: %v, event type: %v\n", chIdx, ev.before, ev.after, evType)
fileLog.Printf("routin index: %d, before: %v, after: %v, event type: %v\n", chIdx, ev.before, ev.after, evType)

if evType == replication.UPDATE_ROWS_EVENTv2 || evType == replication.UPDATE_ROWS_EVENTv1 || evType == replication.UPDATE_ROWS_EVENTv0 {
sql = makeUpdateSql(ev.dbInfo.Shard.Table, conf.TableIndex, index, conf.TableField, value)
Expand All @@ -197,22 +209,22 @@ func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) {
continue
}

FileLog.Printf("routin index: %d, get sql statement: %v", chIdx, sql)
fileLog.Printf("routin index: %d, get sql statement: %v", chIdx, sql)

mutex.Lock()
_, err := ev.dbInfo.Conn.Execute(sql)
mutex.Unlock()
if err != nil {
FileLog.Printf("Execute error: %v\n", err)
fileLog.Printf("Execute error: %v\n", err)
}

stat := &OutStatus{
err: err,
routineIndex: chIdx,
logPos: int32(ev.event.Header.LogPos),
err: err,
goroutineIndex: chIdx,
logPos: int32(ev.event.Header.LogPos),
}

FileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos)
fileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos)
outCh <- stat
}
}
Expand All @@ -232,16 +244,16 @@ func collector(inCh chan *OutStatus) {

if rowCount%conf.TickCnt == 0 {

ShellLog.Printf("========= sync stat =========\n")
shellLog.Printf("========= sync stat =========\n")

ShellLog.Printf("has synced: %d rows\n", rowCount)
ShellLog.Printf("has error: %d rows\n", errCount)
shellLog.Printf("has synced: %d rows\n", rowCount)
shellLog.Printf("has error: %d rows\n", errCount)
for k, v := range errTypes {
ShellLog.Printf("%s: %d rows\n", k, v)
shellLog.Printf("%s: %d rows\n", k, v)
}

ShellLog.Printf("sync rate: %.3f rows per second\n", float64(conf.TickCnt)/time.Since(start).Seconds())
ShellLog.Printf("has synced log position: %d\n", outStat.logPos)
shellLog.Printf("sync rate: %.3f rows per second\n", float64(conf.TickCnt)/time.Since(start).Seconds())
shellLog.Printf("has synced log position: %d\n", outStat.logPos)

start = time.Now()
}
Expand All @@ -260,7 +272,7 @@ func getEventConnection(row map[string]string) *DBInfo {
addr := conf.DBAddrs[shard.DBPort]
conn, err := client.Connect(addr.Addr, addr.User, addr.Password, addr.DBName)
if err != nil {
FileLog.Panicf("get connection failed: %v\n", err)
fileLog.Panicf("get connection failed: %v\n", err)
}
dbPool[addr.Port] = conn
}
Expand All @@ -272,63 +284,57 @@ func getEventConnection(row map[string]string) *DBInfo {
}

func findShards(tbShards []string) *DBShard {
le := 0
ri := len(conf.Shards)

for le < ri {
i := (le + ri) / 2
//conf.Shards should be descending
i := sort.Search(len(conf.Shards), func(i int) bool {
shard := conf.Shards[i].From

rst, err := compareStringSlice(shard, tbShards)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

额...row里的所有field都是string? 没有整数吗?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

从binlog 读出来后,都统一变成了 string 处理

if err != nil {
FileLog.Printf("conf is not valid, shard length not equal: %v\n", err)
return nil
}

if rst == 0 {
return &conf.Shards[i]
fileLog.Panicf("conf is not valid, shard length not equal: %v\n", err)
}

if rst < 0 {
le = i + 1
} else {
ri = i
if rst <= 0 {
return true
}
}
return false
})

if ri >= 1 {
return &conf.Shards[ri-1]
if i >= 0 && i < len(conf.Shards) {
return &conf.Shards[i]
}

return &conf.Shards[0]
fileLog.Panicf("can not find shard: index out of bound")
return nil
}

func main() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用 waitGroup 保证一下执行完成

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

记得 close chanel

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

主线程收集处理结果,其他交由 goroutine 处理。


// set log
ShellLog = log.New(os.Stdout, "", 0)
shellLog = log.New(os.Stdout, "", 0)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

似乎这里的 shellLog 的用处可以直接用 print 代替

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

还是加个shelllog吧.以后替换容易点...最终肯定不会往shell里打印吧...


logFile, err := os.Create(logFileName)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里覆盖了你定义的全局 logFile

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是.. 改一下..

if err != nil {
ShellLog.Panicf("create log file failed: %v\n", err)
shellLog.Panicf("create log file failed: %v\n", err)
}
defer logFile.Close()
FileLog = log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile)
fileLog = log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile)

// read config
jsonParser := NewJsonStruct()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

配置用yaml, rpc消息用json. 分别偏重可读性和解析的效率.

配置允许用json写, 但用yaml解析. 这样支持可读性更好的yaml配置.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的 这里修改一下


err = jsonParser.Load(confName, &conf)
if err != nil {
ShellLog.Panicf("read config file failed: %v\n", err)
shellLog.Panicf("read config file failed: %v\n", err)
}

var writeChs = make([]chan *WriteEvent, conf.WriteThreadCount)
var countCh = make(chan *OutStatus, conf.WriteThreadCount*conf.ChannelCapacity)
var writeChs = make([]chan *WriteEvent, conf.WriteWorkerCnt)
var countCh = make(chan *OutStatus, conf.WriteWorkerCnt*channelCapacity)

for i := 0; i < conf.WriteThreadCount; i++ {
writeCh := make(chan *WriteEvent, conf.ChannelCapacity)
for i := 0; i < conf.WriteWorkerCnt; i++ {
writeCh := make(chan *WriteEvent, channelCapacity)
writeChs[i] = writeCh

wg.Add(1)
go writeToDB(i, writeCh, countCh)
}

Expand All @@ -342,13 +348,13 @@ func main() {
}

if err != nil {
ShellLog.Panicf("make binlog reader failed: %v\n", err)
shellLog.Panicf("make binlog reader failed: %v\n", err)
}

for {
ev, err := binlogReader.GetEvent(context.Background())
if err != nil {
ShellLog.Panicf("get event failed: %v\n", err)
shellLog.Panicf("get event failed: %v\n", err)
}

if ev.Header.EventType == replication.ROTATE_EVENT {
Expand All @@ -373,14 +379,16 @@ func main() {
}
}

rowSha1, err := calcHashToInt64([]byte(strings.Join(indexValues, "")))
rowHash, err := hashStringSliceToInt32(indexValues)
if err != nil {
ShellLog.Panicf("calculate hash failed: %v", err)
shellLog.Panicf("calculate hash failed: %v", err)
}

chIdx := rowSha1 % int64(conf.WriteThreadCount)
chIdx := rowHash % int64(conf.WriteWorkerCnt)
writeChs[chIdx] <- writeEV
}

wg.Wait()
}

func makeWriteEvent(ev *replication.BinlogEvent) *WriteEvent {
Expand All @@ -389,13 +397,13 @@ func makeWriteEvent(ev *replication.BinlogEvent) *WriteEvent {

rowEv, ok := ev.Event.(*replication.RowsEvent)
if !ok {
FileLog.Printf("event is not a rows event\n")
fileLog.Printf("event is not a rows event, got: %v\n", ev.Header.EventType)
return nil
}

table := string(rowEv.Table.Table)
if table != conf.TableName {
FileLog.Printf("rows event is not the required table, get %v\n", table)
fileLog.Printf("rows event is not the required table, get %v\n", table)
return nil
}

Expand Down
17 changes: 9 additions & 8 deletions go/src/binlogsync/sqlUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func makeUpdateSql(table string, idxField, idxValue, tbField, tbValue []string)
var limitClause string
var tableClause string

tableClause = quote(table, "`")
tableClause = quoteString(table, "`")
setClause = makeSqlCondition(tbField, tbValue, "=", ", ")
whereClause = makeWhereClause(idxField, idxValue)
limitClause = "LIMIT 1"
Expand All @@ -26,17 +26,17 @@ func makeInsertSql(table string, tbField, tbValue []string) string {
var valClause string
var tableClause string

tableClause = quote(table, "`")
tableClause = quoteString(table, "`")

fld := make([]string, len(tbField))
val := make([]string, len(tbValue))

for i := 0; i < len(tbField); i++ {
fld[i] = quote(tbField[i], "`")
fld[i] = quoteString(tbField[i], "`")
val[i] = "\"" + mysql.Escape(tbValue[i]) + "\""
}

FileLog.Printf("table field: %v, value: %v\n", fld, val)
fileLog.Printf("table field: %v, value: %v\n", fld, val)

fldClause = "(" + strings.Join(fld, ", ") + ")"
valClause = "(" + strings.Join(val, ", ") + ")"
Expand All @@ -49,7 +49,7 @@ func makeDeleteSql(table string, idxField, idxValue []string) string {
var limitClause string
var tableClause string

tableClause = quote(table, "`")
tableClause = quoteString(table, "`")

whereClause = makeWhereClause(idxField, idxValue)
limitClause = "LIMIT 1"
Expand All @@ -65,13 +65,14 @@ func makeSqlCondition(fields, values []string, operator, formatter string) strin
conds := make([]string, len(fields))

for i, k := range fields {
conds[i] = fmt.Sprintf("%s%s%s", quote(k, "`"), operator, "\""+mysql.Escape(values[i])+"\"")
conds[i] = fmt.Sprintf("%s%s%s", quoteString(k, "`"), operator, "\""+mysql.Escape(values[i])+"\"")
}

return strings.Join(conds, formatter)
}

func quote(src, quote string) string {
rst := strings.Replace(src, quote, "\\"+quote, -1)
func quoteString(src, quote string) string {
safeQuote := strings.Replace(quote, "\\", "\\\\", -1)
rst := strings.Replace(src, quote, "\\"+safeQuote, -1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉逻辑不太对...替换quote里的\是啥意思

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯.. 感觉这里应该限制 len(quote) == 1 ,这样只用替换 src 中的quote就好了。len(quote) > 1 的话不太好处理replace src 里的 quote

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

替换quote里的\是啥意思

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为了解决这种情况出现, 但是没想对..

` -> \`
\` -> \\\`  # 不是  \\`

return quote + rst + quote
}
23 changes: 10 additions & 13 deletions go/src/binlogsync/util.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
package main

import (
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"io/ioutil"
"strconv"
"strings"
)

func calcHashToInt64(src []byte) (int64, error) {
h := sha256.New()
h.Write(src)

hex_str := fmt.Sprintf("%x", h.Sum(nil))

num, err := strconv.ParseInt(hex_str[:8], 16, 64)
if err != nil {
return 0, err
func hashStringSliceToInt32(src []string) (int64, error) {
h := fnv.New32()
for _, s := range src {
_, err := h.Write([]byte(s))
if err != nil {
return 0, err
}
}
return num, nil

return int64(h.Sum32()), nil
}

func compareStringSlice(a, b []string) (int, error) {
Expand Down