当前位置:首页 > 话题广场 > 攻略专题 > 网游攻略

678改动日志专题之Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

一.背景介绍

本文介绍了将MySQL中的数据通过Binlog Canal导入Kafka,然后由Flink消耗的情况。

为了能够快速的验证整套流程的功能性,所有的组件都以单机的形式部署。如果手上的物理资源不足,可以将本文中的所有组件一台 4G 1U 的虚拟机环境中。

如果需要在生产环境中部署,建议将每一个组件替换成高可用的集群部署方案。

其中,我们单独创建了一套 Zookeeper 单节点环境,Flink、Kafka、Canal 等组件共用这个 Zookeeper 环境。

针对于所有需要 Jre 的组件,如 Flink,Kafka,Canal,Zookeeper,考虑到升级 JRE 可能会影响到其他的应用,我们选择每个组件独立使用自己的 JRE 环境。

本文分为两个部分,其中,前七小节主要介绍基础环境的搭建,最后一个小节介绍了数据是如何在各个组件中流通的。

数据的流动经过以下组件:

  • MySQL 数据源生成 Binlog。
  • Canal 读取 Binlog,生成 Canal json,推送到 Kafka 指定的 Topic 中。
  • Flink 使用 flink-sql-connector-kafka API,消费 Kafka Topic 中的数据。
  • Flink 在通过 flink-connector-jdbc,将数据写入到 TiDB 中。

TiDB + Flink 的结构,支持开发与运行多种不同种类的应用程序。

目前主要的特性主要包括:

  • 批流一体化。
  • 精密的状态管理。
  • 事件时间支持。
  • 精确的一次状态一致性保障。

Flink 可以运行在包括 YARN、Mesos、Kubernetes 在内的多种资源管理框架上,还支持裸机集群上独立部署。TiDB 可以部署 AWS、Kubernetes、GCP GKE 上,同时也支持使用 TiUP 在裸机集群上独立部署。

TiDB + Flink 结构常见的几类应用如下:

  • 事件驱动型应用:反欺诈。异常检测。基于规则的报警。业务流程监控。
  • 数据分析应用:网络质量监控。产品更新及试验评估分析。事实数据即席分析。大规模图分析。
  • 数据管道应用:电商实时查询索引构建。电商持续 ETL。

二. 环境介绍

2.1 操作系统环境

[root@r20 topology]# cat /etc/redhat-release CentOS Stream release 8

2.2 软件环境

Item

Version

Download link

TiDB

v4.0.9

1

Kafka

v2.7.0

Flink

v1.12.1

Jre

v1.8.0_281

Zookeeper

v3.6.2

flink-sql-connector-kafka

v1.12.1

flink-connector-jdbc

v1.12.0

MySQL

v8.0.23

Canal

v1.1.4

2.3 机器分配

Hostname

IP

Component

r21

192.168.12.21

TiDB Cluster

r22

192.168.12.22

Kafka

r23

192.168.12.23

Flink

r24

192.168.12.24

Zookeeper

r25

192.168.12.25

MySQL

r26

192.168.12.26

Canal

三. 部署 TiDB Cluster

与传统的单机数据库相比,TiDB 具有以下优势:

  • 纯分布式架构,拥有良好的扩展性,支持弹性的扩缩容。
  • 支持 SQL,对外暴露 MySQL 的网络协议,并兼容大多数 MySQL 的语法,在大多数场景下可以直接替换 MySQL。
  • 默认支持高可用,在少数副本失效的情况下,数据库本身能够自动进行数据修复和故障转移,对业务透明。
  • 支持 ACID 事务,对于一些有强一致需求的场景友好,例如:银行转账。
  • 具有丰富的工具链生态,覆盖数据迁移、同步、备份等多种场景。

在内核设计上,TiDB 分布式数据库将整体架构拆分成了多个模块,各模块之间互相通信,组成完整的 TiDB 系统。对应的架构图如下:

在本文中,我们只做最简单的功能测试,所以部署了一套单节点但副本的 TiDB,涉及到了以下的三个模块:

  • TiDB Server:SQL 层,对外暴露 MySQL 协议的连接 endpoint,负责接受客户端的连接,执行 SQL 解析和优化,最终生成分布式执行计划。
  • PD (Placement Driver) Server:整个 TiDB 集群的元信息管理模块,负责存储每个 TiKV 节点实时的数据分布情况和集群的整体拓扑结构,提供 TiDB Dashboard 管控界面,并为分布式事务分配事务 ID。
  • TiKV Server:负责存储数据,从外部看 TiKV 是一个分布式的提供事务的 Key-Value 存储引擎。

3.1 TiUP 部署模板文件

# # Global variables are applied to all deployments and used as the default value of # # the deployments if a specific deployment value is missing. global: user: "tidb" SSH_port: 22 deploy_dir: "/opt/tidb-c1/" data_dir: "/opt/tidb-c1/data/" # # Monitored variables are applied to all the machines. #monitored: # node_exporter_port: 19100 # blackbox_exporter_port: 39115 # deploy_dir: "/opt/tidb-c3/monitored" # data_dir: "/opt/tidb-c3/data/monitored" # log_dir: "/opt/tidb-c3/log/monitored" # # Server configs are used to specify the runtime configuration of TiDB components. # # All configuration items can be found in TiDB docs: # # - TiDB: # # - TiKV: # # - PD: # # All configuration items use points to represent the hierarchy, e.g: # # read # # # # You can overwrite this configuration via the instance-level `config` field. server_configs: tidb: log.slow-threshold: 300 binlog.enable: false binlog.ignore-error: false : true tikv: : 4 ra: 2 ra: 2 rock: 1 : "16GB" read: 12 read: false read: true ra: 0 pd: : 4 : 2048 : 64 pd_servers: - host: 192.168.12.21 ssh_port: 22 name: "pd-2" client_port: 12379 peer_port: 12380 deploy_dir: "/opt/tidb-c1/pd-12379" data_dir: "/opt/tidb-c1/data/pd-12379" log_dir: "/opt/tidb-c1/log/pd-12379" numa_node: "0" # # The following configs are used to overwrite the `` values. config: : 20 : 200000 tidb_servers: - host: 192.168.12.21 ssh_port: 22 port: 14000 status_port: 12080 deploy_dir: "/opt/tidb-c1/tidb-14000" log_dir: "/opt/tidb-c1/log/tidb-14000" numa_node: "0" # # The following configs are used to overwrite the `` values. config: log.slow-query-file: : true tikv_servers: - host: 192.168.12.21 ssh_port: 22 port: 12160 status_port: 12180 deploy_dir: "/opt/tidb-c1/tikv-12160" data_dir: "/opt/tidb-c1/data/tikv-12160" log_dir: "/opt/tidb-c1/log/tikv-12160" numa_node: "0" # # The following configs are used to overwrite the `` values. config: : 4 #: { zone: "zone1", dc: "dc1", host: "host1" } #monitoring_servers: # - host: 192.168.12.21 # ssh_port: 22 # port: 19090 # deploy_dir: "/opt/tidb-c1/prometheus-19090" # data_dir: "/opt/tidb-c1/data/prometheus-19090" # log_dir: "/opt/tidb-c1/log/prometheus-19090" #grafana_servers: # - host: 192.168.12.21 # port: 13000 # deploy_dir: "/opt/tidb-c1/grafana-13000" #alertmanager_servers: # - host: 192.168.12.21 # ssh_port: 22 # web_port: 19093 # cluster_port: 19094 # deploy_dir: "/opt/tidb-c1/alertmanager-19093" # data_dir: "/opt/tidb-c1/data/alertmanager-19093" # log_dir: "/opt/tidb-c1/log/alertmanager-19093"

3.2 TiDB Cluster 环境

本文重点非部署 TiDB Cluster,作为快速实验环境,只在一台机器上部署单副本的 TiDB Cluster 集群。不需要部署监控环境。

[root@r20 topology]# tiup cluster display tidb-c1-v409 Starting component `cluster`: /root/.tiup/components/cluster display tidb-c1-v409 Cluster type: tidb Cluster name: tidb-c1-v409 Cluster version: v4.0.9 SSH type: builtin Dashboard URL: ID Role Host Ports OS/Arch Status Data Dir Deploy Dir -- ---- ---- ----- ------- ------ -------- ---------- 192.168.12.21:12379 pd 192.168.12.21 12379/12380 linux/x86_64 Up|L|UI /opt/tidb-c1/data/pd-12379 /opt/tidb-c1/pd-12379 192.168.12.21:14000 tidb 192.168.12.21 14000/12080 linux/x86_64 Up - /opt/tidb-c1/tidb-14000 192.168.12.21:12160 tikv 192.168.12.21 12160/12180 linux/x86_64 Up /opt/tidb-c1/data/tikv-12160 /opt/tidb-c1/tikv-12160 Total nodes: 4

创建用于测试的表

mysql> show create table t1; +-------+-------------------------------------------------------------------------------------------------------------------------------+ | Table | Create Table | +-------+-------------------------------------------------------------------------------------------------------------------------------+ | t1 | CREATE TABLE `t1` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin | +-------+-------------------------------------------------------------------------------------------------------------------------------+ 1 row in set sec)

四. 部署 Zookeeper 环境

在本实验中单独配置 Zookeeper 环境,为 Kafka 和 Flink 环境提供服务。

作为实验演示方案,只部署单机环境。

4.1 解压 Zookeeper 包

[root@r24 soft]# tar vxzf a [root@r24 soft]# mv a /opt/zookeeper

4.2 部署用于 Zookeeper 的 jre

[root@r24 soft]# tar vxzf jre1.8.0_281. [root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre

修改 /opt/zookeeper/bin 文件,增加 JAVA_HOME 环境变量

## add bellowing env var in the head of zkEnv.sh JAVA_HOME=/opt/zookeeper/jre

4.3 创建 Zookeeper 的配置文件

[root@r24 conf]# cat zoo.cfg | grep -v "#" tickTime=2000 initLimit=10 syncLimit=5 dataDir=/opt/zookeeper/data clientPort=2181

4.4 启动 Zookeeper

[root@r24 bin]# /opt/zookeeper/bin start

4.5 检查 Zookeeper 的状态

## check zk status [root@r24 bin]# . status ZooKeeper JMX enabled by default Using config: /opt/zookeeper/bin/../con Client port found: 2181. Client address: localhost. Client SSL: false. Mode: standalone ## check OS port status [root@r24 bin]# netstat -ntlp Active Internet connections (only servers) Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 942/sshd tcp6 0 0 :::2181 :::* LISTEN 15062/java tcp6 0 0 :::8080 :::* LISTEN 15062/java tcp6 0 0 :::22 :::* LISTEN 942/sshd tcp6 0 0 :::44505 :::* LISTEN 15062/java ## use zkCli tool to check zk connection [root@r24 bin]# . -server 192.168.12.24:2181

4.6 关于 Zookeeper 的建议

我个人有一个关于 Zookeeper 的不成熟的小建议:

Zookeeper 集群版本一定要开启网络监控。特别是要关注 system metrics 里面的 network bandwidth。

五. 部署 Kafka

Kafka 是一个分布式流处理平台,主要应用于两大类的应用中:

  • 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
  • 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)

Kafka 有四个核心的 API:

  • The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。
  • The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。
  • The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。
  • The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

在本实验中只做功能性验证,只搭建一个单机版的 Kafka 环境。

5.1 下载并解压 Kafka

[root@r22 soft]# tar vxzf ka [root@r22 soft]# mv ka /opt/kafka

5.2 部署用于 Kafka 的 jre

[root@r22 soft]# tar vxzf jre1.8.0_281. [root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre

修改 Kafka 的 jre 环境变量

[root@r22 bin]# vim /opt/kafka/bin ## add bellowing line in the head of ka JAVA_HOME=/opt/kafka/jre

5.3 修改 Kafka 配置文件

修改 Kafka 配置文件 /opt/kafka/config

## change bellowing variable in /opt/kafka/config broker.id=0 listeners=PLAINTEXT://192.168.12.22:9092 log.dirs=/opt/kafka/logs zookeeper.connect=i192.168.12.24:2181

