-
Notifications
You must be signed in to change notification settings - Fork 2
Description
做什么
本地读取mysql binlog 然后写远程数据库,以此方式同步 binlog 。
理想目标的同步的效率达到 5k row 每秒。
实现思路
-
使用 go 语言实现,读取 binlog 用 go-mysql 工具:
测试读取 binlog 有 1w+ row 每秒;写入效果也能达到预期目标。程序读取配置文件开始工作,每一个源数据到目标数据库的同步的配置信息,程序会创建一个 worker 去执行这次同步。
1一个worker 是1个goroutine,其会启动1个读binlog 的 goroutine 和 n 个写数据库的 goroutine ,和1个收集本 worker 状态的goroutine 协同工作。
程序通过调用worker 的接口获取1个worker 当前的状态;可以通知worker 停止工作;需要记录worker 已经完成的binlog 的位置(GTIDset)。
worker 需要重新启动时,程序通过读取该worker的配置信息和工作进度(通过源数据库的host:port 确定1个 worker),然后重新创建1个goroutine。
1 个worker 退出不会影响其他 worker 的工作,整个程序需要退出时,会首先停止各个 worker 的工作。如果程序本身意外退出,worker 会受影响意外退出,但是会保证记录各worker的工作进度,重启程序时,需要在配置文件中准确指定开始同步的位置。
-
需要开发的部分:
- 1. 建立源 mysql 连接,用来读取 binlog 中需要同步的事件。
- 2. 读线程读取 binlog 处理成 k-v 格式(能直接写成 sql 语句),分发给写线程。
- 3. 写线程根据 k-v 格式的row,和事件类型(insert, delete, update),拼接成 string sql 语句,连接目标数据库执行写操作,并将结果(成功,失败原因) 汇报给收集线程。
- 4. 收集线程监控程序的运行状态(读取速度,写入速度,发生错误的情况)。
- 5. 使用 goroutine 实现并发写,使用 channe 作为通信。
- 6. 启动1个TCP监听接口,控制程序的运行状态(包括读取worker信息,停止worker,重启worker)。
- 7. 记录每个worker 的配置信息和已经完成的工作进度,在重启worker 时需要读取。
- 使用
监控状态: 实现一个TCP监听接口,用来控制当前程序。
stat [host:port]: yaml 格式输出程序的当前状态,woker的存活情况,同步速率,错误情况等.. 省略指定worker 则返回所有worker 的状态。
wake [host:port]: 重新启动一个 worker,读取上次同步的GTID位置然后重新开始,如果worker仍存活则命令无效。
stop [host:port]: 停止一个worker,停止read线程的binlog读取,然后写线程消费完已经读取到的事件,然后worker退出。
部署运行:编译成可执行文件,通过配置文件指定需要同步的数据库和表信息,格式见后面的例子。
源数据库如果出现连接中断的情况,woker 会发生读取 binlog 超时的情况,在连接中断前已经读取到的事件会按预期继续写入目标数据库,然后 worker 退出。需要 wake woker 以继续。
目标数据库如果出现连接中断的情况,worker的写线程会发出暂停信号,然后读线程停止读取bianlog,在此前已经读取到的事件不能保证已经全部写入数据库,然后worker退出,需要重启worker以继续。继续可能会发生同一个事件重复同步的情况,程序判断忽略掉这种情况。
恢复执行:监控状态记录每一个worker当前已经完成的 binlog 的 GTIDset,保证已经记录的GTID 一定已经同步完成。worker重新启动时,读取GTID然后开始同步。理论上不会有同步数据丢失的情况。
运行和监控
监控
运行状态的输出格式;
worker1:
finished: xxx
failed: xxx
rowPerSec: xxx
stat: running|waiting
errors: {
"xxx": xxx
"xxx": xxx
}
worker2:
... 配置文件中指定同步的表结构,分片结构,数据库连接信息,例子:
- worker1:
”writerCnt": 2,
"SourceConn": {"host": "127.0.0.1", "port":3306, ....},
"DestConn": { "3307": {"host": "", "Port": 3307, ...}, "3308": {}...},
"Shards": [{"From": ["100000", "u", "hello"], "Table": "key_100000_u_hello", "DBPort": 3307}, ... ]
"TableName":
"TableField":
"TableIndex":
"TableShard":
"BinlogFile":
"BinlogPos":
- worker2: