Skip to content

Commit 1f06031

Browse files
committed
add controller
1 parent e9b65a0 commit 1f06031

File tree

3 files changed

+118
-92
lines changed

3 files changed

+118
-92
lines changed

go/src/app/binlogsync/binlog_sync.go

Lines changed: 41 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,15 @@ type BinlogSyncer struct {
4747

4848
DBPool map[string]*sql.DB
4949
WriteChs []chan *WriteEvent
50-
CountCh chan *Status
50+
CountCh chan *Result
5151

5252
mutex *sync.Mutex
5353
wg *sync.WaitGroup
5454
shellLog *log.Logger
5555
fileLog *log.Logger
5656

57+
stat *Status
58+
5759
stop bool
5860
}
5961

@@ -63,44 +65,47 @@ type BinlogPosition struct {
6365
NextGTID string `yaml:"GTID"`
6466
}
6567

66-
type OutMessage struct {
67-
Synced int64 `yaml:"Synced"`
68-
Faild int64 `yaml:"Faild"`
68+
type Status struct {
69+
Finished int64 `yaml:"Finished"`
70+
Failed int64 `yaml:"Failed"`
71+
Errors map[string]int `yaml:"Errors"`
6972
RowsPerSec float64 `yaml:"RowsPerSec"`
70-
Position BinlogPosition `yaml:"Position"`
71-
Error map[string]int `yaml:"Error"`
7273
}
7374

74-
type Status struct {
75+
type Result struct {
7576
err error
7677
goroutineIndex int
7778

79+
gtid string
80+
7881
position BinlogPosition
7982
}
8083

81-
func NewBinlogSyncer(conf Config) *BinlogSyncer {
84+
func NewBinlogSyncer(conf *Config) *BinlogSyncer {
8285
fileLog := log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile)
8386
fileLog.SetPrefix("[" + conf.SourceConn.Addr + "] ")
8487

8588
bs := &BinlogSyncer{
86-
Config: conf,
89+
Config: *conf,
8790

8891
DBPool: make(map[string]*sql.DB),
8992
WriteChs: make([]chan *WriteEvent, conf.WorkerCnt),
90-
CountCh: make(chan *Status, channelCapacity*conf.WorkerCnt),
93+
CountCh: make(chan *Result, channelCapacity*conf.WorkerCnt),
9194

9295
mutex: &sync.Mutex{},
9396
wg: &sync.WaitGroup{},
9497
shellLog: shellLog,
9598
fileLog: fileLog,
96-
stop: false,
99+
stat: &Status{
100+
Errors: make(map[string]int),
101+
},
102+
stop: false,
97103
}
98104

99105
return bs
100106
}
101107

102108
func (bs *BinlogSyncer) Sync() {
103-
defer mainWG.Done()
104109

105110
var binlogReader *replication.BinlogStreamer
106111
var err error
@@ -167,14 +172,14 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) {
167172
if ev.event.Header.EventType == replication.ROTATE_EVENT {
168173
rotateEv, _ := ev.event.Event.(*replication.RotateEvent)
169174

170-
stat := &Status{
175+
rst := &Result{
171176
goroutineIndex: chIdx,
172177
position: BinlogPosition{
173178
BinlogPos: int64(rotateEv.Position),
174179
BinlogFile: string(rotateEv.NextLogName),
175180
},
176181
}
177-
bs.CountCh <- stat
182+
bs.CountCh <- rst
178183
continue
179184
}
180185

@@ -224,7 +229,7 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) {
224229
}
225230
}
226231

227-
stat := &Status{
232+
rst := &Result{
228233
err: err,
229234
goroutineIndex: chIdx,
230235
position: BinlogPosition{
@@ -235,7 +240,7 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) {
235240
}
236241

237242
bs.fileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos)
238-
bs.CountCh <- stat
243+
bs.CountCh <- rst
239244
}
240245
}
241246

@@ -295,45 +300,36 @@ func (bs *BinlogSyncer) readBinlog(binlogReader *replication.BinlogStreamer) {
295300
}
296301

297302
func (bs *BinlogSyncer) collector() {
298-
var rowCnt int64
299-
var errCnt int64
300-
var errTypes = make(map[string]int)
301303

302304
var start = time.Now()
305+
var tickCnt = int64(0)
303306

304-
for outStat := range bs.CountCh {
305-
if outStat.err != nil {
306-
errCnt += 1
307-
errTypes[outStat.err.Error()] += 1
308-
}
309-
310-
rowCnt += 1
311-
312-
if rowCnt%bs.TickCnt == 0 {
313-
314-
rowsPerSec := float64(bs.TickCnt) / time.Since(start).Seconds()
315-
316-
om := &OutMessage{
317-
Synced: rowCnt,
318-
Faild: errCnt,
319-
RowsPerSec: rowsPerSec,
320-
Position: outStat.position,
321-
Error: errTypes,
322-
}
307+
ticker := time.NewTicker(time.Duration(time.Minute * 1))
308+
defer ticker.Stop()
323309

324-
outMessage, err := marshalYAML(om)
325-
if err != nil {
326-
bs.fileLog.Printf("[%s] dump out message to YAML failed: %v\n", bs.SourceConn.Addr, outMessage)
327-
}
310+
go func() {
311+
for _ = range ticker.C {
312+
bs.stat.RowsPerSec = float64(tickCnt) / time.Since(start).Seconds()
313+
tickCnt = 0
314+
start = time.Now()
315+
}
316+
}()
328317

329-
outMessage = " ====== [" + bs.SourceConn.Addr + "] status ======\n" + outMessage
330-
bs.shellLog.Printf(outMessage)
318+
for rst := range bs.CountCh {
319+
bs.stat.Finished += 1
320+
tickCnt += 1
331321

332-
start = time.Now()
322+
if rst.err != nil {
323+
bs.stat.Errors[rst.err.Error()] += 1
324+
bs.stat.Failed += 1
333325
}
334326
}
335327
}
336328

329+
func (bs *BinlogSyncer) GetStat() *Status {
330+
return bs.stat
331+
}
332+
337333
func (bs *BinlogSyncer) getWriteConnection(row map[string]interface{}) (*DBInfo, error) {
338334
shardValues := make([]interface{}, len(bs.TableShard))
339335
for i, k := range bs.TableShard {

go/src/app/binlogsync/config.yaml

Lines changed: 58 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,69 @@
1-
Confs:
2-
-
1+
worker1:
32
WriteWorkerCnt: 2
43

54
SourceConn:
6-
Addr: 106.14.46.83:3308
7-
Host: 106.14.46.83
8-
Port: "3308"
9-
User: root
10-
Password: Password
11-
DBName: db
5+
Addr: 127.0.0.1:3333
6+
Host: 127.0.0.1
7+
Port: "3333"
8+
User: admin
9+
Password: password
10+
DBName: mysql
1211

1312
DBConfig:
14-
4491:
15-
Addr: 139.224.229.252:4491
16-
Host: 139.224.229.252
17-
Port: "4491"
18-
User: root
19-
Password: Password
20-
DBName: db
13+
4444:
14+
Addr: 127.0.0.2:4444
15+
Host: 127.0.0.2
16+
Port: "4444"
17+
User: admin
18+
Password: password
19+
DBName: mysql
2120

2221
Shards:
2322
-
24-
From: [1090000000000000096, u, 1.jpg, 0]
25-
Table: key_1090000000000000096_u_1.jpg
26-
DBPort: "4491"
23+
From: [field0, field1, field2, ...]
24+
Table: destination table
25+
DBPort: "4444"
2726

28-
TableName: key_0000000000000000000__
29-
TableField: ["bucket_id", "scope", "key", "ts", "is_del", "owner", "acl", "sha1", "ver", "md5", "crc32", "size", "file_meta", "group_id", "origo", "expires", "multipart"]
30-
TableShard: ["bucket_id" , "scope", "key", "ts"]
31-
TableIndex: ["bucket_id" , "scope", "key", "ts"]
27+
TableName: sorce table
28+
TableField: []
29+
TableShard: []
30+
TableIndex: []
3231

33-
GTIDSet: bad0552d-cb43-11e6-a6ef-00163e0e31a0:1-20482
34-
BinlogFile: mysql-bin.000004
32+
GTIDSet: string
33+
BinlogFile: mysql-bin.000001
34+
BinlogPos: 4
35+
36+
worker2:
37+
WriteWorkerCnt: 2
38+
39+
SourceConn:
40+
Addr: 127.0.0.1:5555
41+
Host: 127.0.0.1
42+
Port: "5555"
43+
User: admin
44+
Password: password
45+
DBName: mysql
46+
47+
DBConfig:
48+
6666:
49+
Addr: 127.0.0.2:6666
50+
Host: 127.0.0.2
51+
Port: "6666"
52+
User: admin
53+
Password: password
54+
DBName: mysql
55+
56+
Shards:
57+
-
58+
From: [field0, field1, field2, ...]
59+
Table: destination table
60+
DBPort: "6666"
61+
62+
TableName: source table
63+
TableField: []
64+
TableShard: []
65+
TableIndex: []
66+
67+
GTIDSet: string
68+
BinlogFile: mysql-bin.000001
3569
BinlogPos: 4
36-
TickCnt: 40

go/src/app/binlogsync/main.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,49 +39,46 @@ type Config struct {
3939

4040
BinlogFile string `yaml:"BinlogFile"`
4141
BinlogPos int32 `yaml:"BinlogPos"`
42-
43-
// TickCnt specifies every `TickCnt` rows synced got 1 status report
44-
TickCnt int64 `yaml:"TickCnt"`
45-
}
46-
47-
type ConfigList struct {
48-
Confs []Config `yaml:"Confs"`
4942
}
5043

5144
func main() {
5245

53-
// get arguments
54-
getInput()
55-
5646
// set log
5747
var err error
58-
shellLog = log.New(os.Stdout, "", 0)
5948
logFile, err = os.Create(logFileName)
6049
if err != nil {
61-
shellLog.Panicf("create log file failed: %v\n", err)
50+
panic(fmt.Sprintf("create log file failed: %v\n", err))
6251
}
6352
defer logFile.Close()
6453

54+
shellLog = log.New(os.Stdout, "", 0)
55+
6556
// read config
66-
confList := &ConfigList{}
67-
err = unmarshalYAML(confName, &confList)
57+
var confList = make(map[string]*Config)
58+
err = unmarshalYAML(confName, confList)
6859
if err != nil {
69-
shellLog.Panicf("read config file failed: %v\n", err)
60+
fmt.Printf("read config file failed: %v\n", err)
61+
return
7062
}
7163

72-
mainWG = &sync.WaitGroup{}
73-
for _, conf := range confList.Confs {
64+
controller := NewController()
65+
66+
for w, conf := range confList {
7467
syncer := NewBinlogSyncer(conf)
75-
mainWG.Add(1)
76-
syncer.Sync()
68+
controller.AddWorker(&Worker{
69+
name: w,
70+
syncer: syncer,
71+
config: conf,
72+
})
73+
go syncer.Sync()
7774
}
78-
mainWG.Wait()
75+
76+
controller.Listen("8888")
7977
}
8078

81-
func getInput() {
79+
func init() {
8280
flag.StringVar(&logFileName, "log", logFileName, "file name to output error log")
8381
flag.StringVar(&confName, "config", confName, "configration file path")
8482

8583
flag.Parse()
86-
fmt.Printf("log: %v, conf: %v\n", logFileName, confName)
8784
}

0 commit comments

Comments
 (0)