欢迎访问我的博客,你的支持,是我最大的动力!

MySQL Binlog解析工具Maxwell 1.17.1 的安装和使用

Mysql-笔记 马从东 5605℃ 评论
目录:
[显示]

目前MySQL Binlog解析工具主要有阿里的canal、maxwell和mysql_streamer,三个工具对照如下:

其中阿里开源的canal(https://github.com/alibaba/canal)当前稳定版本为v1.1.0,据官网介绍,这次版本整体性能提升了150%,是一个里程碑式的重大版本。canal基于java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大。

maxwell(https://github.com/zendesk/maxwell)由zendesk开源,官网为http://maxwells-daemon.io/,也是由java开发,体量没有canal那么大,解析出来的结果为json,可以方便的发送到kafka、rabbitmq、redis等下游应用,进行处理。

mysql_streamer则是由python开发的binlog解析工具,使用相对较少。

Maxwell功能

1、支持SELECT * FROM table 的方式进行全量数据初始化

2、支持在主库发生failover后,自动恢复binlog位置(GTID)

3、可以对数据进行分区,解决数据倾斜问题,发送到kafka的数据支持database、table、column等级别的数据分区

4、工作方式是伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddl、xid、row等各种event

maxwell相对于canal的优势是使用简单,因为使用canal需要自己编写客户端来消费canal解析到的数据,而maxwell则不同,它直接输出数据变更的json串,不需要再编写客户端。maxwell可以看作是canal服务端+canal客户端

Maxwell安装

部署java和maven环境

文件版本:
jdk-8u161-linux-x64.tar.gz
apache-maven-3.5.4-bin.tar.gz 官网 下载:http://mirrors.hust.edu.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz

mkdir -p /opt/java_maven_env
mv jdk-8u161-linux-x64.tar.gz apache-maven-3.5.4-bin.tar.gz /opt/java_maven_env
#解压
cd /opt/java_maven_env
tar zxf jdk-8u161-linux-x64.tar.gz
tar zxf apache-maven-3.5.4-bin.tar.gz
#添加环境变量
vi /etc/profile  在末尾添加以下几行
JAVA_HOME=/opt/java_maven_env/jdk1.8.0_161
JRE_HOME=/opt/java_maven_env/jdk1.8.0_161/jre
CLASS_PATH=.:$JAVA_HOME/lib:$JRE_HOME/lib
MAVEN_HOME=/opt/java_maven_env/apache-maven-3.5.4
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$MAVEN_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH MAVEN_HOME PATH
#使环境变量生效 (注,去掉maven部分,即jdk环境)
source /etc/profile
#测试
java -version
java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)
mvn -v
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /opt/java_maven_env/apache-maven-3.5.4
Java version: 1.8.0_161, vendor: Oracle Corporation, runtime: /opt/java_maven_env/jdk1.8.0_161/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-514.el7.x86_64", arch: "amd64", family: "unix"
编译maxwell 1.17.1

项目地址:https://github.com/zendesk/maxwell
下载地址:https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz  #已编译好的版本
源码:https://github.com/zendesk/maxwell/archive/v1.17.1.tar.gz

这里演示源码的编译,当然,也可以直接下载使用编译好的文件maxwell-1.17.1.tar.gz

#下载
wget https://github.com/zendesk/maxwell/archive/v1.17.1.tar.gz
#解压并进入目录
tar zxf v1.17.1.tar.gz
cd maxwell-1.17.1/
#编译并打包文件(实际调用的是maven)
make package  #这个过程在我的机器上执行大约用时30min
#如果运行测试,需要有gcc环境 make test
#最终,会在maxwell-1.17.1/target目录下生成目标文件maxwell-1.17.1.tar.gz (48M)

Maxwell的使用

前置条件

1、MySQL需要开启binlog功能,binlog格式需为row,要设置一个唯一的server_id,binlog_row_image应该为FULL(默认就是FULL)

vi /etc/my.cnf  #仅列相关配置项
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL

GTID模式

vi /etc/my.cnf  #仅列相关配置项
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL
gtid-mode=ON
log-slave-updates=ON
enforce-gtid-consistency=true

2、Maxwell需要建一个数据库,用于存放状态信息,默认库名为maxwell,该库在maxwell启动时会自动创建,但是需要为其设置相应权限;maxwell运行时需要获取复制状态信息,以及在information_schema中获取数据库及字段信息,所以还需要授予相应权限

CREATE USER 'maxwell'@'%' identified by 'XXXXXX';
GRANT ALL on maxwell.* to 'maxwell'@'%' ;
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
FLUSH PRIVILEGES;

3、java环境,maxwell运行需要java

简单测试

现在,就可以运行maxwell做测试了

tar zxf maxwell-1.17.1.tar.gz && cd maxwell-1.17.1
chown -R root:root *
#producer=stdout 直接输入到终端窗口
bin/maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --producer=stdout
Using kafka version: 1.0.0
14:40:23,761 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
14:40:24,120 INFO SchemaStoreSchema - Creating maxwell database
14:40:24,240 INFO Maxwell - Maxwell v1.17.1 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-binlog.000011:2539732], lastHeartbeat=0]
14:40:24,363 INFO AbstractSchemaStore - Maxwell is capturing initial schema
14:40:24,801 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-binlog.000011:2539732
14:40:24,965 INFO BinaryLogClient - Connected to 192.168.31.77:3306 at mysql-binlog.000011/2539732 (sid:6379, cid:4896)
14:40:24,965 INFO BinlogConnectorLifecycleListener - Binlog connected.
{"database":"test","table":"mytest","type":"insert","ts":1535179251,"xid":205516,"commit":true,"data":{"id":1,"name":"aa","age":23,"as":"bb"}}
#可以看到maxwell在启动时,会先创建maxwell数据库,然后获取当前binlog位点,初始化表结构,然后就开始解析binlog日志了,最后一条即解析的json串
##
如果使用的是docker镜像,则命令类似
docker pull zendesk/maxwell
docker run -it --rm zendesk/maxwell bin/maxwell --user=maxwell --password=XXXXXX --host=192.168.31.77 --producer=stdout
 输出json串说明
{"database":"test","table":"mytest","type":"insert","ts":1535179251,"xid":205516,"commit":true,"data":{"id":1,"name":"aa","age":23,"as":"bb"}}
{"database":"test","table":"mytest","type":"update","ts":1535179973,"xid":206703,"commit":true,"data":{"id":1,"name":"aa","age":23,"as":"cc"},"old":{"as":"bb"}}
字段说明
data,最新的数据,修改后的数据
old,旧数据,修改前的数据
type,操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
xid,事务id
commit,同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
server_id,thread_id,
运行程序时添加参数--output_ddl,可以捕捉到ddl语句
datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
maxwell支持多种编码,但仅输出utf8编码
maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理
参数说明

完整参数如下:

参数 值类型
描述 默认值
全局参数
config 字符串 指定config.properties配置文件路径 当前目录$PWD
log_level debug,info,warn,error 日志级别 info
daemon   maxwell作为守护进程运行  
env_config_prefix 字符串 作为配置项对待的环境变量前缀,如FOO_
 
mysql参数
host 字符串 mysql host localhost
user 字符串 mysql username  
password 字符串 mysql password (no password)
port INT mysql port 3306
jdbc_options 字符串 mysql jdbc 连接串 DEFAULT_JDBC*
ssl SSL_OPT SSL启动 DISABLED
schema_database 字符串 存储表结构和binlog位点的数据库名称 maxwell
client_id 字符串 maxwell实例唯一定义文本
maxwell
replica_server_id LONG maxwell实例唯一定义编号,对应于MySQL server_id
6379
master_recovery 布尔值 启用实验性master恢复代码 false
gtid_mode 布尔值 启动基于GTID的复制 false
recapture_schema 布尔值 重新获得最新表结构. 配置文件中不适用
false
replication参数,用于master-slave的架构,在slave上不方便创建maxwell库,从slave获取binlog
replication_host 字符串 主机 schema-store host
replication_password 字符串 密码 (none)
replication_port INT 端口 3306
replication_user 字符串 用户  
replication_ssl SSL_OPT* 启用SSL DISABLED
schema参数,仅用于复制代理的情形,获取表结构
schema_host 字符串 主机 schema-store host
schema_password 字符串 密码 (none)
schema_port INT 端口 3306
schema_user 字符串 用户  
schema_ssl SSL_OPT* 启用SSL DISABLED
producer参数
producer PRODUCER_TYPE* 消费者类型 stdout
custom_producer.factory CLASS_NAME 自定义消费者的工厂类  
producer_ack_timeout 异步消费认为消息丢失的超时时间,毫秒ms  
producer_partition_by PARTITION_BY* 输入到kafka/kinesis的分区函数 database
producer_partition_columns 字符串 若按列分区,以逗号分隔的列名称  
producer_partition_by_fallback PARTITION_BY_F* 按列分区时需要,当列不存在是使用  
ignore_producer_error 布尔值 为false时,在kafka/kinesis发生错误时退出程序;为true时,仅记录日志 true
producer=file
output_file 字符串 输出的文件路径  
javascript 字符串 指定javascript过滤器文件  
producer=kafka
kafka.bootstrap.servers 字符串 kafka brokers, 格式 HOST:PORT[,HOST:PORT]  
kafka_topic 字符串 kafka topic maxwell
kafka_version KAFKA_VERSION* kafka版本. 配置文件config.properties中不适用 0.11.0.1
kafka_partition_hash default,murmur3
kafka分区hash函数 default
kafka_key_format array,hash
maxwell输出kafka keys的格式 hash
ddl_kafka_topic 字符串 若output_ddl为true, kafka topic kafka_topic
producer=kinesis
kinesis_stream 字符串 kinesis stream name  
producer=sqs
sqs_queue_uri 字符串 SQS Queue URI  
producer=pubsub
pubsub_topic 字符串 Google Cloud pub-sub topic  
pubsub_platform_id 字符串 Google Cloud platform id associated with topic  
ddl_pubsub_topic 字符串 Google Cloud pub-sub topic to send DDL events to  
producer=rabbitmq
rabbitmq_user 字符串 用户名 guest
rabbitmq_pass 字符串 密码 guest
rabbitmq_host 字符串 主机  
rabbitmq_port INT 端口  
rabbitmq_virtual_host 字符串 虚拟主机  
rabbitmq_exchange 字符串 交换器名称  
rabbitmq_exchange_type 字符串 交换器类型  
rabbitmq_exchange_durable 布尔值 交换器是否持久化 false
rabbitmq_exchange_autodelete 布尔值 为true时,当所有队列都完成时,删除交换器 false
rabbitmq_routing_key_template 字符串 路由键模板,可用 %db%%table% 作为变量 %db%.%table%.
rabbitmq_message_persistent 布尔值 启用消息持久化 false
rabbitmq_declare_exchange 布尔值 需要声明交换器 true
producer=redis
redis_host 字符串 主机 localhost
redis_port INT 端口 6379
redis_auth 字符串 密码  
redis_database INT 数据库[0-15] 0
redis_pub_channel 字符串 Pub/Sub模式的chanal名 maxwell
redis_list_key 字符串 LPUSH模式的列表名 maxwell
redis_type pubsub,lpush
模式选择 pubsub
formatting格式化
output_binlog_position 布尔值 包含binlog位点 false
output_gtid_position 布尔值 包含gtid位点 false
output_commit_info 布尔值 包含commit和xid true
output_xoffset 布尔值 包含虚拟的行偏移 false
output_nulls 布尔值 包含NULL值 true
output_server_id 布尔值 包含server_id false
output_thread_id 布尔值 包含thread_id false
output_row_query 布尔值 包含INSERT/UPDATE/DELETE语句. Mysql需要开启binlog_rows_query_log_events false
output_ddl 布尔值 包含DDL (table-alter, table-create, etc)事件 false
filtering过滤器
filter 字符串 过滤规则,如 exclude: db.*, include: *.tbl, include: *./bar(bar)?/, exclude: foo.bar.col=val  
encryption加密
encrypt none,data,all
加密模式: none不加密,data仅加密data字段,all加密整个消息 none
secret_key 字符串 encryption key null
monitoring / metrics指标
metrics_prefix 字符串 指标前缀 MaxwellMetrics
metrics_type slf4j,jmx,http,datadog 指标类型  
metrics_jvm 布尔值 启用jvm指标: memory usage, GC stats等 false
metrics_slf4j_interval SECONDS slf4j的频率指标,秒 60
http_port INT http方式的端口 8080
http_path_prefix 字符串 http方式的路径前缀 /
http_bind_address 字符串 http绑定的地址 all addresses
http_diagnostic 布尔值 启用http诊断 false
http_diagnostic_timeout MILLISECONDS http诊断响应超时 10000
metrics_datadog_type udp,http datadog类型 udp
metrics_datadog_tags 字符串 datadog tags如 tag1:value1,tag2:value2  
metrics_datadog_interval INT datadog推的频率,秒 60
metrics_datadog_apikey 字符串 datadog api key仅当metrics_datadog_type = http  
metrics_datadog_host 字符串 目标主机,仅当metrics_datadog_type = udp localhost
metrics_datadog_port INT 端口,仅当metrics_datadog_type = udp 8125
其它
bootstrapper async,sync,none bootstrapper类型 async
init_position FILE:POS[:HEARTBEAT] 从指定位点启动,配置文件中不适用  
replay 布尔值 启用只读重放模式,不存储binlog位点或结构变更,配置文件中不适用  

SSL_OPT*:[ DISABLED | PREFERRED | REQUIRED | VERIFY_CA | VERIFY_IDENTITY ]
PRODUCER_TYPE*:[ stdout | file | kafka | kinesis | pubsub | sqs | rabbitmq | redis ]
DEFAULT_JDBC*:zeroDateTimeBehavior=convertToNull&connectTimeout=5000
PARTITION_BY*:[ database | table | primary_key | column ]
PARTITION_BY_F*:[ database | table | primary_key ]
KAFKA_VERSION*:[ 0.8.2.2 | 0.9.0.1 | 0.10.0.1 | 0.10.2.1 | 0.11.0.1 ]

配置优先级
命令行 -> 环境变量 -> 配置文件 -> 默认值

Maxwell的角色区分
--host,指定表结构来源的主机,即建maxwell库的主机
--replication_host,指定binlog来源的主机,用于master-slave结构中,不能在slave中建maxwell库的情况
--schema_host,指定表结构的主机,仅用于复制代理的情况
注,bootstrapping不适用于host和replication分离的情形

Maxwell多实例
若需要运行多个Maxwell,需要为每个实例配置不同的client_id,以存储不同的binlog位点;同时,还需要配置replica_server_id,指定一个不同的server_id,如
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=maxwell--replica_server_id=6379 #默认
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=my --replica_server_id=2345

Maxwell过滤
#仅匹配foodb数据库的tbl表和所有table_数字的表
--filter = 'exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
#排除所有库所有表,仅匹配db1数据库
--filter = 'exclude: *.*, include: db1.*'
#排除含db.tbl.col列值为reject的所有更新
--filter = 'exclude: db.tbl.col = reject'
#排除任何包含col_a列的更新
--filter = 'exclude: *.*.col_a = *'
#完全排除bad_db数据库,若要恢复,必须删除maxwell库
--filter = 'blacklist: bad_db.*'

Maxwell-bootstrap功能
使用maxwell-bootstrap时,可用的参数有
--log_level,日志级别
--user,用户名
--password,密码
--host,主机
--port,端口
--database,包含需要bootstrap的表的数据库名
--table,需要bootstrap的表名
--where,限制条件
--client_id,maxwell实例名称
或者,也可以直接在maxwell.bootstrap表中手动添加触发同步
insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');
示例:
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --log_level info
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info
注意--bootstrapper=sync时,在处理bootstrap时,会阻塞正常的binlog解析;--bootstrapper=async时,不会阻塞
json中的type
bootstrap-start -> bootstrap-insert -> bootstrap-complete  #其中,start和complete的data字段为空,不携带数据
在进行bootstrap过程中,若maxwell崩溃,重启时,bootstrap会完全重新开始,不管之前进行到多少,若不希望,可以设置complete字段值为1(完成),或者删除该行

