nginx access logs are stored in JSON format and shipped by filebeat into Kafka. From there, one copy is consumed by Elasticsearch for search queries, and another copy is consumed by ClickHouse for aggregation.
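At a glance, the data flow is:

nginx (JSON access log) -> filebeat -> Kafka -> { Elasticsearch (search), ClickHouse (aggregation) }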
Standardizing nginx logs as JSON
nginx configuration:
log_format json escape=json '{'
    '"time":"$time_iso8601",'
    '"msec":"$msec",'
    '"remote_addr":"$remote_addr",'
    '"http_x_forwarded_for":"$http_x_forwarded_for",'
    '"host":"$host",'
    '"request_method":"$request_method",'
    '"server_protocol":"$server_protocol",'
    '"request_uri":"$request_uri",'
    '"uri":"$uri",'
    '"http_referer":"$http_referer",'
    '"http_user_agent":"$http_user_agent",'
    '"status":"$status",'
    '"bytes_sent":"$bytes_sent",'
    '"request_length":"$request_length",'
    '"request_time":"$request_time",'
    '"upstream_bytes_sent":"$upstream_bytes_sent",'
    '"upstream_bytes_received":"$upstream_bytes_received",'
    '"proxy_host":"$proxy_host",'
    '"upstream_addr":"$upstream_addr",'
    '"upstream_status":"$upstream_status",'
    '"upstream_header_time":"$upstream_header_time",'
    '"upstream_connect_time":"$upstream_connect_time",'
    '"app":"$upstream_http_jyg_route_app",'
    '"upstream_addr_jyg":"$upstream_http_jyg_route_upstream",'
    '"upstream_response_time":"$upstream_response_time"'
    '}';

access_log logs/new-json.log json;
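With escape=json, each request is written as one JSON object per line. A hypothetical record produced by this format (values invented for illustration, fields abridged) looks roughly like:

{"time":"2021-05-01T12:00:01+08:00","msec":"1619841601.123","remote_addr":"10.0.0.8","host":"api.example.com","request_method":"GET","request_uri":"/v1/ping?x=1","uri":"/v1/ping","status":"200","bytes_sent":"612","request_length":"230","request_time":"0.012","upstream_addr":"10.0.1.2:8080","upstream_status":"200","upstream_response_time":"0.010"}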
Log rotation
#cat /etc/logrotate.d/nginx
/data/server/tengine/logs/*.log {
    daily
    missingok
    rotate 3
    dateext
    dateformat .%Y%m%d-%H
    # compress
    notifempty
    create 644 root root
    sharedscripts
    postrotate
        [ -f /data/server/tengine/logs/nginx.pid ] && kill -USR1 `cat /data/server/tengine/logs/nginx.pid`
    endscript
}
When logs grow large and disk space runs short, you can also add a cron job to rotate the logs more frequently as needed:
# nginx log rotate
5 */4 * * * /usr/sbin/logrotate -vf /etc/logrotate.d/nginx >/dev/null 2>&1

# add the cron job: run log rotation every 4 hours
echo "# nginx log rotate" >> /var/spool/cron/root
echo "5 */4 * * * /usr/sbin/logrotate -vf /etc/logrotate.d/nginx >/dev/null 2>&1" >> /var/spool/cron/root
Shipping logs to Kafka with filebeat
The filebeat configuration is as follows:
cat /etc/filebeat/filebeat.yml

max_procs: 2

filebeat.inputs:
- type: log
  #json.keys_under_root: true
  #json.overwrite_keys: true
  #json.ignore_decoding_error: true
  symlinks: true
  tail_files: true
  paths:
    - /data/server/tengine/logs/new-json.log

processors:
- decode_json_fields:
    fields: ["message"]
    process_array: true
    target: ""
    overwrite_keys: true
- drop_fields:
    fields: ["message","beat","source","input","prospector","offset"]

output.kafka:
  hosts: ["1.1.1.1:9092","2.2.2.2:9092","3.3.3.3:9092","4.4.4.4:9092","5.5.5.5:9092"]
  topic: "nginx-access-log"
  required_acks: 1
  worker: 2
  bulk_max_size: 4096
  compression: lz4
  #max_message_bytes: 1000000
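To confirm that events are actually reaching the topic, one option is the stock Kafka console consumer. A minimal sketch, reusing the broker address above; the install path of the Kafka scripts is an assumption:

# tail a few messages from the topic to verify delivery (path is illustrative)
/opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server 1.1.1.1:9092 \
    --topic nginx-access-log \
    --max-messages 3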
Setting up ClickHouse
ClickHouse has a built-in Kafka table engine that consumes messages from Kafka with very high throughput: on a single 16-core/32 GB cloud host, the consumption rate peaks at around 220k rows/s. A single node is therefore sufficient here, and cluster mode is not covered.
Mount the data disk
mkfs.xfs /dev/vdb
blkid
mkdir /data

# append to /etc/fstab
UUID=bf7f1ad8-f9d9-431c-9ce1-a50e59d9a170 /data xfs defaults 0 0

mount -a
df -h
Prepare the data directory
mkdir /data/clickhouse
chown -R clickhouse:clickhouse /data/clickhouse/
Install ClickHouse
# version 21.4.4.30-2
yum install yum-utils
rpm --import https://repo.clickhouse.tech/CLICKHOUSE-KEY.GPG
yum-config-manager --add-repo https://repo.clickhouse.tech/rpm/clickhouse.repo
yum install clickhouse-server clickhouse-client

# edit the configuration file
/etc/clickhouse-server/config.xml

# start
systemctl start clickhouse-server
systemctl status clickhouse-server
systemctl enable clickhouse-server

# test
clickhouse-client -m
show databases;
Modify the following entries in /etc/clickhouse-server/config.xml:
<level>information</level>
<timezone>Asia/Shanghai</timezone>
<listen_host>::</listen_host>
<path>/data/clickhouse/</path>
<tmp_path>/data/clickhouse/tmp/</tmp_path>
<user_files_path>/data/clickhouse/user_files/</user_files_path>
Create the database and tables
clickhouse-client -m

-- create the database
create database nginx;
show databases;
use nginx;

-- create the Kafka engine table
create table if not exists nginx.jsonlog_prod (
    `time` String,
    `msec` String,
    `remote_addr` String,
    `domain` String,
    `request_method` String,
    `uri` String,
    `status` String,
    `bytes_sent` String,
    `request_length` String,
    `request_time` String,
    `upstream_bytes_sent` String,
    `upstream_bytes_received` String,
    `proxy_host` String,
    `upstream_addr` String,
    `upstream_status` String,
    `upstream_header_time` String,
    `upstream_connect_time` String,
    `app` String,
    `upstream_addr_jyg` String,
    `upstream_response_time` String,
    `api` String,
    `rewrite` String,
    `serverip` String,
    `jiayuguan` String,
    `pdu` String
) engine = Kafka()
SETTINGS kafka_broker_list = '1.1.1.1:9092,2.2.2.2:9092,3.3.3.3:9092,4.4.4.4:9092,5.5.5.5:9092',
         kafka_topic_list = 'nginx-access-log',
         kafka_group_name = 'clickhouse-nginx',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 5,
         kafka_thread_per_consumer = 1,
         kafka_max_block_size = 262020,
         kafka_commit_every_batch = 1;
-- kafka_max_block_size defaults to max_block_size (65505); raised here to 262020

-- test consumption
show tables;
select count() from nginx.jsonlog_prod;
select * from nginx.jsonlog_prod limit 1 \G

-- create the target table
create table if not exists nginx.nginx_prod (
    `time` DateTime,
    `msec` Float64,
    `remote_addr` String,
    `host` String,
    `request_method` String,
    `uri` String,
    `status` UInt16,
    `bytes_sent` UInt32,
    `request_length` UInt32,
    `request_time` Float64,
    `upstream_bytes_sent` UInt32,
    `upstream_bytes_received` UInt32,
    `proxy_host` String,
    `upstream_addr` String,
    `upstream_status` UInt16,
    `upstream_header_time` Float64,
    `upstream_connect_time` Float64,
    `app` String,
    `upstream_addr_jyg` String,
    `upstream_response_time` Float64,
    `api` String,
    `rewrite` String,
    `serverip` String,
    `jiayuguan` String,
    `pdu` String
) engine = MergeTree()
partition by toYYYYMMDD(time)
order by time
TTL time + INTERVAL 24 HOUR;
-- show create table nginx.nginx_prod \G
-- alter table nginx.nginx_prod modify ttl time + interval 24 hour;
--   (blocks for a long time: existing data is reprocessed)

-- create the data pipeline (materialized view)
CREATE MATERIALIZED VIEW nginx.jsonlog_to_nginx_prod_queue TO nginx.nginx_prod AS
SELECT
    toDateTime(substring(time, 1, 19)) as time,
    toFloat64OrZero(msec) as msec,
    remote_addr,
    domain as host,
    request_method,
    uri,
    toUInt16OrZero(status) as status,
    toUInt32OrZero(bytes_sent) as bytes_sent,
    toUInt32OrZero(request_length) as request_length,
    toFloat64OrZero(request_time) as request_time,
    toUInt32OrZero(upstream_bytes_sent) as upstream_bytes_sent,
    toUInt32OrZero(upstream_bytes_received) as upstream_bytes_received,
    proxy_host,
    upstream_addr,
    toUInt16OrZero(upstream_status) as upstream_status,
    toFloat64OrZero(upstream_header_time) as upstream_header_time,
    toFloat64OrZero(upstream_connect_time) as upstream_connect_time,
    app,
    upstream_addr_jyg,
    toFloat64OrZero(upstream_response_time) as upstream_response_time,
    api,
    rewrite,
    serverip,
    jiayuguan,
    pdu
FROM nginx.jsonlog_prod;
-- Kafka consumption rate: 220k rows/s (118k rows/s before the field conversions were optimized)

-- stop consuming from Kafka
drop VIEW nginx.jsonlog_to_nginx_prod_queue;
-- DETACH TABLE nginx.jsonlog_to_nginx_prod_queue;
-- ATTACH TABLE nginx.jsonlog_to_nginx_prod_queue;

-- verify
show tables;
select count() from nginx.nginx_prod;
select * from nginx_prod order by time desc limit 1 \G
select distinct serverip from nginx.jsonlog_prod;
optimize table nginx.nginx_prod final;

-- utility commands
-- truncate the table
TRUNCATE TABLE nginx.nginx_prod;
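Once the materialized view is attached, a rough way to gauge the ingest rate is to count recent rows in the target table. A minimal sketch, assuming ingestion keeps the `time` column close to wall-clock time:

-- approximate rows per second ingested over the last minute
SELECT count() / 60 AS rows_per_second
FROM nginx.nginx_prod
WHERE time > now() - 60;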
Aggregation logic
-- aggregation query; note reqtime is scaled by 1000, i.e. reported in milliseconds
SELECT
    host, api, app, pdu, jiayuguan, status,
    quantilesTiming(0.50, 0.90, 0.95, 0.99)(request_time * 1000) as reqtime,
    stddevSamp(request_time) as stdreqtime,
    quantilesTiming(0.50, 0.90, 0.95, 0.99)(upstream_bytes_received) as received,
    stddevSamp(upstream_bytes_received) as stdreceived,
    quantilesTiming(0.50, 0.90, 0.95, 0.99)(upstream_bytes_sent) as send,
    stddevSamp(upstream_bytes_sent) as stdsend,
    count() as n
FROM nginx.nginx_prod
GROUP by host, api, app, pdu, jiayuguan, status
ORDER by n desc;

-- timing the same query from the shell; it takes about 4 seconds
time clickhouse-client --query "SELECT host, api, app, pdu, jiayuguan, status, quantilesTiming(0.50,0.90,0.95,0.99)(request_time*1000) as reqtime, stddevSamp(request_time) as stdreqtime, quantilesTiming(0.50,0.90,0.95,0.99)(upstream_bytes_received) as received, stddevSamp(upstream_bytes_received) as stdreceived, quantilesTiming(0.50,0.90,0.95,0.99)(upstream_bytes_sent) as send, stddevSamp(upstream_bytes_sent) as stdsend, count() as n FROM nginx.nginx_prod GROUP by host, api, app, pdu, jiayuguan, status ORDER by n desc;" | wc -l
ClickHouse log output
ClickHouse writes its logs under /var/log/clickhouse-server.
With output_format_json_quote_denormals = 0 (the default), non-finite float values are returned as null in JSON output; with 1, they are returned as inf, -nan, -inf:
select area/period from account_orders format JSON;
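The setting can also be flipped per session, for example (a sketch reusing the query above):

SET output_format_json_quote_denormals = 1;
select area/period from account_orders format JSON;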
Time-range parameters
WHERE time > date_add(second, -65, now()) AND time < date_add(second, -5, now())
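Plugged into an aggregation, this selects a fixed 60-second window lagging 5 seconds behind now(), leaving headroom for rows still in flight. A minimal sketch:

SELECT host, status, count() AS n
FROM nginx.nginx_prod
WHERE time > date_add(second, -65, now())
  AND time < date_add(second, -5, now())
GROUP BY host, status
ORDER BY n DESC;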
Monitoring Kafka consumption lag
create table if not exists nginx.kafka_delay (
    time DateTime,
    delay UInt16,
    env String
) engine = MergeTree()
order by time;

insert into nginx.kafka_delay
select now(), now() - time, concat('prod', 'exp')
from nginx.nginx_prod order by time desc limit 1;

select * from nginx.kafka_delay order by time desc limit 5;
-- truncate table nginx.kafka_delay;

-- export
clickhouse-client --query "select * from nginx.kafka_delay" > kafka_delay.log
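A single INSERT records only one snapshot; to accumulate a lag history, the INSERT can run on a schedule, e.g. once a minute from cron (a sketch; the env label is simplified to 'prod'):

# sample Kafka consumption lag every minute
* * * * * clickhouse-client --query "insert into nginx.kafka_delay select now(), now()-time, 'prod' from nginx.nginx_prod order by time desc limit 1"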
References
ClickHouse Notes, Part 2: What You Need to Know About the Kafka Engine
Also tried setting the kafka_max_block_size parameter to 512k, as well as stream_poll_timeout_ms = 100 and stream_flush_interval_ms = 800, and we were able to pull about 60k rows per second, which appears to be a big improvement; so this might have been the issue. I'll update if we get more data.
The corresponding settings in users.xml:
<stream_poll_timeout_ms>5000</stream_poll_timeout_ms>
<stream_flush_interval_ms>1500</stream_flush_interval_ms>
<max_streams_to_max_threads_ratio>4</max_streams_to_max_threads_ratio>
Third-party options for importing Kafka data:
the official client
clickhouse_sinker