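@wenbobuaa

Reads MySQL binlog data from one machine and executes the corresponding SQL statements against another database to synchronize it via binlog.

Implemented in Go. The go-mysql package it depends on is fairly large, so it is not included in this directory.

Details: #2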

@wenbobuaa wenbobuaa self-assigned this Oct 30, 2018

type DBInfo struct {
	Conn  *client.Conn
	Shard *DBShard

Contributor

Does one connection have only one shard?

Author

Here each SQL record maps to one DBInfo, which holds the shard the record belongs to and the connection for that shard.


type OutStatus struct {
	err          error
	routineIndex int

Contributor

What does "routine" mean here?

Author

A goroutine, which is similar to a thread.

Contributor

Call it goroutine then... "routine" has too many meanings.

type OutStatus struct {
	err          error
	routineIndex int
	logPos       uint32

Contributor

Use a signed integer here if possible. Avoid unsigned numbers throughout the code: they wrap around easily as soon as a subtraction is involved.
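A minimal sketch of the wrap-around the reviewer is warning about. The function names `lagUnsigned` and `lagSigned` are made up for illustration and are not part of the PR.

```go
package main

import "fmt"

// lagUnsigned returns how far `applied` trails `latest` using unsigned math.
// If applied > latest, the result wraps around instead of going negative.
func lagUnsigned(latest, applied uint32) uint32 {
	return latest - applied
}

// lagSigned does the same with signed 64-bit integers, so a negative
// result stays negative and is easy to detect.
func lagSigned(latest, applied int64) int64 {
	return latest - applied
}

func main() {
	fmt.Println(lagUnsigned(100, 120)) // 4294967276
	fmt.Println(lagSigned(100, 120))   // -20
}
```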


type Config struct {
	ChCap int
	ChCnt int

Contributor

No idea what these two variables mean... Treat config items like global variables and use full words wherever possible.

Author

OK. They are the number of channels and the capacity of each channel. I'll rename them.

Contributor

Aren't all the channels kept in one slice? If so, the slice's len already tracks the count...

Author

The idea is to specify in the config file how many channels should be created.
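The two positions are compatible; a rough sketch, assuming the count and capacity come from the config and the channels live in one slice. `newWriteChannels` is a hypothetical helper, and `WriteEvent` is a stand-in because its fields are not shown in the review.

```go
package main

import "fmt"

// WriteEvent is a placeholder for the PR's type.
type WriteEvent struct{ SQL string }

// newWriteChannels creates `count` buffered channels, each with room for
// `capacity` pending events. After creation, len() of the returned slice
// is the channel count, so no separate counter has to be maintained.
func newWriteChannels(count, capacity int) []chan *WriteEvent {
	chs := make([]chan *WriteEvent, count)
	for i := range chs {
		chs[i] = make(chan *WriteEvent, capacity)
	}
	return chs
}

func main() {
	chs := newWriteChannels(4, 16) // values that would be read from the config
	fmt.Println(len(chs), cap(chs[0])) // 4 16
}
```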

	TableIndex []string

	BinlogFile string
	BinlogPos  uint32

Contributor

We track the binlog by GTID everywhere. File + pos differs across multiple sources and is hard to handle.
Not sure why you need to add these two fields.

Author

The Python version of the sync that I used as a reference reads the binlog by file + pos, so I did the same here. I'll look into the GTID approach.

Contributor

Hmm... it seems that reading still requires specifying the binlog file + pos on the source. Not sure whether there is a way to specify it by GTID...

Author

There is. I added a way to read by GTID.

Contributor

@drmingdrmer drmingdrmer left a comment

How is writeThreadCount related to ChannelCapacity... and how do the two fit into the overall logic of the code? Please add some comments to explain...

	ChCap            int
	ChCnt            int
	ChannelCapacity  int
	WriteThreadCount int

Contributor

Our code seems to use cnt for count everywhere...

Author

OK, I'll add some comments.

"thread" seems inaccurate: these are goroutines, right? This controls concurrency, and since it is the only variable that does, how about simply calling it worker?

This Config is used to decode binary -> JSON, right? It would be better to define the JSON field names.
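One way to do that is with struct tags, sketched below. The tag names (`channel_capacity`, `worker_cnt`, `table_index`) and the `WorkerCnt` field are assumptions for illustration, not the names chosen in the PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Config mirrors a few fields from the review with explicit JSON names.
type Config struct {
	ChannelCapacity int      `json:"channel_capacity"`
	WorkerCnt       int      `json:"worker_cnt"`
	TableIndex      []string `json:"table_index"`
}

// parseConfig decodes raw JSON bytes into a Config value.
func parseConfig(data []byte) (Config, error) {
	var conf Config
	err := json.Unmarshal(data, &conf)
	return conf, err
}

func main() {
	raw := []byte(`{"channel_capacity": 16, "worker_cnt": 4, "table_index": ["bucket_id", "scope", "key", "ts"]}`)
	conf, err := parseConfig(raw)
	if err != nil {
		fmt.Println("bad config:", err)
		return
	}
	fmt.Println(conf.ChannelCapacity, conf.WorkerCnt, len(conf.TableIndex)) // 16 4 4
}
```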

	FileLog = log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile)

	// read config
	jsonParser := NewJsonStruct()

Contributor

Use YAML for configuration and JSON for RPC messages. The former favors readability, the latter parsing efficiency.

The config may be written in JSON, but parse it with a YAML parser. That way the more readable YAML config is supported as well.

Author

OK, I'll change this.


func calcHashToInt64(src []byte) (int64, error) {
	h := sha256.New()
	h.Write(src)

Contributor

The hash here is not for verification, only for sharding, so a weaker one is fine as long as it is reasonably uniform. @templexxx has probably surveyed many fast hashes. We need one that is fast and uniform enough, with an integer output.

This step could also cut some CPU cost: don't concatenate the strings, feed them into the hash function one at a time, along the lines of sha1:init(); sha1:update(buf); sha1:final().

Author

OK, I'll rewrite this.

It looks like murmurhash, fnv, adler32 and crc32 should all be okay.
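A sketch of the init/update/final pattern using FNV-1a from the standard library, one of the candidates listed above. `shardHash` and `pickBucket` are hypothetical names, and the zero-byte separator between fields is an assumption added so that field boundaries affect the hash.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardHash feeds each field into FNV-1a one at a time instead of
// concatenating the fields into a single string first.
func shardHash(fields ...string) uint64 {
	h := fnv.New64a()
	for _, f := range fields {
		h.Write([]byte(f)) // Write on a hash.Hash never returns an error
		h.Write([]byte{0}) // separator: ("ab","c") and ("a","bc") feed different bytes
	}
	return h.Sum64()
}

// pickBucket maps the hash onto one of n buckets; n must be positive.
func pickBucket(n int, fields ...string) int {
	return int(shardHash(fields...) % uint64(n))
}

func main() {
	fmt.Println(pickBucket(4, "bucket_id", "scope", "key"))
}
```

The hash value itself is unsigned here because that is what `hash/fnv` returns; only the bucket index is converted to a signed int.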

}

func quote(src, quote string) string {
	rst := strings.Replace(src, quote, "\\"+quote, -1)

Contributor

A parameter with the same name as the function is not great...

}

func quote(src, quote string) string {
	rst := strings.Replace(src, quote, "\\"+quote, -1)

Contributor

quote must also replace \, otherwise MySQL fails to parse the result. Basic operations like this involve many details and need tests. Also, isn't there a ready-made escape function that could be used?

` --> \`
\` --> \\\` ## should not be \\`
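A minimal sketch of the mapping above, assuming a single-character quote. `escapeQuote` is a hypothetical name. `strings.NewReplacer` scans the input once, so backslashes added while escaping the quote are not escaped a second time. This follows the backslash scheme described in the comment; note that MySQL's own rule for a backtick inside a backtick-quoted identifier is to double it, so whether backslash escaping is the right scheme for table names would need checking.

```go
package main

import (
	"fmt"
	"strings"
)

// escapeQuote backslash-escapes every backslash and every occurrence of
// quoteChar in src, in a single pass over the input.
func escapeQuote(src string, quoteChar byte) string {
	q := string(quoteChar)
	r := strings.NewReplacer(`\`, `\\`, q, `\`+q)
	return r.Replace(src)
}

func main() {
	fmt.Println(escapeQuote("a`b", '`'))   // a\`b
	fmt.Println(escapeQuote("a\\`b", '`')) // a\\\`b
}
```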

Author

There is an escape function. All quote does here is wrap src in double quotes. The function may not really be needed: after escaping, just appending " would be enough...

Contributor

What? Isn't it for replacing the backticks in table names?

Author

Right, yes, this function is still useful. I'll fix it.

"TableField": ["bucket_id", "scope", "key", "ts", "is_del", "owner", "acl", "sha1",
"ver", "md5", "crc32", "size", "file_meta", "group_id", "origo", "expires", "multipart"],
"TableShard": [ "bucket_id" , "scope", "key"],
"TableIndex": [ "bucket_id" , "scope", "key"],

The index should be bucket_id, scope, key, ts.


func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) {
	for ev := range inCh {
		mutex.Lock()

Is a lock needed here?

Author

getEventConnection does a binary search to find the shard, which modifies and compares values, so it needs to be locked.

Contributor

Why would a binary search modify any values?

Author

It looks up a shard in the slice by index. The shard slice is global, so I thought a change to the index along the way could make it find the wrong shard.

Author

Indeed, no lock is needed here.
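A sketch of why the lock can go, assuming the shard slice is built once and never modified afterwards. `Shard`, its fields and `findShard` are simplified stand-ins for the PR's types (the real code compares string slices via `compareStringSlice`). The index used by `sort.Search` is local to each call, so concurrent lookups only read shared data.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Shard is a simplified stand-in: From is the first key served by the shard.
type Shard struct {
	From string
	Name string
}

// findShard returns the last shard whose From <= key. shards must be
// non-empty and sorted by From. It only reads the slice.
func findShard(shards []Shard, key string) *Shard {
	i := sort.Search(len(shards), func(i int) bool {
		return shards[i].From > key
	})
	if i == 0 {
		return &shards[0]
	}
	return &shards[i-1]
}

func main() {
	shards := []Shard{{"", "s0"}, {"h", "s1"}, {"p", "s2"}}
	keys := []string{"a", "m", "z"}
	names := make([]string, len(keys))

	var wg sync.WaitGroup
	for i, k := range keys {
		wg.Add(1)
		go func(i int, k string) {
			defer wg.Done()
			names[i] = findShard(shards, k).Name // each goroutine writes its own slot
		}(i, k)
	}
	wg.Wait()
	fmt.Println(names) // [s0 s1 s2]
}
```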

	writeEV := &WriteEvent{
		event: ev,
	}
	writeChs[0] <- writeEV

writeToDB probably cannot handle the replication.ROTATE_EVENT event.

Author

Right. Handling rotate, recording the position synced so far, and resuming execution all need more thought. I'll push a later commit for this part.

Oh, OK.

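@templexxx

Please rearrange the files following the Go layout of the ec project. Here binlogsync is purely an app and nothing external needs to import it, right? So create an app directory, and under it a binlogsync directory to hold your files, containing: binlogsync.go, binlogsync.conf (as a sample config), sql.go, util.go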

func main() {

	// set log
	ShellLog = log.New(os.Stdout, "", 0)

Why is ShellLog capitalized?

Author

No particular reason, it seems. I'll make it lowercase.

	// set log
	ShellLog = log.New(os.Stdout, "", 0)

	logFile, err := os.Create(logFileName)

This shadows the global logFile you defined.

Author

Yes... I'll fix it.

var dbPool = make(map[string]*client.Conn)
var conf = Config{}
var logFileName = "binlog_sync.out"
var confName = "./config.json"

It would be better to pass logFileName and confName in as command-line arguments; that makes debugging easier.
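A sketch of this with the standard `flag` package, keeping the current values as defaults. The flag names `-conf` and `-log`, the `options` struct and `parseOptions` are assumptions for illustration.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// options holds the values that used to be package-level variables.
type options struct {
	confName    string
	logFileName string
}

// parseOptions reads the flags from args (normally os.Args[1:]).
func parseOptions(args []string) (options, error) {
	var opts options
	fs := flag.NewFlagSet("binlogsync", flag.ContinueOnError)
	fs.StringVar(&opts.confName, "conf", "./config.json", "path to the config file")
	fs.StringVar(&opts.logFileName, "log", "binlog_sync.out", "path to the log file")
	err := fs.Parse(args)
	return opts, err
}

func main() {
	opts, err := parseOptions(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	fmt.Println(opts.confName, opts.logFileName)
}
```

Taking `args` as a parameter instead of reading `os.Args` inside the function keeps the parsing easy to test.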

Author

Oh, sure, I'll change that.

)

var dbPool = make(map[string]*client.Conn)
var conf = Config{}

var conf Config should be enough.

type JsonStruct struct {
}

func NewJsonStruct() *JsonStruct {

Why define a data structure like this? It serves no purpose.

Author

It does look redundant. I'll rewrite this.

}

type Config struct {
	ChannelCapacity int

Why does channelCapacity need to be a parameter?

	return &conf.Shards[0]
}

func main() {

Use a WaitGroup to make sure execution completes.

Remember to close the channel.

The main thread should collect the results and leave everything else to goroutines.
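The three suggestions above fit together roughly as follows. This is a sketch under assumptions: `WriteEvent` and `OutStatus` are trimmed stand-ins for the PR's types, `worker` and `run` are hypothetical names, the actual SQL execution is omitted, and `workerCnt` must be positive.

```go
package main

import (
	"fmt"
	"sync"
)

type WriteEvent struct{ SQL string }

type OutStatus struct {
	Err            error
	GoroutineIndex int
}

// worker drains its input channel and reports one status per event.
func worker(idx int, in <-chan *WriteEvent, out chan<- *OutStatus, wg *sync.WaitGroup) {
	defer wg.Done()
	for range in {
		// Executing the event's SQL is omitted in this sketch.
		out <- &OutStatus{GoroutineIndex: idx}
	}
}

// run dispatches events to workerCnt goroutines and returns how many
// events were reported as successful.
func run(events []*WriteEvent, workerCnt int) int {
	in := make([]chan *WriteEvent, workerCnt)
	out := make(chan *OutStatus, len(events))
	var wg sync.WaitGroup
	for i := range in {
		in[i] = make(chan *WriteEvent, 4)
		wg.Add(1)
		go worker(i, in[i], out, &wg)
	}
	go func() { // producer: dispatch, then close so the workers can exit
		for i, ev := range events {
			in[i%workerCnt] <- ev
		}
		for _, ch := range in {
			close(ch)
		}
	}()
	go func() { // close out once every worker has finished
		wg.Wait()
		close(out)
	}()
	done := 0
	for st := range out { // the caller only collects results
		if st.Err == nil {
			done++
		}
	}
	return done
}

func main() {
	events := make([]*WriteEvent, 10)
	for i := range events {
		events[i] = &WriteEvent{SQL: fmt.Sprintf("stmt %d", i)}
	}
	fmt.Println(run(events, 3)) // 10
}
```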


var logFile *os.File

var mutex sync.Mutex

It feels a bit odd that the lock is not a reference type.

Author

Oh, I'll use a reference then.

	rst := strings.Replace(src, quote, "\\"+quote, -1)
func quoteString(src, quote string) string {
	safeQuote := strings.Replace(quote, "\\", "\\\\", -1)
	rst := strings.Replace(src, quote, "\\"+safeQuote, -1)

Contributor

The logic doesn't look right... what is the point of replacing the \ inside quote?

Author

Hmm... I think this should require len(quote) == 1, so that only the quote inside src needs to be replaced. With len(quote) > 1, replacing the quote in src becomes hard to handle.

Contributor

What is the point of replacing the \ inside quote?

Author

It was meant to handle the following case, but I didn't get it right...

` -> \`
\` -> \\\`  # not  \\`

	i := sort.Search(len(conf.Shards), func(i int) bool {
		shard := conf.Shards[i].From

		rst, err := compareStringSlice(shard, tbShards)

Contributor

Um... are all the fields in a row strings? Are there no integers?

Author

After being read from the binlog, they are all converted to strings for processing.

func main() {

	// set log
	shellLog = log.New(os.Stdout, "", 0)

It seems that what shellLog does here could be done with print directly.

Contributor

Let's keep shellLog... it will be easier to replace later. In the end it surely won't be printing to the shell...

var (
	mutex *sync.Mutex

	wg sync.WaitGroup

Contributor

Please implement this in an object-oriented way. A machine will certainly run more than one replication relationship, and one process may have to handle several replications.
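One possible reading of this suggestion, sketched with hypothetical names: state that is currently global (the mutex, counters, and in the real code the config and connections) moves into a `Syncer` struct, so several independent replications can run in one process. The `Apply` body is a placeholder for executing SQL.

```go
package main

import (
	"fmt"
	"sync"
)

// Syncer owns the state of one replication; nothing is package-level.
type Syncer struct {
	Name    string
	mu      sync.Mutex // guards applied
	applied int
}

// Apply processes the events concurrently and waits for all of them.
func (s *Syncer) Apply(events []string) {
	var wg sync.WaitGroup
	for range events {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.mu.Lock()
			s.applied++ // stands in for executing one SQL statement
			s.mu.Unlock()
		}()
	}
	wg.Wait()
}

// Applied returns how many events this Syncer has applied so far.
func (s *Syncer) Applied() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.applied
}

func main() {
	a := &Syncer{Name: "src1->dst1"}
	b := &Syncer{Name: "src2->dst2"}
	a.Apply([]string{"x", "y"})
	b.Apply([]string{"z"})
	fmt.Println(a.Applied(), b.Applied()) // 2 1
}
```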

Author

I don't quite follow. I'll come over and ask you.

)

func validRow(srcRow []interface{}) map[string]string {
	// the first column `id` should not be put in the new rowValue

Contributor

I don't quite get it: what is the benefit of converting everything to string?

The function name doesn't fit well either; normalize would be better... valid is an adjective.

Author

Turning everything into string makes it easy to put a row into a map. Keeping the original types such as int or []byte doesn't seem to have any particular use...
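A sketch of what the renamed function might look like, under assumptions: `normalizeRow` takes the column names explicitly (the PR presumably reads them from the config), skips the leading `id` column as the original comment says, and maps NULL to an empty string, which loses the distinction between NULL and "".

```go
package main

import "fmt"

// normalizeRow converts a raw binlog row into a name -> string map.
// srcRow[0] is assumed to be the `id` column and is skipped, so
// fields[i] corresponds to srcRow[i+1].
func normalizeRow(fields []string, srcRow []interface{}) map[string]string {
	row := make(map[string]string, len(fields))
	for i, name := range fields {
		if i+1 >= len(srcRow) {
			break // row is shorter than the field list
		}
		switch v := srcRow[i+1].(type) {
		case nil:
			row[name] = "" // NULL becomes the empty string in this sketch
		case []byte:
			row[name] = string(v)
		case string:
			row[name] = v
		default:
			row[name] = fmt.Sprint(v) // integers, floats and so on
		}
	}
	return row
}

func main() {
	row := normalizeRow(
		[]string{"bucket_id", "key"},
		[]interface{}{int64(1), int64(7), []byte("k")},
	)
	fmt.Println(row) // map[bucket_id:7 key:k]
}
```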

5 participants