阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

ELK日志分析集群搭建管理(rsyslog->kafka->ELK)

266次阅读
没有评论

共计 14241 个字符,预计需要花费 36 分钟才能阅读完成。

日志分析作为掌握业务情况的一个重要手段,目前使用最多最成熟的莫过于 ELK 方案,其中也有各种搭配组合,像 rsyslog->ES->kibana、rsyslog->Redis->Logstash->ES->kibana、rsyslog->kafka->Logstash->ES->kibana 等等,复杂点的有 spark 的引用。每种方案适合不同的应用场景,没有好坏,我的集群用的是 rsyslog->kafka->Logstash->ES->kibanarsyslog->rsyslog 中继 ->kafka->Logstash->ES->kibana方案,目前 4 台 ES 每天索引 10 多亿条日志,包含 nginx、redis、php 等,运行比较健壮,每条日志的索引在 10 个字段左右,每天 Primary Shard 的索引量在 500 个 G 左右,考虑到性能和日志性质,我们没要复制分片,日志保留 7 天。

总结一下,其实就是 采集 -> 清洗 -> 索引 -> 展现 几个环节,再去考虑各环节中 缓存、队列 的使用。下面介绍一下此方案集群的搭建和配置。希望对同行有所帮助,也算我积的福德,在 ELK 探索过程中多谢远川和冯超同学的奉献交流。

一、采集(使用 rsyslog)

客户端使用 rsyslog8.19.0 做的收集,直接 CentOS 安装 rpm 包,安装详细见:

http://www.rsyslog.com/rhelcentos-rpms/

将 yum 源配置好后:

 yum install rsyslog
 yum install rsyslog-kafka

  安装好后对应 rsyslog 的配置文件如下:

module(load="imfile")
module(load="omkafka")
$PreserveFQDN on
main_queue(
  queue.workerthreads="10"      # threads to work on the queue
  queue.dequeueBatchSize="1000"    # max number of messages to process at once
  queue.size="50000"          # max queue size
)
##########################nginx log################################
$template nginxlog,"%$myhostname%`%msg%"
if $syslogfacility-text == 'local6' then {
    action(
        broker=["10.13.88.190:9092","10.13.88.191:9092","10.13.88.192:9092","10.13.88.193:9092"]
        type="omkafka"
        topic="cms-nginx"
        template="nginxlog"
        partitions.auto="on"
     )
    stop
  }
############################redis log#########################
$template redislog,"%$myhostname%`%msg%"
ruleset(name="redis7215-log") {
    action(
        broker=["10.13.88.190:9092","10.13.88.191:9092","10.13.88.192:9092","10.13.88.193:9092"]
        type="omkafka"
        topic="redis-log"
        template="redislog"
        partitions.auto="on"
     )
  }
input(type="imfile"
      File="/data1/ms/log/front/redis7215.log"
      Tag=""
      ruleset="redis7215-log"
      freshStartTail="on"     #start tailf
      reopenOnTruncate="on"   #Truncate  reopen
     )
input(type="imfile"
      File="/data1/ms/log/front/redis7243.log"
      Tag=""
      ruleset="redis7215-log"
      freshStartTail="on"
      reopenOnTruncate="on"
     )
############################php curl log#############################
$template phpcurl-log,"%$myhostname%`%msg%"
ruleset(name="phpcurl-log") {
    action(
        broker=["10.13.88.190:9092","10.13.88.191:9092","10.13.88.192:9092","10.13.88.193:9092"]
        type="omkafka"
        topic="phpcurl-log"
        template="phpcurl-log"
        partitions.auto="on"
     )
  }
input(type="imfile"
      File="/data1/ms/log/php_common/php_slow_log"
      Tag=""
      ruleset="phpcurl-log"
      freshStartTail="on"       
      reopenOnTruncate="on"
     )

为了避免在日志发送错误时,丢在 message 日志里,瞬间将磁盘占满,同时配置丢弃策略

*.info;mail.none;authpriv.none;cron.none;local6.none   /var/log/messages

目前收集了 nginx、redis、php curl 三种日志,说一下收集方案。

1、对于 nginx

 方案 1:采用 nginx 的 rsyslog 模块将日志打到 local6,对应 nginx 的配置如下

 ##########elk############################# 
        access_log  syslog:local6 STAT;

然后通过如上 rsyslog 的配置,将日志直接入 kafka 队列,kafka 集群是 4 个 broker。

方案 2:线上还有另一个传输方案,rsyslog 设置一个中继,通过 udp 的方式将日志传到中继的 rsyslog,由中继 rsyslog 入 kafka,这么做的目的是方便了管理,当时还有个考虑是 udp 不会堵,但经过多轮测试后,nginx 的 rsyslog 模块也是很健壮,不会堵的。

2、对于 redis、php curl 的日志

通过 rsyslog 的 imfile 模块,直接对文件监听,配置见上面的 rsyslog 配置,在日志轮转时通过超链接的方式进行新文件的连接,对应的超连接计划任务如下,每天 0 点 5 分执行:

5 0 * * * root sh /usr/local/script/php_slow_log.sh  &> /dev/null

对应的 php_slow_log.sh 的脚本如下:

#!/bin/bash
DATE=`date +%F`
ln -sf  /data1/ms/log/php_common/curl-$DATE  /data1/ms/log/php_common/php_slow_log

备注:

a、rsyslog 向 4 个 kafka 的 broker 推送消息时,是以轮训的方式;

b、rsyslog 通过 udp 或 tcp 向外转发日志时,会默认加上时间、主机名、主机 ip 的属性。

二、队列(kafka+zookeeper)

 队列用的是 kafka,kafka 集群使用 zookeeper 管理,我们用了 4 台服务器混装了 4 个 kafka 和 3 个 zookeeper,kafka 和 zookeeper 的安装地址如下:

http://kafka.apache.org/downloads 注意:下载 Binary downloads 版本,别下错了,解压后就能用

http://zookeeper.apache.org/  注意:安装过程很简单,按照文档来即可,不在说明

1、关于 kafaka

a、配置比较简单,基本默认即可, 常调整的配置项如下:

配置文件:server.properties
broker.id=190  #id
num.partitions=20 # 默认 kafka 的 partion 数量 
log.dirs=/data1/kafka-logs  # 日志文件存放目录
log.retention.hours=3 # 日志保留时间长短
zookeeper.connect=10.13.88.190:2181,10.13.88.191:2181,10.13.88.192:2181 #zookeeper 指定
delete.topic.enable=true #topic 是可以删除的

b、安装后测试(假设 kafka 和 zookeeper 都装了):

开两个终端,两个终端分别运行如下命令

启动:./bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
终端 1:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
终端 2:./bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --from-beginning --topic test

注意两个终端的 topic 要一个名字,这时你在终端 1 输入任何数据,在终端 2 是同步的,证明你安装成功。

c、kafka 常用管理命令

创建 topic:./bin/kafka-topics.sh --create --topic test --replication-factor 1 --partitions 32  --zookeeper  localhost:2181
删除 topic:./bin/kafka-topics.sh --delete --topic test --zookeeper localhost:2181
查看 topic 列表:./bin/kafka-topics.sh  --list  --zookeeper localhost:2181
查看某个 topic 详细:./bin/kafka-topics.sh --describe --topic test --zookeeper localhosts:2181
监控某个 topic 的消费:./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
指定消费组查看消费情况:./bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181  --group test

备注:topic 下 partitions 的数量决定了并发消费的数量,在设置上要根据消息的 QPS 和硬盘情况合理配置。

2、关于 zookeeper

a、配置比较简单,大多数默认项,最好奇数个,半数以上 zookeeper 存活可用

配置文件:zoo.cfg
dataDir=/data1/zookeeper
server.1=10.13.88.190:3888:4888
server.2=10.13.88.191:3889:4888
server.3=10.13.88.192:3889:4888

注意:要在数据目录手动建立 myid,myid 的值是 server 后面的数字,数字是有范围限制的1~255

b、zookeeper 的常用管理命令

zookeeper 我主要是看下它的整体状态,写了个简单脚本获取 zookeeper 的状态, 执行结果如下:

ELK 日志分析集群搭建管理(rsyslog->kafka->ELK)

脚本内容如下:

#!/bin/sh
#writer:gaolixu
[-z $1] && echo "Please specify zoo.cfg like /usr/local/zookeeper/conf/zoo.cfg" && exit
cat $1 |grep "^server" |awk -F'[:|=]' '{print $2}' |
while read line
do
echo -ne "$line\t"
echo stat|nc -w 2 $line 2181  |egrep "^(Node|Zxid|Mode|Connections)" |tr "\n" "\t"
echo stat|nc -w 2 $line 2181  |egrep "^(Node|Zxid|Mode|Connections)" &>/dev/null || echo -n "host is done."
echo
done
使用方式:zkstat.sh   / 配置文件 zoo.cfg 的位置

zookeeper 是相当稳定的,基本不用管。

备注:zookeeper 配置文件里不能有汉字,否则启动不起来。

三、清洗(logstash)

 logstash 用做清洗,并且将处理好的日志推送到 es 里,安装过程很简单详见网址:

https://www.elastic.co/guide/en/logstash/current/installing-logstash.html#package-repositories

我线上的 nginx 的配置文件如下:

input {
  kafka {
    zk_connect => "10.13.88.190:2181,10.13.88.191:2181,10.13.88.192:2181"
    topic_id => "cms-nginx"
    group_id => "cms-nginx"
    consumer_threads => 1
    reset_beginning => false
    decorate_events => false
  }
}
filter {
  ruby {
        init => "@kname = ['host-name','front','http_x_up_calling_line_id','request','http_user_agent','status','remote_addr_1','id','http_referer','request_time','body_bytes_sent','http_deviceid','http_x_forwarded_for','domain','cookie']"
        code => "event.append(Hash[@kname.zip(event['message'].split('`'))])"
        remove_field => ["@version","_score","id","tags","key","message","http_deviceid","http_x_up_calling_line_id","","cookie"]
        }
     
    if [front] {
        grok {
        match => ["front","%{HTTPDATE:logdate}"]
        }
        date {
        match => ["logdate""dd/MMM/yyyy:HH:mm:ss Z"]
        target => "@timestamp"
        remove_field => ["front","logdate"]
        }
    }
    if [request] {
        ruby {
            init => "@kname = ['method','uri','verb']"
            code => "event.append(Hash[@kname.zip(event['request'].split(' '))])"
            remove_field => ["request","method","verb"]
                }
        }
    if [remote_addr_1] {
        grok {
             match => ["remote_addr_1""%{IPV4:remote_addr}" ]
             remove_field => ["remote_addr_1"]
                }
       }
    mutate {
        convert => [
            "body_bytes_sent" "integer",
                  "status" "integer",           
            "request_time" "float" ]
    }
}
output {
        elasticsearch {
                hosts => ["10.39.40.94:9200","10.39.40.95:9200","10.39.40.96:9200","10.39.40.97:9200"]
                workers => 1
                index => "logstash-cms-nginx-%{+YYYY.MM.dd.hh}"
                }
       #stdout {codec => dots 
                #workers => 5
               #}                           #测试性能时使用
       #stdout {codec => rubydebug}       #调试时使用
}
启动命令:./bin/logstash  -w 4 -b 1000 -f /etc/logstash/conf.d/kafka_logstash_cms_nginx.conf &
-w 后面的 worker 数是根据 cpu 的核心数大概算一下,我这里一台服务器开三个 logstash,每个起 4 个 worker

  配置文件看着很长,其实阅读性很好,很易懂上手编写,无非就是定义切割点,如果大切割点下需要继续切割,就加 if 判断,继续切割,吐个槽里面 threads 和 workers 的数量好像不管用,我压测时去看线程数对不上, 看的方法是 top -H -p logstash 的 pid。

  再就是看看哪些需要计算的变成数字型,还有个 timestamp 的处理,这个可以看看上面的代码,对于 nginx 打印的时间符合 ISO8601 标准,可以用他做 es 的时间索引,这样有个好处,如果某个环节慢索引赶不上的话,日志不会错序。时间标准详细可见:http://udn.yyuap.com/doc/logstash-best-practice-cn/filter/date.html

备注:

a、尽量去掉没用的字段,精简索引,非常重要;

b、nginx 打印出来的时间是标准化的,可以用它传到 es 作为 timestamp 建索引;

c、对于响应时间、响应内容大小、状态码要转换成数字类型,方便在 kibana 里做计算等操作;

d、切割双引号可以使用如下配置

code => "event.append(Hash[@kname.zip(event['message'].split(34.chr))])"

e、抓包后发现,logstash 向 es��数据是轮训的,从 zookeeper 消费数据不是轮训,可能是 1 个个用,有压力或问题后再去启用后面的 zookeeper。

f、尽量按照官方如下写法建立多个索引向 es 推送,防止单个索引巨大,search 时计算不出来

index => "logstash-cms-nginx-%{+YYYY.MM.dd.hh}"

g、测试性能方法如下

由于没有现成工具,我们用了打点计量的方式进行压测,摘掉 es 后将输出变为一个点,每处理一条信息打一个点,然后将打出的点用 pv 命令统计出字节流量,反推出 logstash 的吞吐量。

  cp 一个配置文件,修改 output 如下:

 
output {
       stdout {codec => dots 
       workers => 1
        }
       }
 

  同时为了不影响线上业务,修改 group_id, 这样的话测试消费和线上消费互不影响,配置文件修改如下:

  kafka {
    zk_connect => "10.13.88.190:2181,10.13.88.191:2181,10.13.88.192:2181"
    topic_id => "nginx"
    group_id => "test001" 
    consumer_threads => 12
    reset_beginning => false
    decorate_events => flase
  }
 

测试时执行命令:/opt/logstash/bin/logstash -f /tmp/kafka_test.conf |pv -abt > /dev/null

压测结果如下:

ELK 日志分析集群搭建管理(rsyslog->kafka->ELK)

 

每个点是一个 byte,等到数据稳定后,计算每 s 的吞吐量为 2.93*1024=3000,也就是这一个 logstash 最大吞吐量为能处理 3000 条信息每 s。

四、索引(es 2.X 版本)

  es 是硬盘 io 和 cpu 消耗比较重的部分,硬优化有 ssd,软优化牵涉到 Java 层面的 GC 调优、index 调优、进程池调优、merge 调优等,目前跑的还是比较好的,也有说将 index 和 search 分开的,防止 search 太大影响 index 索引,没去尝试,10 多亿条日志,在目前的架构下性能还可以。

es 的安装也是比较简单详见:https://www.elastic.co/guide/en/elasticsearch/reference/current/rpm.html

a、我们线上添加的进程调优如下,不加这个配置,跑一段时间就会有出现某一台 es 负载特别高的情况,而且难以恢复,甚至死机,加后运行健壮,先看调整前后对比图:

ELK 日志分析集群搭建管理(rsyslog->kafka->ELK)

ELK 日志分析集群搭建管理(rsyslog->kafka->ELK)

ELK 日志分析集群搭建管理(rsyslog->kafka->ELK)ELK 日志分析集群搭建管理(rsyslog->kafka->ELK)

bootstrap.memory_lock: false   # centos 6 版本内核不支持

bootstrap.system_call_filter: false
indices.store.throttle.type: merge  #merge 调优
indices.store.throttle.max_bytes_per_sec: 50m
index.merge.scheduler.max_thread_count: 1
index.merge.policy.max_merged_segment: 15gb
index.refresh_interval: -1
index.translog.flush_threshold_ops: 100000 #tranlog 达到多少条进行数据平衡,默认为 5000 
index.translog.flush_threshold_size: 512m
threadpool.bulk.type: fixed
threadpool.bulk.size: 12   #12 核机器,核心数
threadpool.bulk.queue_size: 1000
threadpool.index.type: fixed
threadpool.index.size: 12   #12 核机器,核心数
threadpool.index.queue_size: 1000
threadpool.get.type: fixed
threadpool.get.size: 12    #12 核机器,核心数
threadpool.get.queue_size: 1000
threadpool.search.type: fixed
threadpool.search.size: 24    #12 核机器,核心数 2 倍
threadpool.search.queue_size: 500
index.indexing.slowlog.threshold.index.warn: 10s
index.indexing.slowlog.threshold.index.info: 5s
index.indexing.slowlog.threshold.index.debug: 2s
index.indexing.slowlog.threshold.index.trace: 500ms
index.search.slowlog.threshold.fetch.warn: 1s
index.search.slowlog.threshold.fetch.info: 800ms
index.search.slowlog.threshold.fetch.debug: 500ms
index.search.slowlog.threshold.fetch.trace: 200ms

另外的一些配置中项如下:

cluster.name: yz-search   # 集群名称
node.name: yz-search94    # 当前节点名称
index.number_of_shards: 4  # 每个 index 的 shards 数,这个根据磁盘数量等去算,默认 5
index.number_of_replicas: 0  # 要不要复制分片
index.refresh_interval: 60s  # 刷新时间
path.logs: /data1/LogData/logs # 日志存放地点
network.host: 10.39.40.94   # 绑定 ip 地址
http.port: 9200   #http 管理端口
discovery.zen.ping.unicast.hosts: ["10.39.40.94:9300","10.39.40.95:9300","10.39.40.96:9300","10.39.40.97:9300"# 用来发现集群存在的 es 主机
discovery.zen.minimum_master_nodes: 2  # 最少有几个主机就可以进行 master 选举

b、heap 数的设置,官方说是不能超过 30G,我们 64G 的内存,设置是 25G,GC 方式采用 G1

ELK 日志分析集群搭建管理(rsyslog->kafka->ELK)

c、常用 es 的集群管理命令,当然只是看信息的可以浏览器里直接输入查看

curl http://10.39.40.94:9200/_cat/nodes?v     # 节点概况
curl http://10.39.40.94:9200/_cat/shards?v    # 查看 shards 的信息
curl http://10.39.40.94:9200/_cat/indices?v   # 查看索引信息,如果新推的日志,可以看这个确认是否索引成功   
curl -X DELETE "http://10.39.40.94:9200/ 索引名称"  # 删除指定历史索引,速度很快

对于我们线上的日志,默认保存 7 天,每天晚上清除一次,清除的脚本如下:

#!/bin/bash
DATES=`date +%Y.%m.%d -d'-7 day'`
curl -X DELETE

ES 升级 5.2.1 详见:ELK 之 ES2.4.1 双实例平滑升级至 5.2.1 问题解决及 supervisor 管理记

五、展现(kibana)

展现 kibana 没什么可说的,直接安装后,配置好 es 的地址就可以用,安装很简单有 rpm 包,前端可以用 nginx 做个代理,做限制,安装详见:https://www.elastic.co/downloads/kibana

安装后模型搭建也比较人性化,用几次就熟练了。

备注:像 logstash、kafka 这种加 & 号启动的服务(有些启动后自己 fork 新进程然后退出的其实不合适)可以用 supervisor 管理,比较方便。配置相当简单,可以在浏览器看状态。

ELK 日志分析集群搭建管理(rsyslog->kafka->ELK)

ELK 日志分析集群搭建管理(rsyslog->kafka->ELK)

supervisor 的配置文件如下:

[unix_http_server]
file=/tmp/supervisor.sock   ; (the path to the socket file)
[inet_http_server]         ; inet (TCP) server disabled by default
port=*:9001        ; (ip_address:port specifier, *:port for all iface)
#username=cms              ; (default is no username (open server))
#password=123               ; (default is no password (open server))
[supervisord]
logfile=/tmp/supervisord.log ; (main log file;default $CWD/supervisord.log)
logfile_maxbytes=50MB        ; (max main logfile bytes b4 rotation;default 50MB)
logfile_backups=10           ; (num of main logfile rotation backups;default 10)
loglevel=info                ; (log level;default info; others: debug,warn,trace)
pidfile=/tmp/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
nodaemon=false               ; (start in foreground if true;default false)
minfds=1024                  ; (min. avail startup file descriptors;default 1024)
minprocs=200                 ; (min. avail process descriptors;default 200)
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL  for a unix socket
serverurl=http://*:9001 ; use an http:// url to specify an inet socket
#username=cms              ; should be same as http_username if set
#password=123                ; should be same as http_password if set
[program:logstash_cms_php_slow_log]
command=/opt/logstash/bin/logstash -w 4 -b 1000 -f /etc/logstash/conf.d/kafka_logstash_cms_php.conf    ; the program (relative uses PATH, can take args)
process_name=%(process_num)d ; process_name expr (default %(program_name)s)
numprocs=1                    ; number of processes copies to start (def 1)
umask=022                     ; umask for process (default None)
priority=999                  ; the relative start priority (default 999)
autostart=true                ; start at supervisord start (default: true)
redirect_stderr=true          ; redirect proc stderr to stdout (default false)
stdout_logfile=/var/log/logstash_php_log        ; stdout log path, NONE for none; default AUTO
[program:logstash_cms_nginx_log]
command=/opt/logstash/bin/logstash -w 4  -b 1000  -f /etc/logstash/conf.d/kafka_logstash_cms_nginx.conf    ; the program (relative uses PATH, can take args)
process_name=%(process_num)d ; process_name expr (default %(program_name)s)
numprocs=3                    ; number of processes copies to start (def 1)
umask=022                     ; umask for process (default None)
priority=999                  ; the relative start priority (default 999)
autostart=true                ; start at supervisord start (default: true)
redirect_stderr=true          ; redirect proc stderr to stdout (default false)
stdout_logfile=/var/log/logstash_nginx_log        ; stdout log path, NONE for none; default AUTO
;[include]
;files = relative/directory/*.ini

本文永久更新链接地址:http://www.linuxidc.com/Linux/2017-03/142225.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计14241字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中