Maxwell-bootstrap实战
maxwell-boot工具的作用其实就是在maxwell库的bootstrap表中添加一条记录,只需要添加database_name,table_name,client_id三个字段,也可以直接使用SQL语句在表中插入,maxwell-boot工具只是一个比较方便的方式

主执行程序:
./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --producer=stdout
启动一个maxwell-bootstrap
./maxwell-bootstrap --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --database ecard --table t_persons --log_level info  --client_id maxwell
11:33:08,746 INFO MaxwellBootstrapConnectionPool - MaxwellBootstrapConnectionPool: Getting connection (user/password): jdbc:mysql://192.168.31.77:3306/maxwell
Sun Aug 26 11:33:08 CST 2018 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
11:33:09,124 INFO MaxwellBootstrapConnectionPool - MaxwellBootstrapConnectionPool: Created a new connection
11:33:09,125 INFO MaxwellBootstrapUtility - counting rows
11:33:09,135 INFO MaxwellBootstrapUtility - inserting bootstrap start row
11:33:09,645 INFO MaxwellBootstrapUtility - done.
bootstrap导出的数据会通过maxwell(即第一个程序进行输出)

Maxwell监控/指标
所有指标目前都是针对kafka的
当启动HTTP节点后,会有以下路径
/metrics,返回json格式的指标数据
/healthcheck,健康检查,如果大于0说明在过去15min内有错误
/ping,返回pong
导出JMX信息,需要预先设置JAVA_OPTS

常用配置

config config.properties, 配置文件
log_level info,日志级别
daemon, 守护进程
schema_database maxwell, 数据库
client_id maxwell, maxwell实例名称
replica_server_id 6379, 类似于server_id,多实例
filter, 过滤器
output_binlog_position true, 记录binlog位点
output_ddl true, 记录ddl变更

1、输出到stdout,json包含binlog位点,包含ddl语句

./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=stdout

位点:"position":"mysql-binlog.000011:4170905"

2、输出到redis,使用队列的方式

./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=redis --redis_host=192.168.31.77 --redis_auth=xxxxxx --redis_type=lpush --redis_database=5 --redis_list_key=maxwell

./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=redis --redis_host=192.168.31.77 --redis_auth=xxxxxx --redis_type=lpush --redis_database=5 --redis_list_key=maxwell --daemon

3、输出到rabbitmq

./maxwell --user='maxwell' --password='XXXXXX' --host=192.168.31.77 --output_binlog_position=true --output_ddl=true --producer=rabbitmq --rabbitmq_host=192.168.31.78 --rabbitmq_user=maxwell --rabbitmq_pass=maxwell --rabbitmq_virtual_host=maxwell_vhost --rabbitmq_exchange=maxwell --rabbitmq_exchange_type=topic --rabbitmq_routing_key_template=%db%.%table% --rabbitmq_declare_exchange=true

4、使用config.properties配置文件

cat config.properties
user=maxwell
password=XXXXXX
host=192.168.31.77
output_binlog_position=true
output_ddl=true
producer=rabbitmq
rabbitmq_host=192.168.31.78
rabbitmq_user=maxwell
rabbitmq_pass=maxwell
rabbitmq_virtual_host=maxwell_vhost
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=topic
rabbitmq_routing_key_template=%db%.%table%
rabbitmq_declare_exchange=true
运行程序
./maxwell --config=config.properties --daemon

参考:

1、自建Binlog订阅服务--Maxwell http://seanlook.com/2018/01/13/maxwell-binlog/

转载请注明:轻风博客 » MySQL Binlog解析工具Maxwell 1.17.1 的安装和使用

喜欢 (2)or分享 (0)