您的位置:首页 > 其它

Canal 与 Kafka 集成安装与配置

2020-06-08 04:34 148 查看

Canal 与 Kafka 集成安装与配置

vim 编辑中 >>> 后为列出原内容其后紧接的 <<< 行为对其的更改,没有前置符号的表示新添

主机环境:
CentOS 7.6 内存至少 1.5G,否则服务会启动不起来

软件版本
MySQL 5.7.28
OpenJDK 8
Zookeeper 3.5.6-bin
Kafka 2.12(Scala)-2.3.0
Canal deployer-1.1.4**

一、MySQL 安装
采用从官方源直接安装的方式

1.添加 MySQL 5.7 官方源

rpm -ivh https://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm

2.更新源

yum -y update

3.安装 MySQL 5.7

yum -y install mysql-community-server

4.添加 Canal 所需 MySQL 配置
vim /etc/my.cnf

[mysqld]      #在此处下方新添配置
log-bin=mysql-bin 	# 开启 binlog
binlog-format=ROW 	# 选择 ROW 模式
server_id=1 	# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

5.启动

MySQL systemctl start mysqld

查看状态:

systemctl status mysqld

6.查看生成的临时密码

cat /var/log/mysqld.log | grep 'temporary password'

初次配置

mysql_secure_installation
按照提示完成配置

7.测试中允许 root 远程连接并添加 Canal 所需的用户, mysql -uroot -p 后执行 SQL

use mysql;
UPDATE user SET Host = '%' WHERE user = 'root';
CREATE user canal IDENTIFIED BY 'Canal123!';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
EXIT

二、Java 安装
采用从官方源直接安装的方式

1.安装源中的 OpenJDK 8

yum install -y java-1.8.0-openjdk.x86_64

验证安装:

java -version

三、Zookeeper 安装

https://zookeeper.apache.org/releases.html
中查找 zookeeper 最新 bin 版本压缩包的 HTTP 方式的下载链接
下载压缩包 :

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz

1.解压缩 :

mkdir -p tmp/zookeeper/ && tar zxvf apache-zookeeper-3.5.6-bin.tar.gz -C tmp/zookeeper/

将解压缩后的软件移动至 /usr/local 下

