
Binlog compatible with kafka and cluster (#655)
* Binlog compatible with kafka and cluster version

* Binlog compatible with kafka and cluster version

* Update the default value of conf/drainer-cluster.toml
liubo0127 authored Jan 18, 2019
1 parent d560c6d commit fd8534d
Showing 67 changed files with 991 additions and 31 deletions.
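
The new binlog_version variable in inventory.ini selects which set of roles the playbooks deploy: the *_cluster roles for the cluster version of TiDB Binlog, or the *_kafka roles for the Kafka version. A minimal sketch of the inventory settings that drive the when: conditions in the diffs below (enable_binlog defaults to False; the values shown are this diff's defaults):

enable_binlog = True
binlog_version = "cluster"
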
65 changes: 65 additions & 0 deletions conf/drainer-cluster.toml
@@ -0,0 +1,65 @@
# drainer Configuration.

# the interval (in seconds) at which drainer detects the pumps' status
detect-interval = 10

# syncer Configuration.
[syncer]

# do not sync these schemas
ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"

# number of binlog events in a transaction batch
txn-batch = 20

# count of workers that execute binlogs concurrently
# if the latency between drainer and the downstream (MySQL or TiDB) is too high, you might want to increase this
# value to get higher throughput through more concurrent writes to the downstream
worker-count = 16

disable-dispatch = false

# safe mode splits each update into a delete plus an insert
safe-mode = false

# downstream storage, equal to --dest-db-type
# valid values are "mysql", "pb", "tidb", "flash", "kafka"
db-type = "mysql"

## replicate-do-db has priority over replicate-do-table when they name the same database
## regular expressions are supported; a value starting with '~' is treated as a regular expression
#
#replicate-do-db = ["~^b.*","s1"]
#[[syncer.replicate-do-table]]
#db-name ="test"
#tbl-name = "log"

#[[syncer.replicate-do-table]]
#db-name ="test"
#tbl-name = "~^a.*"

# the downstream MySQL-protocol database
[syncer.to]
host = "127.0.0.1"
user = "root"
password = ""
port = 3306

# Uncomment this if you want to use pb as the db-type.
# compression compresses output files such as pb files; currently only the "gzip" algorithm is supported.
# Valid value: "gzip". Leave it empty to disable compression.
#[syncer.to]
#dir = "data.drainer"
#compression = "gzip"

# when db-type is kafka, you can uncomment this to configure the downstream Kafka; otherwise the global Kafka configuration is used as the default
#[syncer.to]
# only one of zookeeper-addrs and kafka-addrs needs to be configured; the Kafka addresses are fetched from ZooKeeper when zookeeper-addrs is configured
# zookeeper-addrs = "127.0.0.1:2181"
# kafka-addrs = "127.0.0.1:9092"
# kafka-version = "0.8.2.0"
# kafka-max-messages = 1024

# the topic name to which drainer pushes messages; the default name is <cluster-id>_obinlog
# be careful not to use the same name when running multiple drainer instances
# topic-name = ""
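
For reference, a hypothetical [syncer.to] section for a Kafka downstream, assembled from the commented defaults above (the addresses and topic name are placeholders, not values from this commit; db-type = "kafka" must also be set in the [syncer] section):

[syncer.to]
# one of zookeeper-addrs and kafka-addrs is enough; the Kafka addresses are
# looked up from ZooKeeper when zookeeper-addrs is configured
kafka-addrs = "192.168.0.11:9092,192.168.0.12:9092"
kafka-version = "0.8.2.0"
kafka-max-messages = 1024
# optional; defaults to <cluster-id>_obinlog
topic-name = "tidb_binlog"
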
File renamed without changes.
File renamed without changes.
12 changes: 12 additions & 0 deletions conf/pump-kafka.yml
@@ -0,0 +1,12 @@
---
# default configuration file for pump in yaml format

security:
# Path of file that contains list of trusted SSL CAs for connection with cluster components.
# ssl-ca: "/path/to/ca.pem"

# Path of file that contains X509 certificate in PEM format for connection with cluster components.
# ssl-cert: "/path/to/drainer.pem"

# Path of file that contains X509 key in PEM format for connection with cluster components.
# ssl-key: "/path/to/drainer-key.pem"
9 changes: 8 additions & 1 deletion deploy.yml
@@ -116,7 +116,14 @@
tags:
- pump
roles:
- { role: pump, when: enable_binlog|default(false) }
- { role: pump_cluster, when: enable_binlog|default(false) and binlog_version == "cluster" }

- name: deploying pump kafka
hosts: tidb_servers
tags:
- pump
roles:
- { role: pump_kafka, when: enable_binlog|default(false) and binlog_version == "kafka" }

- name: deploying TiDB cluster
hosts: tidb_servers
11 changes: 9 additions & 2 deletions deploy_drainer.yml
@@ -38,12 +38,19 @@
roles:
- check_config_dynamic

- name: deploying drainer
- name: deploying drainer (binlog cluster)
hosts: drainer_servers
tags:
- drainer
roles:
- { role: drainer, when: enable_binlog|default(false) }
- { role: drainer_cluster, when: enable_binlog|default(false) and binlog_version == "cluster" }

- name: deploying drainer (binlog kafka)
hosts: drainer_servers
tags:
- drainer
roles:
- { role: drainer_kafka, when: enable_binlog|default(false) and binlog_version == "kafka" }

- name: finalizing deployment target
hosts: all
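
With the when: guards above, the same playbook run deploys whichever drainer variant matches binlog_version, so the operator-facing command stays unchanged; a sketch, assuming the usual tidb-ansible control machine setup:

$ ansible-playbook deploy_drainer.yml
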
12 changes: 12 additions & 0 deletions group_vars/all.yml
@@ -42,3 +42,15 @@ disable_send_sigkill: False

# The way to collect tikv monitor data: "pull" or "push".
tikv_metric_method: "pull"

# pump
pump_port: 8250
pump_data_dir: "{{ deploy_dir }}/data.pump"
pump_log_dir: "{{ deploy_dir }}/log"
pump_cert_dir: "{{ deploy_dir }}/conf/ssl"

# unix socket used to send binlogs to pump
pump_socket: "{{ status_dir }}/pump.sock"

# drainer
drainer_port: 8249
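
As a concrete illustration, assuming the common default deploy_dir = /home/tidb/deploy (an assumption, not part of this diff), the templated pump variables above resolve to:

pump_data_dir: /home/tidb/deploy/data.pump
pump_log_dir:  /home/tidb/deploy/log
pump_cert_dir: /home/tidb/deploy/conf/ssl
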
1 change: 0 additions & 1 deletion group_vars/drainer_servers.yml
@@ -1,3 +1,2 @@
---

drainer_port: 8249
4 changes: 0 additions & 4 deletions group_vars/pump_servers.yml
@@ -1,6 +1,2 @@
---

pump_port: 8250
pump_data_dir: "{{ deploy_dir }}/data.pump"
pump_log_dir: "{{ deploy_dir }}/log"
pump_cert_dir: "{{ deploy_dir }}/conf/ssl"
3 changes: 3 additions & 0 deletions inventory.ini
@@ -78,6 +78,9 @@ set_hostname = False
## binlog trigger
enable_binlog = False

# binlog version, "kafka" or "cluster":
binlog_version = "cluster"

# kafka cluster address for monitoring, example:
# kafka_addrs = "192.168.0.11:9092,192.168.0.12:9092,192.168.0.13:9092"
kafka_addrs = ""
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -5,13 +5,13 @@

- name: create run script
template:
src: "{{ item }}_{{ role_name }}_binary.sh.j2"
dest: "{{ deploy_dir }}/scripts/{{ item }}_{{ role_name }}.sh"
src: "{{ item }}_drainer_binary.sh.j2"
dest: "{{ deploy_dir }}/scripts/{{ item }}_drainer.sh"
mode: "0755"
backup: yes
with_items:
- run
vars:
role_status_dir: status/{{ role_name }}
role_status_dir: status/drainer

- include_tasks: "{{ process_supervision }}_deployment.yml"
48 changes: 48 additions & 0 deletions roles/drainer_cluster/tasks/main.yml
@@ -0,0 +1,48 @@
---
# tasks file for drainer

- name: Preflight check - ensure initial_commit_ts variable is set
fail: msg="Please set initial_commit_ts variable for drainer server in inventory.ini."
when: (initial_commit_ts is undefined) or (initial_commit_ts is defined and initial_commit_ts == "")

- name: "check if the customized config file `{{ playbook_dir }}/conf/{{ inventory_hostname }}_drainer-cluster.toml` existed"
delegate_to: localhost
stat:
path: "{{ playbook_dir }}/conf/{{ inventory_hostname }}_drainer-cluster.toml"
register: drainer_customized_conf_st

- name: "Preflight check - ensure that the customized config file `{{ playbook_dir }}/conf/{{ inventory_hostname }}_drainer-cluster.toml` exists"
delegate_to: localhost
fail:
msg: 'You need to create customized config file `{{ playbook_dir }}/conf/{{ inventory_hostname }}_drainer-cluster.toml`.'
when: drainer_customized_conf_st.stat.exists == False

- name: create deploy directories
file: path="{{ item }}" state=directory mode=0755
with_items:
- "{{ drainer_data_dir }}"
- "{{ drainer_log_dir }}"
- "{{ status_dir }}"

# - include_tasks: check_certs.yml
# when: enable_tls|default(false)
#
# - include_tasks: gen_certs.yml
# when: enable_tls|default(false)
#
# - include_tasks: install_certs.yml
# when: enable_tls|default(false)

- name: create configuration file
copy: src="{{ playbook_dir }}/conf/{{ inventory_hostname }}_drainer-cluster.toml" dest="{{ deploy_dir }}/conf/drainer.toml" mode=0644 backup=yes
register: drainer_conf_st

- name: backup conf file
command: mv "{{ drainer_conf_st.backup_file }}" "{{ backup_dir }}"
when: drainer_conf_st.changed and drainer_conf_st.backup_file is defined

- include_tasks: "binary_deployment.yml"

- name: prepare firewalld white list
set_fact:
firewalld_ports: "{{ [drainer_port ~ '/tcp'] + firewalld_ports }}"
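
The preflight checks above abort the play unless a per-host customized config file already exists on the control machine. A sketch of preparing one, assuming a drainer host named 192.168.0.21 in the inventory (the hostname is a placeholder):

$ cp conf/drainer-cluster.toml conf/192.168.0.21_drainer-cluster.toml
# edit the copy to point [syncer.to] at the downstream, then set
# initial_commit_ts for that host in inventory.ini
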
12 changes: 12 additions & 0 deletions roles/drainer_cluster/vars/default.yml
@@ -0,0 +1,12 @@
---
# default configuration file for drainer in yaml format

security:
# Path of file that contains list of trusted SSL CAs for connection with cluster components.
ssl-ca: ""

# Path of file that contains X509 certificate in PEM format for connection with cluster components.
ssl-cert: ""

# Path of file that contains X509 key in PEM format for connection with cluster components.
ssl-key: ""
11 changes: 11 additions & 0 deletions roles/drainer_kafka/defaults/main.yml
@@ -0,0 +1,11 @@
---

drainer_log_dir: "{{ deploy_dir }}/log"
drainer_log_filename: "drainer.log"

drainer_data_dir: "{{ deploy_dir }}/data.drainer"

pd_scheme: http

# systemd: Specifies when to restart the service.
restart: on-failure
File renamed without changes.
File renamed without changes.
17 changes: 17 additions & 0 deletions roles/drainer_kafka/tasks/binary_deployment.yml
@@ -0,0 +1,17 @@
---

- name: deploy drainer binary
copy: src="{{ resources_dir }}/bin/drainer" dest="{{ deploy_dir }}/bin/" mode=0755

- name: create run script
template:
src: "{{ item }}_drainer_binary.sh.j2"
dest: "{{ deploy_dir }}/scripts/{{ item }}_drainer.sh"
mode: "0755"
backup: yes
with_items:
- run
vars:
role_status_dir: status/drainer

- include_tasks: "{{ process_supervision }}_deployment.yml"
91 changes: 91 additions & 0 deletions roles/drainer_kafka/tasks/check_certs.yml
@@ -0,0 +1,91 @@
---

- name: "Check_certs | check if the certs have already been generated on control machine"
find:
paths: "{{ cert_dir }}"
patterns: "*.pem"
get_checksum: true
delegate_to: localhost
register: cert_control_node
run_once: true

- debug:
var: cert_control_node

- name: "Check_certs | Set default value for 'sync_certs', 'gen_certs' to false"
set_fact:
sync_certs: false
gen_certs: false

- set_fact:
drainer_host: "{{ hostvars[inventory_hostname].ansible_host | default(inventory_hostname) }}"

- name: "Check certs | check if a cert already exists on node"
stat:
path: "{{ drainer_cert_dir }}/{{ item }}"
register: cert_drainer_node
with_items:
- ca.pem
- drainer-server-{{ drainer_host }}-key.pem
- drainer-server-{{ drainer_host }}.pem

- debug:
var: cert_drainer_node

- name: "Check_certs | Set 'gen_certs' to true"
set_fact:
gen_certs: true
when: not item in cert_control_node.files|map(attribute='path') | list
delegate_to: localhost
run_once: true
with_items: >-
['{{cert_dir}}/ca.pem',
{% set all_drainer_hosts = groups['drainer_servers']|unique|sort %}
{% for host in all_drainer_hosts %}
{% set drainer_ip = hostvars[host].ansible_host | default(hostvars[host].inventory_hostname) -%}
'{{cert_dir}}/drainer-server-{{ drainer_ip }}-key.pem'
{% if not loop.last %}{{','}}{% endif %}
{% endfor %}]
- debug:
var: gen_certs

- name: "Check_certs | Set 'gen_node_certs' to true"
set_fact:
gen_node_certs: |-
{
{% set all_drainer_hosts = groups['drainer_servers']|unique|sort -%}
{% set existing_certs = cert_control_node.files|map(attribute='path')|list|sort %}
{% for host in all_drainer_hosts -%}
{% set drainer_ip = hostvars[host].ansible_host | default(hostvars[host].inventory_hostname) -%}
{% set host_cert = "%s/drainer-server-%s-key.pem"|format(cert_dir, drainer_ip) %}
{% if host_cert in existing_certs -%}
"{{ host }}": False,
{% else -%}
"{{ host }}": True,
{% endif -%}
{% endfor %}
}
run_once: true

- debug:
var: gen_node_certs

- name: "Check_certs | Set drainer_cert_key"
set_fact:
drainer_cert_key_path:
"{{ cert_dir }}/drainer-server-{{ hostvars[inventory_hostname].drainer_host }}-key.pem"

- debug:
var: drainer_cert_key_path

- name: "Check_certs | Set 'sync_certs' to true"
set_fact:
sync_certs: true
when: gen_node_certs[inventory_hostname] or
(not cert_drainer_node.results[0].stat.exists|default(False)) or
(not cert_drainer_node.results[1].stat.exists|default(False)) or
(cert_drainer_node.results[1].stat.checksum|default('') != cert_control_node.files|selectattr("path","equalto",drainer_cert_key_path)|map(attribute="checksum")|first|default(''))

- debug:
var: sync_certs
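
To make the Jinja logic above easier to follow, here is a hypothetical rendered value of gen_node_certs for two drainer hosts, where only the first host's key already exists in cert_dir on the control machine (the hosts are placeholders):

{
"192.168.0.21": False,
"192.168.0.22": True,
}

sync_certs is then enabled per host whenever its gen_node_certs entry is True, a cert file is missing on the node, or the key checksum on the node differs from the control machine's copy.
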
24 changes: 24 additions & 0 deletions roles/drainer_kafka/tasks/gen_certs.yml
@@ -0,0 +1,24 @@
---

- name: Gen_certs | copy certs generation script
copy:
src: "make-ssl.sh"
dest: "{{ script_dir }}/make-ssl.sh"
mode: 0700
run_once: yes
delegate_to: localhost
when: gen_certs|default(false)

- name: Gen_certs | run cert generation script
command: "{{ script_dir }}/make-ssl.sh -d {{ cert_dir }}"
environment:
- HOSTS: "{% for h in groups['drainer_servers'] %}
{% if gen_node_certs[h]|default(true) %}
{{ hostvars[h].ansible_host | default(hostvars[h].inventory_hostname) }}
{% endif %}
{% endfor %}"
- PATH: "{{ ansible_env.PATH }}:{{ binary_dir }}"
- CN: "drainer-server"
run_once: yes
delegate_to: localhost
when: gen_certs|default(false)
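
Outside of Ansible, the generation step amounts to the following shell invocation (a sketch; the hosts and the certificate directory are placeholders, and make-ssl.sh is the script copied to the control machine by the task above):

$ HOSTS="192.168.0.21 192.168.0.22" CN="drainer-server" ./make-ssl.sh -d /path/to/cert_dir
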
19 changes: 19 additions & 0 deletions roles/drainer_kafka/tasks/install_certs.yml
@@ -0,0 +1,19 @@
---

- name: "Deploy_certs | Make sure the certificate directory exits"
file:
path: "{{ drainer_cert_dir }}"
state: directory
mode: 0700

- name: "Deploy_certs | Deploy certificates"
copy:
src: "{{ cert_dir }}/{{ item }}"
dest: "{{ drainer_cert_dir }}/{{ item }}"
mode: 0600
backup: yes
with_items:
- ca.pem
- drainer-server-{{ drainer_host }}-key.pem
- drainer-server-{{ drainer_host }}.pem
when: sync_certs|default(false)