Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 51 additions & 16 deletions go/src/binlogsync/binlog_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ type WriteEvent struct {
type OutStatus struct {
err error
routineIndex int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

routine中文是什么意思?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

goroutine 类似线程

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

叫goroutine吧...routine的解释有点多

logPos uint32
logPos int32
}

type Config struct {
ChCap int
ChCnt int
ChannelCapacity int

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥 channelCapacity 要做为参数?

WriteThreadCount int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

咱们代码里好像count都用的cnt...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的 我加点注释

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thread 似乎不准确 这里用的是goroutine 吧?这里用来控制并发吧,而且控制并发数就这一个变量,干脆叫 worker 怎么样?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的 Config 是用来解 binary -> json 的吧。最好定义下一下 json 字段名


SourceConn Connection

Expand All @@ -63,8 +63,11 @@ type Config struct {
TableShard []string
TableIndex []string

// if set GTID, binlog file and pos will be ignored
GTID string

BinlogFile string
BinlogPos uint32
BinlogPos int32

TickCnt int64
}
Expand Down Expand Up @@ -111,25 +114,51 @@ func validRow(srcRow []interface{}) map[string]string {
return row
}

func newBinlogReader(conn *Connection, binlogFile string, binlogPos uint32, serverID uint32) (*replication.BinlogStreamer, error) {

func newBinlogSyncer(conn *Connection, serverID int32) (*replication.BinlogSyncer, error) {
port, err := strconv.ParseInt(conn.Port, 10, 16)
if err != nil {
return nil, err
}

binlogCfg := replication.BinlogSyncerConfig{
ServerID: serverID,
ServerID: uint32(serverID),
Flavor: "mysql",
Host: conn.Host,
Port: uint16(port),
User: conn.User,
Password: conn.Password,
}

syncer := replication.NewBinlogSyncer(binlogCfg)
return replication.NewBinlogSyncer(binlogCfg), nil
}

func newBinlogReaderByPosition(conn *Connection, binlogFile string, binlogPos int32, serverID int32) (*replication.BinlogStreamer, error) {

syncer, err := newBinlogSyncer(conn, serverID)
if err != nil {
return nil, err
}

streamer, err := syncer.StartSync(mysql.Position{binlogFile, uint32(binlogPos)})
if err != nil {
return nil, err
}

return streamer, nil
}

streamer, err := syncer.StartSync(mysql.Position{binlogFile, binlogPos})
func newBinlogReaderByGTID(conn *Connection, GTID string, serverID int32) (*replication.BinlogStreamer, error) {
syncer, err := newBinlogSyncer(conn, serverID)
if err != nil {
return nil, err
}

gtidSet, err := mysql.ParseMysqlGTIDSet(GTID)
if err != nil {
return nil, err
}

streamer, err := syncer.StartSyncGTID(gtidSet)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -180,7 +209,7 @@ func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) {
stat := &OutStatus{
err: err,
routineIndex: chIdx,
logPos: ev.event.Header.LogPos,
logPos: int32(ev.event.Header.LogPos),
}

FileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos)
Expand Down Expand Up @@ -294,18 +323,24 @@ func main() {
ShellLog.Panicf("read config file failed: %v\n", err)
}

var writeChs = make([]chan *WriteEvent, conf.ChCnt)
var countCh = make(chan *OutStatus, conf.ChCnt*conf.ChCap)
var writeChs = make([]chan *WriteEvent, conf.WriteThreadCount)
var countCh = make(chan *OutStatus, conf.WriteThreadCount*conf.ChannelCapacity)

for i := 0; i < conf.ChCnt; i++ {
writeCh := make(chan *WriteEvent, conf.ChCap)
for i := 0; i < conf.WriteThreadCount; i++ {
writeCh := make(chan *WriteEvent, conf.ChannelCapacity)
writeChs[i] = writeCh
go writeToDB(i, writeCh, countCh)
}

go collector(countCh)

binlogReader, err := newBinlogReader(&conf.SourceConn, conf.BinlogFile, conf.BinlogPos, 9999)
var binlogReader *replication.BinlogStreamer
if conf.GTID != "" {
binlogReader, err = newBinlogReaderByGTID(&conf.SourceConn, conf.GTID, 9999)
} else {
binlogReader, err = newBinlogReaderByPosition(&conf.SourceConn, conf.BinlogFile, conf.BinlogPos, 9999)
}

if err != nil {
ShellLog.Panicf("make binlog reader failed: %v\n", err)
}
Expand Down Expand Up @@ -343,7 +378,7 @@ func main() {
ShellLog.Panicf("calculate hash failed: %v", err)
}

chIdx := rowSha1 % int64(conf.ChCnt)
chIdx := rowSha1 % int64(conf.WriteThreadCount)
writeChs[chIdx] <- writeEV
}
}
Expand Down
5 changes: 3 additions & 2 deletions go/src/binlogsync/config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"ChCap": 20,
"ChCnt": 2,
"ChannelCapacity": 20,
"WriteThreadCount": 2,

"SourceConn": {
"Addr": "106.14.46.83:3308",
Expand Down Expand Up @@ -36,6 +36,7 @@
"TableShard": [ "bucket_id" , "scope", "key"],
"TableIndex": [ "bucket_id" , "scope", "key"],
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

index 应该是 bucket_id, scope, key, ts


"GTID": "",
"BinlogFile": "mysql-bin.000004",
"BinlogPos": 4,
"TickCnt": 40
Expand Down