mkdir /usr/local/zookeeper && mv tmp/zookeeper/apache-zookeeper-3.5.6-bin/* /usr/local/zookeeper/

2.添加 zookeeper 环境变量
vim /etc/profile 末尾添加

#zookeeper env
export ZOOKEEPER_HOME=/usr/local/zookeeper/
export PATH=$PATH:$ZOOKEEPER_HOME/bin

使环境变量生效 :

source /etc/profile

3.创建配置文件,从默认的配置文件创建,因为 zookeeper 启动时会去找 conf/zoo.cfg 作为配置文件

cd /usr/local/zookeeper/
cp conf/zoo_sample.cfg conf/zoo.cfg
mkdir data

4.编辑配置 vim conf/zoo.cfg,此处配置的是单机,如果需要配置集群也是在这里
#dataDir=/tmp/zookeeper #这是默认的配置
dataDir=/usr/local/zookeeper/data # 这是 zookeeper 的数据目录
admin.serverPort=2191
启动 :

zookeeper zkServer.sh start

查看状态 :
zkServer.sh status

四、Kafka 安装
1.在

https://kafka.apache.org/downloads
中查找 kafka 最新版本的 HTTP 方式下载链接
下载压缩包 :

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz

2.解压缩:

mkdir tmp/kafka && tar zxvf kafka_2.12-2.3.0.tgz -C tmp/kafka

将解压缩后的软件移动至 /usr/local 下:

mkdir /usr/local/kafka && mv tmp/kafka/kafka_2.12-2.3.0/* /usr/local/kafka && cd /usr/local/kafka

3.添加 kafka 环境变量 vim /etc/profile 末尾添加

#Set Kafka env
export KAFKA_HOME=/usr/local/kafka/
export PATH=$PATH:$KAFKA_HOME/bin

使环境变量生效:

source /etc/profile

4.修改配置文件 vim config/server.properties,配置集群的话还需要更改 broker.id

zookeeper.connect=localhost:2181 # 这里需要配置成 zookeeper 的地址,这里默认就是正确的
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://:9092 	# 删除前面的注释符号

5.启动 Kafka 守护进程:

kafka-server-start.sh -daemon config/server.properties &

6.测试创建一个 topic:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

7.列出 topic :

kafka-topics.sh --list --zookeeper localhost:2181

五、Canal 安装

https://github.com/alibaba/canal/releases
中查找 canal 最新deploy 版本的下载链接
1.下载压缩包:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2.解压

mkdir tmp/canal && tar zxvf canal.deployer-1.1.4.tar.gz -C tmp/canal/

将解压后的文件复制进 /usr/local:

mkdir /usr/local/canal && mv tmp/canal/* /usr/local/canal/

3.创建测试配置直接使用自带的样例进行更改,并复制出默认配置以备后期增加库时使用,这里的每一个文件夹就代表一个数据源,多个数据源就复制出多个文件夹后在 canal.properties 的 canal.destinations 里逗号分隔进行配置:

mv conf/example/ conf/test && cp conf/test/instance.properties instance.properties.bak

4.修改单个数据源的配置
vim conf/test/instance.properties

canal.instance.master.address=127.0.0.1:3306 	# 数据库服务器地址
canal.instance.dbUsername=canal 	# 数据库用户名
canal.instance.dbPassword=Canal123! 	# 数据库密码
canal.instance.filter.regex=.*\\..* 	# 数据表过滤正则,dbName.tbName,默认的是所有库的所有表
canal.mq.topic=test 		# 这个数据库存储进 Kafka 时使用的 topic

修改 Canal 全局设置 vim conf/canal.properties

canal.destinations = test 	# 这里配置开启的 instance,具体方法上面步骤有说明
canal.serverMode = kafka 	# 更改模式,直接把数据扔进 Kafka
#canal.mq.servers = 127.0.0.1:6667
canal.mq.servers = 127.0.0.1:9092 	# Kafka 的地址
canal.mq.batchSize = 16384 # 这里没有更改,值应该小于 Kafka 的 config/producer.properties 中 batch.size,但是 Kafka 里没设置,这里也就不更改了
#canal.mq.flatMessage = false
canal.mq.flatMessage = true 	# 使用文本格式(JSON)进行传输,否则 Kafka 里扔进去的是二进制数据,虽然不影响,但是看起来不方便

配置文件详解:
参数名字 参数说明 默认值
canal.destinations 当前server上部署的instance列表
canal.id 每个canal server实例的唯一标识,暂无实际意义 1
canal.ip canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务 无
canal.port canal server提供socket服务的端口 11111
canal.zkServers canal server链接zookeeper集群的链接信息
例子:127.0.0.1:2181,127.0.0.1:2182 无

canal.zookeeper.flush.period canal持久化数据到zookeeper上的更新频率,单位毫秒 1000
canal.file.data.dir canal持久化数据到file上的目录 …/conf (默认和instance.properties为同一目录,方便运维和备份)
canal.file.flush.period canal持久化数据到file上的更新频率,单位毫秒 1000
canal.instance.mysql.slaveId mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 1234
canal.instance.master.address mysql主库链接地址 127.0.0.1:3306
canal.instance.master.journal.name mysql主库链接时起始的binlog文件 无
canal.instance.master.position mysql主库链接时起始的binlog偏移量 无
canal.instance.master.timestamp mysql主库链接时起始的binlog的时间戳 无
canal.instance.dbUsername mysql数据库帐号 canal
canal.instance.dbPassword mysql数据库密码 canal
canal.instance.defaultDatabaseName mysql链接时默认schema
canal.instance.connectionCharset mysql 数据解析编码 UTF-8
canal.instance.filter.regex mysql 数据解析关注的表,Perl正则表达式.

多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)	.*\\..*

开启 Canal :
bin/startup.sh #因为 Canal 的脚本名称都太普通,所以没有添加到 PATH 里
查看日志是否有异常:

vim logs/canal/canal.log vim logs/test/test.log

测试,连上数据库尝试执行更改 SQL

CREATE DATABASE IF NOT EXISTS test;
USE test;
CREATE TABLE test_tb(
c1 INT COMMENT "中文测试",
c2 VARCHAR(36)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

查看 Kafka 中的数据,列出所有 topic:
kafka-topics.sh --list --zookeeper localhost:2181
消费其中的数据:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
此时应该已经可以列出创建表时的语句
后期 Bug 修复:
消费端 Kafka 不进数据,Canal 日志报错 org.apache.kafka.common.errors.RecordTooLargeException,认为是 Kafka 消息体大小限制造成的,需要同时修改 Kafka 与 Canal 消息体的最大限制
修改 Kafka 配置,server.properties 中修改或添加配置项 message.max.bytes=100000000,producer.properties 中修改或添加配置项 max.request.size=100000000,consumer.properties 中修改或添加配置项 max.partition.fetch.bytes=100000000,重启 Kafka
修改 Canal 配置,canal.properties 修改 canal.mq.maxRequestSize 参数值为 90000000,重启 Canal
查看 Canal 日志是否报错 Could not find first log file name in binary log index file at… 如果报错则停止 Canal ,再删除实例配置下的 meta.dat 文件,再启动 Canal 即可

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: