Recently I have been responsible for product log instrumentation and collection at my company, and built a log collection system based on Flume + HDFS + Hive.
I. Log collection system architecture
I sketched a simple architecture diagram of the log collection system. As it shows, Flume plays both the agent and the collector roles, while HDFS handles persistent data storage.
The setup described here is a demo: it uses a single flume collector, and the data is stored only in HDFS. A highly available log collection architecture would of course need multiple Flume collectors for load balancing and failover, as sketched below.
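As a rough sketch of what that could look like on the agent side, here is a hypothetical sink-group configuration using Flume's built-in load_balance sink processor. The second collector host (centos3) and the sink/group names are assumptions for illustration; only centos2 exists in this demo.
# Hypothetical agent-side sink group for load balancing across two collectors
a1.sinks = collectorSink1 collectorSink2
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = collectorSink1 collectorSink2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinks.collectorSink1.type = avro
a1.sinks.collectorSink1.channel = linuxidcChannel
a1.sinks.collectorSink1.hostname = centos2
a1.sinks.collectorSink1.port = 4545
a1.sinks.collectorSink2.type = avro
a1.sinks.collectorSink2.channel = linuxidcChannel
a1.sinks.collectorSink2.hostname = centos3
a1.sinks.collectorSink2.port = 4545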
II. Log generation
1. log4j configuration: roll a new file every minute; if a file exceeds 5 MB within that minute, another file is created.
<!-- Product analytics log, rolled by minute -->
<RollingRandomAccessFile name="RollingFile_product_minute"
    fileName="${STAT_LOG_HOME}/${SERVER_NAME}_product.log"
    filePattern="${STAT_LOG_HOME}/${SERVER_NAME}_product.log.%d{yyyy-MM-dd-HH-mm}-%i">
    <PatternLayout charset="UTF-8"
        pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %level - %msg%xEx%n" />
    <Policies>
        <TimeBasedTriggeringPolicy interval="1"
            modulate="true" />
        <SizeBasedTriggeringPolicy size="${EVERY_FILE_SIZE}" />
    </Policies>
    <Filters>
        <ThresholdFilter level="INFO" onMatch="ACCEPT"
            onMismatch="NEUTRAL" />
    </Filters>
</RollingRandomAccessFile>
The rolled files follow the filePattern above, e.g. xxx_product.log.2016-11-30-09-18-1.
2. Log content
Each entry is a JSON document; the top-level JSON fields, in order, are: tableName, logRequest, timestamp, statBody, logResponse, resultCode, resultFailMsg.
2016-11-30 09:18:21.916 INFO - {"tableName": "ReportView",
"logRequest": {***
},
"timestamp": 1480468701432,
"statBody": {***
},
"logResponse": {***
},
"resultCode": 1,
"resultFailMsg": ""
}
III. Flume configuration
For the virtual machine environment, see http://www.linuxidc.com/Linux/2016-12/137955.htm
For the Hadoop environment, see http://www.linuxidc.com/Linux/2016-12/137957.htm
The Flume environment here is:
centos1: flume-agent
centos2: flume-collector
1. Flume agent configuration (conf file)
a1.sources = linuxidcSource
a1.channels = linuxidcChannel
a1.sinks = linuxidcSink
a1.sources.linuxidcSource.type = spooldir
a1.sources.linuxidcSource.channels = linuxidcChannel
# Log directory
a1.sources.linuxidcSource.spoolDir = /opt/flumeSpool
a1.sources.linuxidcSource.fileHeader = true
# After a file is processed it gets a .COMPLETED suffix, and a new .log file is rolled every minute; ignore both .log and .COMPLETED files here
a1.sources.linuxidcSource.ignorePattern=([^_]+)|(.*(\.log)$)|(.*(\.COMPLETED)$)
a1.sources.linuxidcSource.basenameHeader=true
a1.sources.linuxidcSource.deserializer.maxLineLength=102400
# Custom interceptor: splits the JSON source log into fields and adds a timestamp for the downstream hdfsSink; the interceptor code is shown later
a1.sources.linuxidcSource.interceptors=i1
a1.sources.linuxidcSource.interceptors.i1.type=com.linuxidc.flume_interceptor.HiveLogInterceptor2$Builder
a1.sinks.linuxidcSink.type = avro
a1.sinks.linuxidcSink.channel = linuxidcChannel
a1.sinks.linuxidcSink.hostname = centos2
a1.sinks.linuxidcSink.port = 4545
# If deflate compression is configured here, the flume collector side must configure matching decompression
a1.sinks.linuxidcSink.compression-type=deflate
a1.channels.linuxidcChannel.type=memory
a1.channels.linuxidcChannel.capacity=10000
a1.channels.linuxidcChannel.transactionCapacity=1000
2. Flume collector configuration
a1.sources = avroSource
a1.channels = memChannel
a1.sinks = hdfsSink
a1.sources.avroSource.type = avro
a1.sources.avroSource.channels = memChannel
a1.sources.avroSource.bind=centos2
a1.sources.avroSource.port=4545
# Must match the flume agent configuration
a1.sources.avroSource.compression-type=deflate
a1.sinks.hdfsSink.type = hdfs
a1.sinks.hdfsSink.channel = memChannel
# linuxidc_hive_log is the Hive table, partitioned by year-month-day
a1.sinks.hdfsSink.hdfs.path=hdfs://centos1:9000/flume/linuxidc_hive_log/dt=%Y-%m-%d
a1.sinks.hdfsSink.hdfs.batchSize=10000
a1.sinks.hdfsSink.hdfs.fileType=DataStream
a1.sinks.hdfsSink.hdfs.writeFormat=Text
a1.sinks.hdfsSink.hdfs.rollSize=10240000
a1.sinks.hdfsSink.hdfs.rollCount=0
a1.sinks.hdfsSink.hdfs.rollInterval=300
a1.channels.memChannel.type=memory
a1.channels.memChannel.capacity=100000
a1.channels.memChannel.transactionCapacity=10000
IV. Hive table creation and partitioning
1. Hive table creation
After the CREATE TABLE statement is executed in Hive, a new linuxidc_hive_log directory appears under hdfs://centos1:9000/flume/ (the statement contains a LOCATION clause).
\u0001 is the delimiter Hive uses to separate fields; it shows up as ^A when the file is opened with vim on Linux.
Since the logs are in JSON format, they need to be converted into \u0001-delimited fields and partitioned by dt. This step is handled by the custom Flume interceptor.
CREATE TABLE `linuxidc_hive_log`(
`tableName` string,
`logRequest` string,
`timestamp` bigint,
`statBody` string,
`logResponse` string,
`resultCode` int,
`resultFailMsg` string
)
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://centos1:9000/flume/linuxidc_hive_log';
2. Hive table partitioning
for ((i=-1;i<=365;i++))
do
dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
echo date=$dt
hive -e "ALTER TABLE linuxidc_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_linuxidc_hive_log.out 2>>logs/init_linuxidc_hive_log.err
done
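As a quick sanity check (not part of the original steps), the partitions that were just added can be listed from the Hive CLI, and a single partition can be counted:
SHOW PARTITIONS linuxidc_hive_log;
SELECT COUNT(*) FROM linuxidc_hive_log WHERE dt='2016-12-01';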
V. Custom Flume interceptor
Create a new Maven project; the interceptor HiveLogInterceptor2 is shown below.
package com.linuxidc.flume_interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.TimestampInterceptor.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;

public class HiveLogInterceptor2 implements Interceptor {

    private static final Logger logger = LoggerFactory.getLogger(HiveLogInterceptor2.class);

    // Field separator expected by the Hive table (\u0001, shown as ^A in vim)
    public static final String HIVE_SEPARATOR = "\001";

    public void initialize() {
    }

    public void close() {
    }

    public Event intercept(Event event) {
        String originalLog = new String(event.getBody(), Charsets.UTF_8);
        try {
            String log = parseLog(originalLog);
            // Set the timestamp header so the hdfsSink can resolve %Y-%m-%d in its path
            long now = System.currentTimeMillis();
            Map<String, String> headers = event.getHeaders();
            headers.put(Constants.TIMESTAMP, Long.toString(now));
            event.setBody(log.getBytes(Charsets.UTF_8));
        } catch (Throwable throwable) {
            logger.error("error when intercepting log [" + originalLog + "]", throwable);
            return null;
        }
        return event;
    }

    public List<Event> intercept(List<Event> list) {
        List<Event> events = new ArrayList<Event>();
        for (Event event : list) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                events.add(interceptedEvent);
            }
        }
        return events;
    }

    // Converts one log4j line ("yyyy-MM-dd HH:mm:ss.SSS INFO - {json}") into
    // \u0001-separated fields, with the date appended as the dt partition value
    private static String parseLog(String log) {
        List<String> logFields = new ArrayList<String>();
        String dt = log.substring(0, 10);
        String keyStr = "INFO -";
        int index = log.indexOf(keyStr);
        String content = "";
        if (index != -1) {
            content = log.substring(index + keyStr.length(), log.length());
        }
        // Normalize line endings, which differ between operating systems
        content = content.replaceAll("\r", "");
        content = content.replaceAll("\n", "\\\\" + System.getProperty("line.separator"));

        JSONObject jsonObj = JSONObject.parseObject(content);
        String tableName = jsonObj.getString("tableName");
        String logRequest = jsonObj.getString("logRequest");
        String timestamp = jsonObj.getString("timestamp");
        String statBody = jsonObj.getString("statBody");
        String logResponse = jsonObj.getString("logResponse");
        String resultCode = jsonObj.getString("resultCode");
        String resultFailMsg = jsonObj.getString("resultFailMsg");

        // Append the fields in the same order as the Hive table columns
        logFields.add(tableName);
        logFields.add(logRequest);
        logFields.add(timestamp);
        logFields.add(statBody);
        logFields.add(logResponse);
        logFields.add(resultCode);
        logFields.add(resultFailMsg);
        logFields.add(dt);
        return Joiner.on(HIVE_SEPARATOR).join(logFields);
    }

    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new HiveLogInterceptor2();
        }

        public void configure(Context context) {
        }
    }
}
Add the following to pom.xml, build the interceptor project with Maven, and copy both the project jar and its dependency jars into the ${flume-agent}/lib directory (a possible build/copy sketch follows the pom snippet).
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <configuration>
        <outputDirectory>
          ${project.build.directory}
        </outputDirectory>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <executions>
        <execution>
          <id>copy-dependencies</id>
          <phase>prepare-package</phase>
          <goals>
            <goal>copy-dependencies</goal>
          </goals>
          <configuration>
            <outputDirectory>${project.build.directory}/lib</outputDirectory>
            <overWriteReleases>true</overWriteReleases>
            <overWriteSnapshots>true</overWriteSnapshots>
            <overWriteIfNewer>true</overWriteIfNewer>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
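The build and copy steps might look like the following; FLUME_HOME here is a placeholder for the flume-agent install directory, not a path from the original article.
# Package the interceptor; copy-dependencies runs at prepare-package,
# so the dependency jars end up under target/lib
mvn clean package
# Copy the project jar and its dependencies into Flume's lib directory
cp target/*.jar ${FLUME_HOME}/lib/
cp target/lib/*.jar ${FLUME_HOME}/lib/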
The log fields are separated with "\001". A log entry after interceptor processing looks like the following, where ^A is the "\001" character:
ReportView^A{"request":{},"requestBody":{"detailInfos":[],"flag":"","reportId":7092,"pageSize":0,"searchs":[],"orders":[],"pageNum":1}}^A1480468701432^A{"sourceId":22745,"reportId":7092,"projectId":29355,"userId":2532}^A{"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":" 请求成功 ","httpCode":200,"timestamp":1480468701849},"statusCode":"OK"},"response":{}}^A1^A^A2016-11-30
At this point, the Flume + HDFS + Hive configuration is complete.
The data can then be analyzed with MapReduce or HQL; a sample query is sketched below.
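A simple HQL sketch (the metric and grouping are mine for illustration, not from the original article) counting events per day and per business table:
SELECT dt, tableName, COUNT(*) AS events
FROM linuxidc_hive_log
GROUP BY dt, tableName;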
VI. Startup and results
1. Start Hadoop HDFS
See the previous article, Hadoop 1.2 cluster setup and environment configuration: http://www.linuxidc.com/Linux/2016-12/137957.htm
2. Start flume_collector and flume_agent. The flume startup command takes a lot of parameters, so I wrote a startup script:
start-Flume.sh
# Kill any running Flume process on this node, then restart the agent in the background
jps -l|grep org.apache.flume.node.Application|awk '{print $1}'|xargs kill -9 >/dev/null 2>&1
cd "$(dirname "$0")"
cd ..
nohup bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 >/dev/null 2>&1 &
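Presumably the collector should be started before the agent so the agent's avro sink can connect; the run order would be something like this (the bin paths are assumed):
[root@centos2 bin]# ./start-Flume.sh   # collector first
[root@centos1 bin]# ./start-Flume.sh   # then the agent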
3. Check the data on HDFS
The collected logs have been uploaded to HDFS:
[root@centos1 bin]# rm -rf FlumeData.1480587273016.tmp
[root@centos1 bin]# hadoop fs -ls /flume/linuxidc_hive_log/dt=2016-12-01/
Found 3 items
-rw-r--r-- 3 root supergroup 5517 2016-12-01 08:12 /flume/linuxidc_hive_log/dt=2016-12-01/FlumeData.1480608753042.tmp
-rw-r--r-- 3 root supergroup 5517 2016-12-01 08:40 /flume/linuxidc_hive_log/dt=2016-12-01/FlumeData.1480610453116
-rw-r--r-- 3 root supergroup 5517 2016-12-01 08:44 /flume/linuxidc_hive_log/dt=2016-12-01/FlumeData.1480610453117
[root@centos1 bin]#
4. Start Hive and query the data; Hive can now read the data from HDFS.
[root@centos1 lib]# hive
Logging initialized using configuration in file:/root/apache-hive-1.2.1-bin/conf/hive-log4j.properties
hive> select * from linuxidc_hive_log limit 2;
OK
ReportView {"request":{},"requestBody":{"detailInfos":[],"flag":"","reportId":7092,"pageSize":0,"searchs":[],"orders":[],"pageNum":1}} 1480468701432 {"sourceId":22745,"reportId":7092,"projectId":29355,"userId":2532} {"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":"请求成功","httpCode":200,"timestamp":1480468701849},"statusCode":"OK"},"response":{}} 1 2016-12-01
ReportDesignResult {"request":{},"requestBody":{"sourceId":22745,"detailInfos":[{"colName":"月份 ","flag":"0","reportId":7092,"colCode":"col_2_22745","pageSize":20,"type":"1","pageNum":1,"rcolCode":"col_25538","colType":"string","formula":"","id":25538,"position":"row","colId":181664,"dorder":1,"pColName":" 月份 ","pRcolCode":"col_25538"},{"colName":" 综合利率 (合计)","flag":"1","reportId":7092,"colCode":"col_11_22745","pageSize":20,"type":"1","pageNum":1,"rcolCode":"sum_col_25539","colType":"number","formula":"sum","id":25539,"position":"group","colId":181673,"dorder":1,"pColName":" 综合利率 ","pRcolCode":"col_25539"}],"flag":"bar1","reportId":7092,"reportName":"iiiissszzzV","pageSize":100,"searchs":[],"orders":[],"pageNum":1,"projectId":29355}} 1480468703586{"reportType":"bar1","sourceId":22745,"reportId":7092,"num":5,"usedFields":" 月份 $$ 综合利率 (合计)$$","projectId":29355,"userId":2532} {"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":" 请求成功","reportId":7092,"httpCode":200,"timestamp":1480468703774},"statusCode":"OK"},"response":{}} 1 2016-12-01
Time taken: 2.212 seconds, Fetched: 2 row(s)
hive>
VII. Common problems and solutions
1. FATAL: Spool Directory source linuxidcSource: {spoolDir: /opt/flumeSpool}: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
Possible causes:
1) Character encoding: log files in the spoolDir directory must be UTF-8 (a quick check/convert sketch follows the config line below).
2) When using the Spooling Directory Source, never read and write the same file at the same time; add the following to the conf file:
a1.sources.linuxidcSource.ignorePattern=([^_]+)|(.*(\.log)$)|(.*(\.COMPLETED)$)
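For the encoding cause, one quick way to check a file's encoding and convert it to UTF-8; the file name and the GBK source encoding are only examples:
cd /opt/flumeSpool
file -i app_product.log.2016-11-30-09-18-1
iconv -f GBK -t UTF-8 app_product.log.2016-11-30-09-18-1 -o app_product.log.2016-11-30-09-18-1.utf8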
2. Logs land in the Hadoop directory, but a Hive query returns no data. For example, hdfs://centos1:9000/flume/linuxidc_hive_log/dt=2016-12-01/ contains files,
but select * from linuxidc_hive_log in Hive returns nothing.
Possible causes:
1) The partitions were never created. Even with the Flume configuration in place (a1.sinks.hdfsSink.hdfs.path=hdfs://centos1:9000/flume/linuxidc_hive_log/dt=%Y-%m-%d), the table's partition structure does not exist, so Hive cannot read the files after they land on HDFS.
Solution: create the partitions first. Put the following in an executable shell script to pre-create the table's partitions:
for ((i=-10;i<=365;i++))
do
dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
echo date=$dt
hive -e "ALTER TABLE linuxidc_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_linuxidc_hive_log.out 2>>logs/init_linuxidc_hive_log.err
done
2) The file on HDFS may still be a .tmp file that HDFS is writing to; Hive cannot read .tmp files, only completed ones.
Solution: wait for the configured roll interval to elapse (or the roll size to be reached), so that the .tmp file is renamed to a normal log file, then query Hive again.
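If .tmp files linger too long, one option (my assumption, not part of the original configuration) is to let the HDFS sink close idle files automatically:
# Close files that have received no writes for 60 seconds, in addition to the existing roll settings
a1.sinks.hdfsSink.hdfs.idleTimeout=60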