5.4 启动 Kafka

[root@r22 bin]# /opt/kafka/bin /opt/kafka/config

5.5 查看 Kafka 的版本信息

Kafka 并没有提供 --version 的 optional 来查看 Kafka 的版本信息。

[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka -rw-r--r-- 1 root root 4929521 Dec 16 09:02 ka.jar -rw-r--r-- 1 root root 821 Dec 16 09:03 ka.jar.asc -rw-r--r-- 1 root root 41793 Dec 16 09:02 ka-javadoc.jar -rw-r--r-- 1 root root 821 Dec 16 09:03 ka-javadoc.jar.asc -rw-r--r-- 1 root root 892036 Dec 16 09:02 ka-sources.jar -rw-r--r-- 1 root root 821 Dec 16 09:03 ka-sources.jar.asc ... ...

其中 2.13 是 scale 的版本信息,2.7.0 是 Kafka 的版本信息。

六. 部署 Flink

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

支持高吞吐、低延迟、高性能的分布式处理框架 Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

本实验只做功能性测试,仅部署单机 Flink 环境。

6.1 下载并分发 Flink

[root@r23 soft]# tar vxzf [root@r23 soft]# mv /opt/flink

6.2 部署 Flink 的 jre

[root@r23 soft]# tar vxzf jre1.8.0_281. [root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre

6.3 添加 Flink 需要的 lib

Flink 消费 Kafka 数据,需要 flink-sql-connector-kafka 包。

Flink 链接 MySQL/TiDB,需要 flink-connector-jdbc 包。

[root@r23 soft]# mv /opt/flink/lib/ [root@r23 soft]# mv /opt/flink/lib/

6.4 修改 Flink 配置文件

## add or modify bellowing lines in /opt/flink/con jobmanager.r: 192.168.12.23 env.java.home: /opt/flink/jre

6.5 启动 Flink

[root@r23 ~]# /opt/flink/bin Starting cluster. Starting standalonesession daemon on host r23. Starting taskexecutor daemon on host r23.

6.6 查看 Flink GUI

七. 部署 MySQL

7.1 解压 MySQL package

[root@r25 soft]# tar vxf my [root@r25 soft]# mv my /opt/mysql/

7.2 创建 MySQL Service 文件

[root@r25 ~]# touch /opt/mysql/support-file [root@r25 support-files]# cat my [Unit] Description=MySQL 8.0 database server After= After=ne [Service] Type=simple User=mysql Group=mysql #ExecStartPre=/usr/libexec/mysql-check-socket #ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n # Note: we set --basedir to prevent probes that might trigger SELinux alarms, # per bug #547485 ExecStart=/opt/mysql/bin/mysqld_safe #ExecStartPost=/opt/mysql/bin/mysql-check-upgrade #ExecStopPost=/opt/mysql/bin/mysql-wait-stop # Give a reasonable amount of time for the server to start up/shut down TimeoutSec=300 # Place temp files in a secure directory, not /tmp PrivateTmp=true Restart=on-failure RestartPreventExitStatus=1 # Sets open_files_limit LimitNOFILE = 10000 # Set enviroment variable MYSQLD_PARENT_PID. This is required for SQL restart command. Environment=MYSQLD_PARENT_PID=1 [Install] WantedBy=mul ## copy my to /usr/lib/systemd/system/ [root@r25 support-files]# cp my /usr/lib/systemd/system/

7.3 创建 my.cnf 文件

[root@r34 opt]# cat /etc [mysqld] port=3306 basedir=/opt/mysql datadir=/opt/mysql/data socket=/opt/mysql/data max_connections = 100 default-storage-engine = InnoDB character-set-server=utf8 log-error = /opt/mysql/log slow_query_log = 1 long-query-time = 30 slow_query_log_file = /opt/mysql/log min_examined_row_limit = 1000 log-slow-slave-statements log-queries-not-using-indexes #skip-grant-tables

7.4 初始化并启动 MySQL

[root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console [root@r25 ~]# chown -R mysql:mysql /opt/mysql [root@r25 ~]# systemctl start mysqld ## check mysql temp passord from /opt/mysql/log 2021-02-24T02:45:47.316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>-

7.5 创建一个新的 MySQL 用户用以连接 Canal

## change mysql temp password firstly mysql> alter user 'root'@'localhost' identified by 'mysql'; Query OK, 0 rows affected sec) ## create a management user 'root'@'%' mysql> create user 'root'@'%' identified by 'mysql'; Query OK, 0 rows affected sec) mysql> grant all privileges on *.* to 'root'@'%'; Query OK, 0 rows affected sec) ## create a canal replication user 'canal'@'%' mysql> create user 'canal'@'%' identified by 'canal'; Query OK, 0 rows affected sec) mysql> grant select, replication slave, replication client on *.* to 'canal'@'%'; Query OK, 0 rows affected sec) mysql> flush privileges; Query OK, 0 rows affected sec)

7.6 在 MySQL 中创建用于测试的表

mysql> show create table ; +-------+----------------------------------------------------------------------------------+ | Table | Create Table | +-------+----------------------------------------------------------------------------------+ | t2 | CREATE TABLE `t2` ( `id` int DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8 | +-------+----------------------------------------------------------------------------------+ 1 row in set sec)

八. 部署 Canal

Canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。

从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括:

  • 数据库镜像。
  • 数据库实时备份。
  • 索引构建和实时维护(拆分异构索引、倒排索引等)。
  • 业务 cache 刷新。
  • 带业务逻辑的增量数据处理。

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

8.1 解压 Canal 包

[root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4. -C /opt/canal

8.2 部署 Canal 的 jre

[root@r26 soft]# tar vxzf jre1.8.0_281. [root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre ## configue jre, add bellowing line in the head of /opt/canal/bin JAVA=/opt/canal/jre/bin/java

8.3 修改 Canal 的配置文件

修改 /opt/canal/con 配置文件

## modify bellowing configuration canal.zkServers =192.168.12.24:2181 canal.serverMode = kafka canal.destinations = example ## 需要在 /opt/canal/conf 目录下创建一个 example 文件夹,用于存放 destination 的配置 canal.mq.servers = 192.168.12.22:9092

修改 /opt/canal/conf/example 配置文件

## modify bellowing configuration canal.instance.master.address=192.168.12.25:3306 canal.in canal.in canal.in.*\\..* ## 过滤数据库的表 canal.mq.topic=canal-kafka

九. 配置数据流向

9.1 MySQL Binlog -> Canal -> Kafka 通路

9.1.1 查看 MySQL Binlog 信息

查看 MySQL Binlog 信息,确保 Binlog 是正常的。

mysql> show master status; +---------------+----------+--------------+------------------+-------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set | +---------------+----------+--------------+------------------+-------------------+ | binlog.000001 | 2888 | | | | +---------------+----------+--------------+------------------+-------------------+ 1 row in set sec)

9.1.2 在 Kafka 中创建一个 Topic

在 Kafka 中创建一个 Topic canal-kafka,这个Topic 的名字要与 Canal 配置文件 /opt/canal/conf/example 中的 canal.mq.topic=canal-kafka 对应:

[root@r22 kafka]# /opt/kafka/bin --create \ > --zookeeper 192.168.12.24:2181 \ > --config max.me \ > --config \ > --replication-factor 1 \ > --partitions 1 \ > --topic canal-kafka Created topic canal-kafka. [2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) ) [2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 ) [2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {com -> producer, me -> true, min.in -> 1, -> 0, cleanup.policy -> [delete], -> 9223372036854775807, -> 1073741824, re -> 604800000, -> 1, me -> 2.7-IV2, -> 60000, max.com -> 9223372036854775807, max.me -> 12800000, min.com -> 0, me -> CreateTime, preallocate -> false, min.cleanable.dir -> 0.5, index.in -> 4096, unclean.leader.elec -> false, re -> -1, delete.re -> 86400000, -> 604800000, me -> 9223372036854775807, -> 10485760}. Manager) [2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 ) [2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 )

查看 Kafka 中所有的 Topic:

[root@r22 kafka]# /opt/kafka/bin --list --zookeeper 192.168.12.24:2181 __consumer_offsets canal-kafka ticdc-test

查看 Kafka 中 Topic ticdc-test 的信息:

[root@r22 ~]# /opt/kafka/bin --describe --zookeeper 192.168.12.24:2181 --topic canal-kafka Topic: ticdc-test PartitionCount: 1 ReplicationFactor: 1 Configs: max.me, Topic: ticdc-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

9.1.3 启动 Canal

在启动 Canal 之前,需要在 Canal 节点上查看一下端口的情况:

## check MySQL 3306 port ## canal.instance.master.address=192.168.12.25:3306 [root@r26 bin]# telnet 192.168.12.25 3306 ## check Kafka 9092 port ## canal.mq.servers = 192.168.12.22:9092 [root@r26 bin]# telnet 192.168.12.22 9092 ## check zookeeper 2181 port ## canal.zkServers = 192.168.12.24:2181 [root@r26 bin]# telnet 192.168.12.24 2181

启动 Canal:

[root@r26 bin]# /opt/canal/bin cd to /opt/canal/bin for workaround relative path LOG CONFIGURATION : /opt/canal/bin/../con canal conf : /opt/canal/bin/../con CLASSPATH :/opt/canal/bin/../conf:/opt/canal/bin/../lib: cd to /opt/canal/bin for continue

9.1.4 查看 Canal 日志

查看 /opt/canal/logs/example

2021-02-24 01:41:40.293 [destination = example , address = , EventParser] WARN c.a.o.c.p.inbound.my - ---> begin to find start position, it will be long time for reset or first position 2021-02-24 01:41:40.293 [destination = example , address = , EventParser] WARN c.a.o.c.p.inbound.my - prepare to find start position just show master status 2021-02-24 01:41:40.542 [destination = example , address = , EventParser] WARN c.a.o.c.p.inbound.my - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=<null>,timestamp=1614134832000] cost : 244ms , the next step is binlog dump

9.1.5 查看 Kafka 中 consumer 信息

在 MySQL 中插入一条测试信息:

mysql> insert into t2 values(1); Query OK, 1 row affected sec)

