-
例如如下SQL,将MySQL的变更用canal-json格式,输出到Kafka中。 CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
CREATE TABLE kafka_canal_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_canal_orders',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'canal-json'
);
INSERT INTO kafka_canal_orders SELECT * FROM orders; 写入Kafka的数据,如以下格式: {"data":[{"order_id":10001,"order_date":"2020-07-30 10:08:22","customer_name":"Jark","price":50.5,"product_id":102,"order_status":false}],"type":"INSERT"} 这里面丢失了 变成如下格式: {"database":"mydb", "table":"orders","ts":1636967147000,"data":[{"order_id":10001,"order_date":"2020-07-30 10:08:22","customer_name":"Jark","price":50.5,"product_id":102,"order_status":false}],"type":"INSERT"} |
Beta Was this translation helpful? Give feedback.
Answered by
ashulin
Nov 15, 2021
Replies: 1 comment 13 replies
-
2.1支持元数据列(即表名库名等信息);但是序列化出来不是这个格式; |
Beta Was this translation helpful? Give feedback.
13 replies
Answer selected by
nienie
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
2.1支持元数据列(即表名库名等信息);但是序列化出来不是这个格式;