I. Overview
1. Build a highly available Flume pipeline to collect data and store it on HDFS. The architecture is sketched below:
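A rough text sketch of the topology, reconstructed from the configurations in the sections below (host names and ports are taken from those files):

a1 / a2 / a3 (syslogtcp source, port 5140)
        |
        |  avro, load-balanced over port 5150
        v
collector1 (hadoop1)    collector2 (hadoop2)
        |
        |  hdfs sink
        v
hdfs://master/user/flume/log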
II. Configure the Agents
1. cat flume-client.properties
# Name the components on this agent (declare the source, channel, and sink names)
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source: a syslogtcp source listening on localhost:5140
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
# Define the sink group: k1 and k2 form a load-balancing group; with backoff
# enabled, a failed sink is temporarily taken out of the round-robin rotation
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
# Define sink 1: events are forwarded via avro to the two collector machines
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 5150
# Define sink 2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop2
a1.sinks.k2.port = 5150
# Use a channel which buffers events in memory (capacity = max events queued,
# transactionCapacity = max events per transaction)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
# a2 and a3 are configured the same way as a1
III. Configure the Collectors
1. cat flume-server.properties
# Name the components on this agent (declare the source, channel, and sink names)
collector1.sources = r1
collector1.channels = c1
collector1.sinks = k1
# Describe the source: an avro source listening on port 5150 on all interfaces
collector1.sources.r1.type = avro
collector1.sources.r1.port = 5150
collector1.sources.r1.bind = 0.0.0.0
collector1.sources.r1.channels = c1
# Describe channel c1, which buffers events in memory
collector1.channels.c1.type = memory
collector1.channels.c1.capacity = 1000
collector1.channels.c1.transactionCapacity = 100
# Describe sink k1: write events to HDFS as a plain-text stream, rolling a new file every 300 seconds
collector1.sinks.k1.type = hdfs
collector1.sinks.k1.channel = c1
collector1.sinks.k1.hdfs.path = hdfs://master/user/flume/log
collector1.sinks.k1.hdfs.fileType = DataStream
collector1.sinks.k1.hdfs.writeFormat = Text
collector1.sinks.k1.hdfs.rollInterval = 300
collector1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
collector1.sinks.k1.hdfs.round = true
collector1.sinks.k1.hdfs.roundValue = 5
collector1.sinks.k1.hdfs.roundUnit = minute
collector1.sinks.k1.hdfs.useLocalTimeStamp = true
# collector2 is configured the same way as collector1
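Before starting the collectors, it is worth making sure the target directory on HDFS exists and is writable by the user running Flume (the HDFS sink can create it itself, but pre-creating it surfaces permission problems early). A minimal sketch, assuming the hdfs CLI is on the PATH:

hdfs dfs -mkdir -p /user/flume/log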
IV. Startup
1. Start flume-ng on the collectors
flume-ng agent -n collector1 -c conf -f /usr/local/flume/conf/flume-server.properties -Dflume.root.logger=INFO,console
# -n takes the agent name used in the configuration file (collector1 here)
2. Start flume-ng on the agents
flume-ng agent -n a1 -c conf -f /usr/local/flume/conf/flume-client.properties -Dflume.root.logger=INFO,console
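The -Dflume.root.logger=INFO,console option sends logs to the console, which is handy while debugging. For a long-running deployment, a common pattern (a sketch, not part of the original setup; the log path is an arbitrary choice) is to run the agent in the background instead:

nohup flume-ng agent -n a1 -c conf -f /usr/local/flume/conf/flume-client.properties > /var/log/flume-agent.log 2>&1 &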
V. Testing
[root@hadoop5 ~]# echo "hello" | nc localhost 5140 # nc (netcat) must be installed
On the collector, the console log then shows the incoming connection and the HDFS file being created:
17/09/03 22:56:58 INFO source.AvroSource: Avro source r1 started.
17/09/03 22:59:09 INFO ipc.NettyServer: [id: 0x60551752, /192.168.100.15:34310 => /192.168.100.11:5150] OPEN
17/09/03 22:59:09 INFO ipc.NettyServer: [id: 0x60551752, /192.168.100.15:34310 => /192.168.100.11:5150] BOUND: /192.168.100.11:5150
17/09/03 22:59:09 INFO ipc.NettyServer: [id: 0x60551752, /192.168.100.15:34310 => /192.168.100.11:5150] CONNECTED: /192.168.100.15:34310
17/09/03 23:03:54 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
17/09/03 23:03:54 INFO hdfs.BucketWriter: Creating hdfs://master/user/flume/log/2017-09-03.1504494234038.tmp
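Once BucketWriter reports the .tmp file, the event has reached HDFS; after the 300-second rollInterval the .tmp suffix is dropped. The result can be checked with the hdfs CLI (the timestamped file name will differ per run):

hdfs dfs -ls /user/flume/log
hdfs dfs -cat '/user/flume/log/2017-09-03.*' # should contain "hello"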
VI. Summary
Highly available flume-ng setups generally use one of two sink-processor modes: load_balance and failover. This article used load_balance; a failover configuration looks like this:
# Set failover: the higher-priority sink receives all events while it is healthy
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
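With these priorities, all events go to k1 (priority 10) as long as it is healthy; if k1 fails, traffic fails over to k2, and maxpenalty (in milliseconds) caps the back-off before a failed sink is retried.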
Some commonly used source, channel, and sink types:
- Sources: avro, syslogtcp, exec, spooldir, netcat
- Channels: memory, file
- Sinks: hdfs, logger, avro, file_roll