欢迎访问我的博客,你的支持,是我最大的动力!

MySQL binlog解析canal + kafka实践|ZooKeeper集群部署

Mysql-笔记 马从东 5759℃ 评论
目录:
[显示]

这是针对canal kafka quickstart的实践:https://github.com/alibaba/canal/wiki/Canal-Kafka-QuickStart

系统环境
系统:CentOS Linux release 7.3.1611 (Core)  3.10.0-514.el7.x86_64
JDK:jdk-8u161-linux-x64.tar.gz
zookeeper:zookeeper-3.4.13.tar.gz
kafka:kafka_2.11-2.0.0.tgz
canal.kafka:canal.kafka-1.1.0.tar.gz
MySQL:5.7.22-log
部署java环境

jdk安装目录:/opt/java_maven_env/jdk1.8.0_161

在/etc/profile中末尾添加以下内容

JAVA_HOME=/opt/java_maven_env/jdk1.8.0_161
JRE_HOME=/opt/java_maven_env/jdk1.8.0_161/jre
CLASS_PATH=.:$JAVA_HOME/lib:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH

然后执行source命令,使配置生效

source /etc/profile

验证

java -version
java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)
部署zookeeper

官网:http://zookeeper.apache.org/

下载:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz

yum install wget
cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
tar zxf zookeeper-3.4.13.tar.gz
chown -R root:root zookeeper-3.4.13
cd zookeeper-3.4.13

#添加环境变量 编辑/etc/profile文件 在末尾添加

# ZooKeeper Env
export ZOOKEEPER_HOME=/opt/zookeeper-3.4.13
export PATH=$PATH:$ZOOKEEPER_HOME/bin

然后执行source命令,使配置生效

source /etc/profile

编辑配置文件

cp $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg
#创建数据目录和日志目录
mkdir -p /opt/zookeeper-3.4.13/data
mkdir -p /opt/zookeeper-3.4.13/logs
##单机模式 修改zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper-3.4.13/data
dataLogDir=/opt/zookeeper-3.4.13/logs
clientPort=2181

配置说明:
tickTime,ZK服务器之间维持心跳的时间间隔,单位ms
dataDir,ZK保存数据的目录,默认日志文件也保存到这个目录。第一次启动时,此目录应预先清空,以避免脏数据影响服务运行
clientPort,与客户端通信的端口
initLimit,在集群中,若此节点是Leader节点,在等待10个心跳时间(tickTime)后,还没有收到新加入Follower节点心跳包,则认为这个Follower节点连接失败
syncLimit,Leader与Follower之间发送消息,请求和应答时间长度最长不能超过5倍心跳时间

配置zookeeper多节点(高可用集群)

增加server.x配置项
x是一个数字,其值需要与各节点myid文件内容相同,表示第几号服务器
server.x=xxx.xxx.xxx.xxx:2888:3888
第一部分是服务器节点IP地址
第二部分是该节点与集群中Leader节点交换信息的端口,通常是2888
第三部分是若集群中Leader挂了,执行选举时节点相互通信的端口,通常是3888
myid在各节点都不相同,其内容只有一个数字

配置文件zoo.cfg末尾添加
server.1=192.168.10.110:2888:3888
server.2=192.168.10.120:2888:3888
server.3=192.168.10.130:2888:3888
在每个zookeeper实例分别配置myid
#master
echo "1">/opt/zookeeper-3.4.13/data/myid
#slave1
echo "2">/opt/zookeeper-3.4.13/data/myid
#slave2
echo "3">/opt/zookeeper-3.4.13/data/myid

zookeeper集群的测试

在任意节占,执行
./zkCli.sh
# ./zkCli.sh -server xxx.xxx.xxx.xxx:2181
ls /  #查看结构
create /test abc
get /test
set /test "newData"
get /test
delete /test

启动zookeeper服务

cd /opt/zookeeper-3.4.13/bin/
./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: standalone

验证zookeeper服务

yum install telnet
telnet 127.0.0.1 2181
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
stat
Zookeeper version: 3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
Clients:
/127.0.0.1:53726[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: standalone
Node count: 4
Connection closed by foreign host.

停止zookeeper服务

cd /opt/zookeeper-3.4.13/bin/
./zkServer.sh stop
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
 ./zkServer.sh [start|start-foreground|stop|restart|status|upgrade|print-cmd] #命令列表
部署zkui

zkui是zookeeper的一个web界面的管理控制面板,可以查看和修改zookeeper中数据

项目地址:https://github.com/DeemOpen/zkui

源码编译需要maven

yum install unzip
cd /opt
wget https://github.com/DeemOpen/zkui/archive/master.zip
unzip master.zip
cd zkui-master/
mvn clean install
#会生成/opt/zkui-master/target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar 这就是我们需要的目标文件
mkdir -p /opt/zkui
cp /opt/zkui-master/target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar /opt/zkui
cp /opt/zkui-master/config.cfg /opt/zkui
#修改配置文件 不用修改即可使用
vi /opt/zkui/config.cfg
serverPort=9090 #http端口
zkServer=192.168.10.110:2181  #zookeeper的地址

运行zkui

cd /opt/zkui
nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &

访问:http://192.168.10.110:9090/login
默认帐号:admin/manager appconfig/appconfig

部署kafka

官网:http://kafka.apache.org/

下载:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz

部署
cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
tar zxf kafka_2.11-2.0.0.tgz
cd kafka_2.11-2.0.0
##修改配置文件
mkdir -p /opt/kafka_2.11-2.0.0/logs
vi config/server.properties
#31行 listeners=PLAINTEXT://:9092
#36行 advertised.listeners=PLAINTEXT://192.168.10.110:9092
#60行 log.dirs=/opt/kafka_2.11-2.0.0/logs
#123行 zookeeper.connect=localhost:2181

启动kafka

bin/kafka-server-start.sh -daemon config/server.properties &

查看所有topic

bin/kafka-topics.sh --list --zookeeper 192.168.10.110:2181

查看指定topic下数据

bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.110:9092 --from-beginning --topic example  #从开头开始消费

bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.110:9092 --topic example  #实时消费

查看消息日志内容

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /opt/kafka_2.11-2.0.0/logs/example-0/00000000000000000000.log --print-data-log

kafkainfo工具

项目地址:https://github.com/wupeaking/kafkainfo

下载:https://github.com/wupeaking/kafkainfo/releases/download/v0.0.1/kafkainfo-linux64

kafkainfo是一个简单的调试kafka的命令行工具

kafkainfo-linux64 [global options] command [command options] [arguments...]
命令列表:
topics ,kafka的topic信息
partitions ,kafka的指定topic的分区
brokers ,kafka的指定所有brokers
produce ,向kafka生成若干消息
consum ,消费kafka消息
uishow ,在shell上图形化显示kafka概述信息
help, h ,Shows a list of commands or help for one command
常用命令:
#列出所有主题
./kafkainfo-linux64 topics --addr 192.168.10.110:9092 list
#列出某个分区的leader信息
./kafkainfo-linux64 topics --addr 192.168.10.110:9092 -t example -p 0 leader
#列出某个topic所有分区
./kafkainfo-linux64 partitions --addr 192.168.10.110:9092 -t example
#列出kafka所有集群
./kafkainfo-linux64 brokers --addr 192.168.10.110:9092
#产生消息
./kafkainfo-linux64 produce --addr 192.168.10.110:9092 -t example_t -m "这是消息内容"
#消费消息
./kafkainfo-linux64 consum --addr 192.168.10.110:9092 -t example_t -c 1
##生产/消费可带的参数
--addr value, --ip value kafka集群的任意一个地址
-t value, --topic value 指定topic
-m value, --message value 消息内容
-c value, --count value 生产几份消息 (default: 0)
-f, --forerver 一直消费 直到按下Ctrl-C
部署canal.kafka

项目地址:https://github.com/alibaba/canal

下载:https://github.com/alibaba/canal/releases/download/canal-1.1.0/canal.kafka-1.1.0.tar.gz

部署

cd /opt
wget https://github.com/alibaba/canal/releases/download/canal-1.1.0/canal.kafka-1.1.0.tar.gz
mkdir -p /opt/canal-kafka
tar zxf canal.kafka-1.1.0.tar.gz -C /opt/canal-kafka
cd canal-kafka
##修改配置参数
vi conf/example/instance.properties
#9行 canal.instance.master.address=192.168.31.77:3306
#33行 canal.instance.dbUsername=canal
#34行 canal.instance.dbPassword=canal
vi conf/canal.properties
#8行 canal.zkServers=192.168.10.110:2181
#11行 canal.withoutNetty = true
#77行 canal.destinations= example
vi conf/kafka.yml
#1行 servers: 192.168.10.110:9092

启动

bin/startup.sh

查看日志

tail logs/canal/canal.log
tail logs/example/example.log

关闭

bin/stop.sh

遗留问题:kafka中输出结果乱码

已知道原因:https://github.com/alibaba/canal/issues/898
kafka数据投递传输的是数据包,收到数据后还要解包成对应的message,可参考client中的kafka实现

转载请注明:轻风博客 » MySQL binlog解析canal + kafka实践|ZooKeeper集群部署

喜欢 (2)or分享 (0)