查看 consumer 的信息,已经有了刚才插入的测试数据:

/opt/kafka/bin --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning {"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"} {"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"} {"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}

9.2 Kafka -> Flink 通路

在 Flink 中创建 t2 表,connector 类型为 kafka。

## create a test table t2 in Flink Flink SQL> create table t2(id int) > WITH ( > 'connector' = 'kafka', > 'topic' = 'canal-kafka', > '; = '192.168.12.22:9092', > '; = 'canal-kafka-consumer-group', > 'format' = 'canal-json', > '; = 'latest-offset' > ); Flink SQL> select * from t1;

在 MySQL 中在插入一条测试数据:

mysql> insert into values(2); Query OK, 1 row affected sec)

从 Flink 中可以实时同步数据:

Flink SQL> select * from t1; Refresh: 1 s Page: Last of 1 Updated: 02:49:27.366 id 2

9.3 Flink -> TiDB 通路

9.3.1 在 下游的 TiDB 中创建用于测试的表

[root@r20 soft]# mysql -uroot -P14000 -hr21 mysql> create table t3 (id int); Query OK, 0 rows affected sec)

9.3.2 在 Flink 中创建测试表

Flink SQL> CREATE TABLE t3 ( > id int > ) with ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://192.168.12.21:14000/test', > 'table-name' = 't3', > 'username' = 'root', > 'password' = 'mysql' > ); Flink SQL> insert into t3 values(3); [INFO] Submitting SQL update statement to the cluster... [INFO] Table update statement has been successfully submitted to the cluster: Job ID: a0827487030db177ee7e5c8575ef714e

9.3.3 在下游 TiDB 中查看插入的数据

mysql> select * from ; +------+ | id | +------+ | 3 | +------+ 1 row in set sec)

本文为阿里云原创内容,未经允许不得转载。

1.《678改动日志专题之Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB》援引自互联网,旨在传递更多网络信息知识,仅代表作者本人观点,与本网站无关,侵删请联系页脚下方联系方式。

2.《678改动日志专题之Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB》仅供读者参考,本网站未对该内容进行证实,对其原创性、真实性、完整性、及时性不作任何保证。

3.文章转载时请保留本站内容来源地址,https://www.lu-xu.com/gl/2075401.html

上一篇

dnf散打加点专题之90版本DNF武极攻略分享 解读90版本男散打加点配装

下一篇

dnf红眼刷图加点看这里!DNF:红眼100级的2种技能加点推荐,狂战的信仰小蹦你会点满吗?

678改动日志看这里!Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

678改动日志看这里!Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

678改动日志相关介绍,一. 背景介绍 本文将介绍如何将 MySQL 中的数据,通过 Binlog + Canal 的形式导入到 Kafka 中,继而被 Flink 消费的案例。 达到当天最大量API KEY 超过次数限制 ...

关于678改动日志我想说第84章 春风行动

关于678改动日志我想说第84章 春风行动

678改动日志相关介绍,“谢谢大家热烈的掌声,今天面对各位前辈和老师,雷倩倩的身份首先是一名学生,请允许我向各位尊敬的师长致敬!” 雷倩倩起立,她走到讲台中央,向在座的广大教育工作者,深深鞠躬! “正因为我首先是一名学生,所...

678改动日志看这里!记一次 .NET 某纺织工厂 MES系统 API 挂死分析

678改动日志看这里!记一次 .NET 某纺织工厂 MES系统 API 挂死分析

678改动日志相关介绍,一、背景 1.讲故事 这个月中旬,一个朋友向我求助wx的程序线程份额很高,探讨了如何解决。屏幕截图如下: 说实话,和其他行业的程序员聊天还是很有趣的,可以交朋友,扩大自己的圈子。朋友说,因为这个bug...

【678改动日志】Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

【678改动日志】Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

678改动日志相关介绍,一.背景介绍 本文介绍了将MySQL中的数据通过Binlog Canal导入Kafka,然后由Flink消耗的情况。 为了能够快速的验证整套流程的功能性,所有的组件都以单机的形式部署。如果手上的物理资...

【678改动日志】专题形象一落千丈的徐睿知 韩国演艺圈历代劲爆 争议连环爆

【678改动日志】专题形象一落千丈的徐睿知 韩国演艺圈历代劲爆 争议连环爆

678改动日志相关介绍,安静的外表,声音低,有磁性的她有望成为一线女演员。 她就是最近争议连环爆的徐睿知。1990年4月6日出生于首尔过去曾在街上被星探看中。但因为觉得自己声音低沉比较适合当主播,因此拒绝了试镜邀请,之后她到...

【678改动日志】2月8日·贵州要闻及抗击肺炎快报

【678改动日志】2月8日·贵州要闻及抗击肺炎快报

678改动日志相关介绍,每天广播 贵州:2020年2月7日12-24时,全省新型新冠病毒感染肺炎新增确诊病例8例,出院患者0例。 其中:新增确诊病例中,遵义市3例,毕节市4例,黔西南州1例。 截至2月7日24时,全省累计报告...

【678改动日志】专题网络流量与Agent_Drable恶意程序深度分析

【678改动日志】专题网络流量与Agent_Drable恶意程序深度分析

678改动日志相关介绍,在回顾最近的一些网络异常时,发现了使用DNS隧道与C2通信的攻击组织,并将其命名为“Cold River”。破译受害者和C2的流量,发现攻击者使用的复杂诱饵文件,与其他未知样本连接,并发现攻击者使用的...

678改动日志专题之必收藏 | 宽带错误码含义及处理方法大全

678改动日志专题之必收藏 | 宽带错误码含义及处理方法大全

678改动日志相关介绍,文章有点长。但是很耐用! 更多宽带信息,微信搜索“宽带军”,宽带军洗干净了等着你! 最近发现很多小朋友给我留言 不是为了找对象 在找对象的路上。 已经有错误了。 . . . 宽带错误 今天宽带军给大家...