共计 7163 个字符,预计需要花费 18 分钟才能阅读完成。
本文基于 Kafka 0.8
1. 引言
互联网够公司的日志无处不在,web 日志,js 日志,搜索日志,监控日志等等。对于这些日志的离线分析(Hadoop),wget&rsync 虽然人力维护成本较高,但可以满足功能行需求。但对于这些日志的实时分析需求(例如实时推荐,监控系统),则往往必须要引入一些“高大上”的系统。
传统的企业消息系统(例如 WebSphere)并不是非常适合大规模的日志处理系统,理由如下:
1) 过于关注可靠性,这些可靠性增加了系统实现 &API 的复杂度,而在日志处理过程中,丢失几条日志常常“无伤大雅”
2) 包括 API,scale 及消息缓冲的设计理念都不适合 Hign Throughput 的日志处理系统
针对这些问题,近些年各个公司都做了一些自己的日志收集系统,例如:Facebook 的 Scribe、Yahoo 的 data highway,Cloudera 的 Flume,Apache 的 Chukwa,百度的 BigPipe,阿里的 RocketMQ。
Kafka 是 LinkedIn 开发并开源出来的一个高吞吐的分布式消息系统。其具有以下特点:
1) 支持高 Throughput 的应用
2) scale out:无需停机即可扩展机器
3) 持久化:通过将数据持久化到硬盘以及 replication 防止数据丢失
4) 支持 online 和 offline 的场景。
2. 介绍
kafka 使用 scala 开发,支持多语言客户端(c++、Java、Python、go 等)其架构如下[2]:
Producer:消息发布者
Broker:消息中间件处理结点,一个 kafka 节点就是一个 broker
Consumer:消息订阅者
kafka 的消息分几个层次:
1) Topic:一类消息,例如 page view 日志,click 日志等都可以以 topic 的形式存在,kafka 集群能够同时负责多个 topic 的分发
2) Partition:Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。
3) Message:消息,最小订阅单元
具体流程:
1. Producer 根据指定的 partition 方法(round-robin、hash 等),将消息发布到指定 topic 的 partition 里面
2. kafka 集群接收到 Producer 发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。
3. Consumer 从 kafka 集群 pull 数据,并控制获取消息的 offset
3. 设计
ThroughPut
High Throughput 是 kafka 需要实现的核心目标之一,为此 kafka 做了以下一些设计:
1)数据磁盘持久化:消息不在内存中 cache,直接写入到磁盘,充分利用磁盘的顺序读写性能
2)zero-copy:减少 IO 操作步骤
3)数据批量发送
4)数据压缩
5)Topic 划分为多个 partition,提高 parallelism
load balance&HA
1) producer 根据用户指定的算法,将消息发送到指定的 partition
2) 存在多个 partiiton,每个 partition 有自己的 replica,每个 replica 分布在不同的 Broker 节点上
3) 多个 partition 需要选取出 lead partition,lead partition 负责读写,并由 zookeeper 负责 fail over
4) 通过 zookeeper 管理 broker 与 consumer 的动态加入与离开
pull-based system
由于 kafka broker 会持久化数据,broker 没有内存压力,因此,consumer 非常适合采取 pull 的方式消费数据,具有以下几点好处:
1)简化 kafka 设计
2)consumer 根据消费能力自主控制消息拉取速度
3)consumer 根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等
Scale Out
当需要增加 broker 结点时,新增的 broker 会向 zookeeper 注册,而 producer 及 consumer 会根据注册在 zookeeper 上的 watcher 感知这些变化,并及时作出调整。
Kafka 的详细介绍:请点这里
Kafka 的下载地址:请点这里
相关阅读:
分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm
Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm
本文基于 Kafka 0.8
在一台机器上构建一个 3 个节点的 kafka 集群,并测试 producer、consumer 在正常情况下的行为,以及在 lead broker/follow broker 失效情况下的行为
1. 下载并解压 kafka 0.8.0 release
$ mkdir kafka
$ wget http://apache.dataguru.cn/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz
$ tar -zxvf kafka_2.8.0-0.8.0.tar.gz
$ cd kafka_2.8.0-0.8.0
$ ll
total 2560
drwxr-xr-x 6 root root 4096 Dec 17 17:44 ./
drwxr-xr-x 4 root root 4096 Dec 17 18:20 ../
drwxr-xr-x 3 root root 4096 Dec 17 18:16 bin/
drwxr-xr-x 2 root root 4096 Dec 17 17:43 config/
-rw-r–r– 1 root root 2520145 Nov 27 06:21 kafka_2.8.0-0.8.0.jar
drwxr-xr-x 2 root root 4096 Nov 27 06:21 libs/
-rw-r–r– 1 root root 12932 Nov 27 06:21 LICENSE
drwxr-xr-x 2 root root 4096 Dec 17 18:00 logs/
-rw——- 1 root root 47165 Dec 17 18:10 nohup.out
-rw-r–r– 1 root root 162 Nov 27 06:21 NOTICE
2. 启动一个单节点的 zookeeper
$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
3. 准备启动一个 3 个 broker 节点的 kafka 集群,因此做如下配置
$ cp config/server.properties config/server-1.properties
$ cp config/server.properties config/server-2.properties
并做如下修改:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
说明:
broker.id: broker 节点的唯一标识
port: broker 节点使用端口号
log.dir: 消息目录位置
4. 启动 3 个 broker 节点
$ JMX_PORT=9997 bin/kafka-server-start.sh config/server-1.properties &
$ JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &
$ JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
5. 创建 topic 并查看
$ bin/kafka-create-topic.sh –zookeeper localhost:2181 –replica 3 –partition 1 –topic 3test
creation succeeded!
$ bin/kafka-list-topic.sh –zookeeper localhost:2181
topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,1,0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 1 replicas: 0,1,2 isr: 1,2,0
说明:
partiton:partion id,由于此处只有一个 partition,因此 partition id 为 0
leader:当前负责读写的 lead broker id
relicas:当前 partition 的所有 replication broker list
isr:relicas 的子集,只包含出于活动状态的 broker
6. 启动 consumer & producer,并在 producer 启动后的 console 输入一些信息
$ bin/kafka-console-consumer.sh –zookeeper localhost:2181 –from-beginning –topic 3test
message1
message3
message2
$ bin/kafka-console-producer.sh –broker-list localhost:9092,localhost:9093,localhost:9094 –topic 3test
message1
message3
message2
producer 发送的数据 consumer 都能正常消费
7. 干掉 follow broker
杀掉一个非 lead broker(lead broker id 为 2)
$ pkill -9 -f server-1.properties
查看 topic:
$ bin/kafka-list-topic.sh –zookeeper localhost:2181
topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 2 replicas: 0,1,2 isr: 2,0
此时,存活的 broker 只有 2,0
测试:produce 发送消息,consumer 能正常接收到
8. 继续干掉 leader broker
干掉 leader broker 后,连续查看 topic 状态
$ pkill -9 -f server-2.properties
$ bin/kafka-list-topic.sh –zookeeper localhost:2181
topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 2 replicas: 0,1,2 isr: 2,0
$ bin/kafka-list-topic.sh –zookeeper localhost:2181
topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 2 replicas: 0,1,2 isr: 2,0
$ bin/kafka-list-topic.sh –zookeeper localhost:2181
topic: 3test partition: 0 leader: 0 replicas: 2,1,0 isr: 0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 0 replicas: 0,1,2 isr: 0
$ bin/kafka-list-topic.sh –zookeeper localhost:2181
topic: 3test partition: 0 leader: 0 replicas: 2,1,0 isr: 0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 0 replicas: 0,1,2 isr: 0
杀掉 leader broker 过了一会,broker 0 成为新的 leader broker
测试:produce 发送消息,consumer 能正常接收到
本文基于 Kafka 0.8
1. 引言
互联网够公司的日志无处不在,web 日志,js 日志,搜索日志,监控日志等等。对于这些日志的离线分析(Hadoop),wget&rsync 虽然人力维护成本较高,但可以满足功能行需求。但对于这些日志的实时分析需求(例如实时推荐,监控系统),则往往必须要引入一些“高大上”的系统。
传统的企业消息系统(例如 WebSphere)并不是非常适合大规模的日志处理系统,理由如下:
1) 过于关注可靠性,这些可靠性增加了系统实现 &API 的复杂度,而在日志处理过程中,丢失几条日志常常“无伤大雅”
2) 包括 API,scale 及消息缓冲的设计理念都不适合 Hign Throughput 的日志处理系统
针对这些问题,近些年各个公司都做了一些自己的日志收集系统,例如:Facebook 的 Scribe、Yahoo 的 data highway,Cloudera 的 Flume,Apache 的 Chukwa,百度的 BigPipe,阿里的 RocketMQ。
Kafka 是 LinkedIn 开发并开源出来的一个高吞吐的分布式消息系统。其具有以下特点:
1) 支持高 Throughput 的应用
2) scale out:无需停机即可扩展机器
3) 持久化:通过将数据持久化到硬盘以及 replication 防止数据丢失
4) 支持 online 和 offline 的场景。
2. 介绍
kafka 使用 scala 开发,支持多语言客户端(c++、Java、Python、go 等)其架构如下[2]:
Producer:消息发布者
Broker:消息中间件处理结点,一个 kafka 节点就是一个 broker
Consumer:消息订阅者
kafka 的消息分几个层次:
1) Topic:一类消息,例如 page view 日志,click 日志等都可以以 topic 的形式存在,kafka 集群能够同时负责多个 topic 的分发
2) Partition:Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。
3) Message:消息,最小订阅单元
具体流程:
1. Producer 根据指定的 partition 方法(round-robin、hash 等),将消息发布到指定 topic 的 partition 里面
2. kafka 集群接收到 Producer 发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。
3. Consumer 从 kafka 集群 pull 数据,并控制获取消息的 offset
3. 设计
ThroughPut
High Throughput 是 kafka 需要实现的核心目标之一,为此 kafka 做了以下一些设计:
1)数据磁盘持久化:消息不在内存中 cache,直接写入到磁盘,充分利用磁盘的顺序读写性能
2)zero-copy:减少 IO 操作步骤
3)数据批量发送
4)数据压缩
5)Topic 划分为多个 partition,提高 parallelism
load balance&HA
1) producer 根据用户指定的算法,将消息发送到指定的 partition
2) 存在多个 partiiton,每个 partition 有自己的 replica,每个 replica 分布在不同的 Broker 节点上
3) 多个 partition 需要选取出 lead partition,lead partition 负责读写,并由 zookeeper 负责 fail over
4) 通过 zookeeper 管理 broker 与 consumer 的动态加入与离开
pull-based system
由于 kafka broker 会持久化数据,broker 没有内存压力,因此,consumer 非常适合采取 pull 的方式消费数据,具有以下几点好处:
1)简化 kafka 设计
2)consumer 根据消费能力自主控制消息拉取速度
3)consumer 根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等
Scale Out
当需要增加 broker 结点时,新增的 broker 会向 zookeeper 注册,而 producer 及 consumer 会根据注册在 zookeeper 上的 watcher 感知这些变化,并及时作出调整。
Kafka 的详细介绍:请点这里
Kafka 的下载地址:请点这里
相关阅读:
分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm
Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm