共计 11586 个字符,预计需要花费 29 分钟才能阅读完成。
上一篇文章《Kafka 高吞吐量性能揭秘 http://www.linuxidc.com/Linux/2016-03/129065.htm》介绍了 Kafka 在设计上是如何来保证高时效、大吞吐量的,主要的内容集中在底层原理和架构上,属于理论知识范畴。这次我们站在应用和运维的角度,聊一聊集群到位后要怎么才能最好的配置参数和进行测试性能。Kafka 的配置详尽且复杂,想要进行全面的性能调优需要掌握大量信息,我也只是通过工作中的一些实战经验来筛选出对集群性能影响最大的几个要点,接下来要阐述的观点也仅限于我所描述的环境下,请大家根据自己的环境适当取舍。
今天的文章分为两大部分,第一部分介绍一下我总结的跟性能有关的一些参数、含义以及调优策略。第二部分会给出一些我自己实践过的测试结果对照组,具体的数值和结果可能因场景、机器、环境而异,但是总体的思路和方法应该是一致的。
在正式进入主题之前,介绍一下本次测试所使用的机器配置:
6 台物理机,其中三台部署 Broker,三台专门用来 launch request。
每台物理机:24 Processors,189G Memory,2G 单机带宽。
执行本次测试时为了能够覆盖到到一些“非常规”的用法,我把 Broker 的 HeapSize 设置到了 30G。
相关参数介绍
在调试和优化使用 Java 开发的系统时,第一步肯定绕不开对 JVM 的调优,Kafka 自然也不例外,而 JVM 调优的重点则是在内存上。
其实 Kafka 服务本身并不需要很大内存,上篇文章也已经详细介绍过 Kafka 依赖系统提供的 PageCache 来满足性能上的要求,利用 VisualJVM 等工具可以很清晰的分析出 Heap Space 的占用比例情况。本文中测试时设置 30G 内存的目的是支持更高的并发,高并发本身就必然会需要更多的内存来支持,同时高并发也意味着 SocketBuffer 等相关缓存容量会成倍增长。实际使用中,调整内存大小的准则是留给系统尽可能多的空闲内存,Broker 本身则是够用就好。
说完了大小设置我们再来聊一下 JVM 上的垃圾回收器,官方文档里推荐使用最新的 G1 来代替 CMS 作为垃圾回收器。不过也明确指出在某些低版本 (1.7u21) 的 JDK 上还是会存在一些不稳定的问题。推荐使用的最低版本为 JDK 1.7u51。下面是本次试验中 Broker 的 JVM 内存配置参数:
-Xms30g -Xmx30g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
其实 G1 早在 JDK 1.6u14 中就已经作为体验版首次被引入,但是由于最初误宣传需要收费才能使用,和其自身尚不稳定存在 Bug 等因素,一直等到 1.7 的后期 update 版本才逐渐走入我们的视野。
G1 相比较于 CMS 的优势:
G1 是一种适用于服务器端的垃圾回收器,很好的平衡了吞吐量和响应能力。
对于内存的划分方法不同,Eden, Survivor, Old 区域不再固定,使用内存会更高效。G1 通过对内存进行 Region 的划分,有效避免了内存碎片问题。
G1 可以指定 GC 时可用于暂停线程的时间(不保证严格遵守)。而 CMS 并不提供可控选项。
CMS 只有在 FullGC 之后会重新合并压缩内存,而 G1 把回收和合并集合在一起。
CMS 只能使用在 Old 区,在清理 Young 时一般是配合使用 ParNew,而 G1 可以统一两类分区的回收算法。
G1 的适用场景:
JVM 占用内存较大(At least 4G)
应用本身频繁申请、释放内存,进而产生大量内存碎片时。
对于 GC 时间较为敏感的应用。
接下来,我们来总结一下 Kafka 本身可能会对性能产生影响的配置项。
Broker
num.network.threads:3
用于接收并处理网络请求的线程数,默认为 3。其内部实现是采用 Selector 模型。启动一个线程作为 Acceptor 来负责建立连接,再配合启动 num.network.threads 个线程来轮流负责从 Sockets 里读取请求,一般无需改动,除非上下游并发请求量过大。
num.partitions:1
Partition 的数量选取也会直接影响到 Kafka 集群的吞吐性能。例如我写过 MapReduce 任务从 Kafka 中读取数据,每个 Partition 对应一个 Mapper 去消费数据,如果 Partition 数量太少,则任务会因为 Mapper 数不足而非常慢。此外,当 Partition 数量相对于流入流出的数据量显得较少,或由于业务逻辑和 Partition 数量没有匹配好造成个别 Partition 读写数据量大,大量的读写请求集中落在一台或几台机器上时,很容易就会打满 NIC 的全部流量。不难想象这时不仅这一个 Partition 的读写会出现性能瓶颈,同 Broker 上的其他 Partition 或服务都会陷入一个网络资源匮乏的情况。
queued.max.requests:500
这个参数是指定用于缓存网络请求的队列的最大容量,这个队列达到上限之后将不再接收新请求。一般不会成为瓶颈点,除非 I / O 性能太差,这时需要配合 num.io.threads 等配置一同进行调整。
相关阅读:
分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm
Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm
Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm
Apache kafka 原理与特性(0.8V) http://www.linuxidc.com/Linux/2014-09/107388.htm
Kafka 部署与代码实例 http://www.linuxidc.com/Linux/2014-09/107387.htm
Kafka 介绍和集群环境搭建 http://www.linuxidc.com/Linux/2014-09/107382.htm
Replica 相关配置:
replica.lag.time.max.ms:10000replica.lag.max.messages:4000num.replica.fetchers:1
上篇文章已经简单介绍过上两项配置的含义,这里不再重复,重点说一下第三项配置。对于任意 (Broker, Leader) 元组,都会有 replication.factor- 1 个 Broker 作为 Replica,在 Replica 上会启动若干 Fetch 线程把对应的数据同步到本地,而 num.replica.fetchers 这个参数是用来控制 Fetch 线程的数量。
一般来说如果发现 Partition 的 ISR 当中只有自己一个 Partition,且长时间没有新的 Replica 增加进来时,就可以考虑适当的增大这个参数加快复制进度。其内部实现上,每个 Fetch 就对应了一个 SimpleConsumer,对于任意一台其他机器上需要 Catch-up 的 Leader,会创建 num.replica.fetchers 个 SimpleConsumer 来拉取 Log。
当初刚知道这块设计的时候还蛮疑惑的,在 Kafka 文档开篇的时候就郑重介绍过,同一个 ConsumerGroup 内的 Consumer 和 Partition 在同一时间内必须保证是一对一的消费关系,而这么简单地增加 SimpleConsumer 就可以提高效率又是什么原因呢?
查看源码,在 AbstractFetcherThread.scala 里可以看到,Fetch 启动的多线程其实就是一个个的 SimpleConsumer。
首先,getFetcherId()利用 numFetcher 来控制 FetchId 的范围,进而控制 Consumer 数量。partitionsPerFetcher 结构则是一个从 Partition 到 Partition 上启动的 Fetchers 的 Mapping。
上面为每个 Partition 启动的多个 Fetcher(也就是 SimpleConsumer)之间通过 partitionMap: mutable.HashMap[TopicAndPartition, Long]来共享 offset,达到并行 Fetch 数据的目的。因此,通过共享 offset 既保证了同一时间内 Consumer 和 Partition 之间的一对一关系,又允许我们通过增多 Fetch 线程来提高效率。
default.replication.factor:1
这个参数指新创建一个 topic 时,默认的 Replica 数量。当 Producer 中的 acks!=0 && acks!= 1 时,Replica 的大小可能会导致在 Produce 数据时的性能表现有很大不同。Replica 过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在 2~3 为宜。
fetch.purgatory.purge.interval.requests:1000producer.purgatory.purge.interval.requests:1000
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2016-03/129066p2.htm
首先让我先来介绍一下这个“炼狱”究竟是用来做什么用的。Broker 的一项主要工作就是接收并处理网络上发来的 Request。这些 Request 其中有一些是可以立即答复的,那很自然这些 Request 会被直接回复。另外还有一部分是没办法或者 Request 自发的要求延时答复(例如发送和接收的 Batch),Broker 会把这种 Request 放入 Paurgatory 当中,同时每一个加入 Purgatory 当中的 Request 还会额外的加入到两个监控对队列:
WatcherFor 队列:用于检查 Request 是否被满足。
DelayedQueue 队列:用于检测 Request 是否超时。
Request 最终的状态只有一个,就是 Complete。请求被满足和超时最终都会被统一的认为是 Complete。
目前版本的 Purgatory 设计上是存在一定缺陷的。Request 状态转变为 Complete 后,并没能立即从 Purgatory 中移除,而是继续占用资源,因此占用内存累积最终会引发 OOM。这种情况一般只会在 topic 流量较少的情况下触发。更详细的资料可以查阅扩展阅读,在此不做展开。
在实际使用中我也是踩了这个坑过来的,当时的情况是集群新上了一个 topic,初期该 topic 数据很少(Low volume topic),导致那段时间在凌晨 3,4 点左右会随机有 Broker 因为 OOM 挂掉。定位原因后把 *.purgatory.purge.interval.requests 的配置调整小至 100 就解决了这个问题。
Kafka 的研发团队已经开始着手重新设计 Purgatory,力求能够让 Request 在 Complete 时立即从 Purgatory 中移除。
log.flush.interval.ms:Long.MaxValuelog.flush.scheduler.interval.ms:Long.MaxValuelog.flush.interval.messages:Long.MaxValue
Flush 相关的配置参数控制着 Broker 写盘的频率,一般无需改动。如果 topic 的数据量较小可以考虑减少 log.flush.interval.ms 和 log.flush.interval.messages 来强制刷写数据,减少可能由于缓存数据未写盘带来的不一致。
in.insync.replicas:1
这个参数只能在 topic 层级配置,指定每次 Producer 写操作至少要保证有多少个在 ISR 的 Replica 确认,一般配合 request.required.acks 使用。要注意,这个参数如果设置的过高可能会大幅降低吞吐量。
compression.codec:none
Message 落地时是否采用以及采用何种压缩算法。一般都是把 Producer 发过来 Message 直接保存,不再改变压缩方式。
Producer” style=”font-weight: 400; font-size: 16px; color: rgb(0, 0, 0); font-family: ‘Helvetica Neue’, Helvetica, ‘Hiragino Sans GB’, ‘Microsoft YaHei’, Arial, sans-serif; line-height: 25.6px;”>Producer
buffer.memory:33554432 (32m)
在 Producer 端用来存放尚未发送出去的 Message 的缓冲区大小。缓冲区满了之后可以选择阻塞发送或抛出异常,由 block.on.buffer.full 的配置来决定。
compression.type:none
默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和 Broker 的存储压力。
linger.ms:0
Producer 默认会把两次发送时间间隔内收集到的所有 Requests 进行一次聚合然后再发送,以此提高吞吐量,而 linger.ms 则更进一步,这个参数为每次发送增加一些 delay,以此来聚合更多的 Message。
batch.size:16384
Producer 会尝试去把发往同一个 Partition 的多个 Requests 进行合并,batch.size 指明了一次 Batch 合并后 Requests 总大小的上限。如果这个值设置的太小,可能会导致所有的 Request 都不进行 Batch。
acks:1
这个配置可以设定发送消息后是否需要 Broker 端返回确认。
0: 不需要进行确认,速度最快。存在丢失数据的风险。
1: 仅需要 Leader 进行确认,不需要 ISR 进行确认。是一种效率和安全折中的方式。
all: 需要 ISR 中所有的 Replica 给予接收确认,速度最慢,安全性最高,但是由于 ISR 可能会缩小到仅包含一个 Replica,所以设置参数为 all 并不能一定避免数据丢失。
注:新老 Producer 的参数有很大不同,其他配置含义可以对照参考 Kafka 官方文档。
Consumer
num.consumer.fetchers:1
启动 Consumer 的个数,适当增加可以提高并发度。
fetch.min.bytes:1
每次 Fetch Request 至少要拿到多少字节的数据才可以返回。
fetch.wait.max.ms:100
在 Fetch Request 获取的数据至少达到 fetch.min.bytes 之前,允许等待的最大时长。对应上面说到的 Purgatory 中请求的超时时间。
性能测试实战
由于可调整的配置参数较多,为了可以准确的展示不同配置对性能产生的影响,我们每次只调整一个参数,观察对照组结果。测试工具使用 Kafka 提供的 Performance 工具 ProducerPerformance 和 ConsumerPerformance。
Producer
Kafka 在 0.8 版本推出了新的 Producer Client,较之前版本有极大的性能提升,所以后续的示例无需说明都采用的是新 Producer,这里就只给出一组新旧 Producer 的对照组数据。
其中,Producer 的 message.size 为 1024,不压缩,测试时都发送 500000 条 Message。相信大家看过上面结果,就很清楚以后为什么要乖乖地用新设计的 Producer 来发消息了。
Kafka 发布时提供了两个 Producer 的性能测试工具:
kafka.tools.ProducerPerformance (Scala)
org.apache.kafka.clients.tools.ProducerPerformance (Java)
两份工具的大体功能类似。通过 Scala 版的代码可以很方便的输出 CVS 文件,通过 Patch:1190(https://issues.apache.org/jira/browse/KAFKA-1190)中包含的一个 R 脚本可以将这个 CVS 文件结果可视化。
注:如果使用 Scala 版代码,不建议开启 –vary-message-size 功能。这个功能使得每次构造消息时都会在内部调用 random 方法生成随机长度的消息,尤其是在进行压力测试时,构造随机串的消耗累计占比飙高,严重影响发送效率最终致使测试结果失准。
下面,从 thread, acks, linger.ms, replica, compression 几个主要维度测试了一下 Producer 的组合性能表现。其中,公共指标如下:
message.size=1024
batch.siz=10240
message.count=50000000
测试结果如下:
注:部分提高了 linger.ms 的 Case 效果不明显是由于触发了其他的 flush 条件。
Consumer
Consumer 的测试相对来说就简单很多,毕竟拉取数据时只从 Leader 读,无论多少 Replica 都是如此。所以比较关键的参数就聚焦到了 fetch.size 和 thread 上。
上述本文给出的参数只是一种参考,适用于我们的集群配置。大家有兴趣可以根据上面提供的方法,在自己的集群上新建独立 topic,在实际环境中测试,这样得出的配置才是最适合你的配置。希望大家都能通过上面的方法把自己手头的 Kafka 调教好,榨干最后一丝性能。
扩展阅读
- Request Purgatory 潜在引发 OOM 的问题: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=34839465
- Purgatory Redesign: https://cwiki.apache.org/confluence/display/KAFKA/Purgatory+Redesign+Proposal
- 深入理解 G1 内存收集器: http://t.cn/RAUulGC
- How to choose the number of topics/partitions in a Kafka cluster?: http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster
- Tips for improving performance of kafka producer: http://ingest.tips/2015/07/19/tips-for-improving-performance-of-kafka-producer/
Kafka 的详细介绍:请点这里
Kafka 的下载地址:请点这里
本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-03/129066.htm
上一篇文章《Kafka 高吞吐量性能揭秘 http://www.linuxidc.com/Linux/2016-03/129065.htm》介绍了 Kafka 在设计上是如何来保证高时效、大吞吐量的,主要的内容集中在底层原理和架构上,属于理论知识范畴。这次我们站在应用和运维的角度,聊一聊集群到位后要怎么才能最好的配置参数和进行测试性能。Kafka 的配置详尽且复杂,想要进行全面的性能调优需要掌握大量信息,我也只是通过工作中的一些实战经验来筛选出对集群性能影响最大的几个要点,接下来要阐述的观点也仅限于我所描述的环境下,请大家根据自己的环境适当取舍。
今天的文章分为两大部分,第一部分介绍一下我总结的跟性能有关的一些参数、含义以及调优策略。第二部分会给出一些我自己实践过的测试结果对照组,具体的数值和结果可能因场景、机器、环境而异,但是总体的思路和方法应该是一致的。
在正式进入主题之前,介绍一下本次测试所使用的机器配置:
6 台物理机,其中三台部署 Broker,三台专门用来 launch request。
每台物理机:24 Processors,189G Memory,2G 单机带宽。
执行本次测试时为了能够覆盖到到一些“非常规”的用法,我把 Broker 的 HeapSize 设置到了 30G。
相关参数介绍
在调试和优化使用 Java 开发的系统时,第一步肯定绕不开对 JVM 的调优,Kafka 自然也不例外,而 JVM 调优的重点则是在内存上。
其实 Kafka 服务本身并不需要很大内存,上篇文章也已经详细介绍过 Kafka 依赖系统提供的 PageCache 来满足性能上的要求,利用 VisualJVM 等工具可以很清晰的分析出 Heap Space 的占用比例情况。本文中测试时设置 30G 内存的目的是支持更高的并发,高并发本身就必然会需要更多的内存来支持,同时高并发也意味着 SocketBuffer 等相关缓存容量会成倍增长。实际使用中,调整内存大小的准则是留给系统尽可能多的空闲内存,Broker 本身则是够用就好。
说完了大小设置我们再来聊一下 JVM 上的垃圾回收器,官方文档里推荐使用最新的 G1 来代替 CMS 作为垃圾回收器。不过也明确指出在某些低版本 (1.7u21) 的 JDK 上还是会存在一些不稳定的问题。推荐使用的最低版本为 JDK 1.7u51。下面是本次试验中 Broker 的 JVM 内存配置参数:
-Xms30g -Xmx30g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
其实 G1 早在 JDK 1.6u14 中就已经作为体验版首次被引入,但是由于最初误宣传需要收费才能使用,和其自身尚不稳定存在 Bug 等因素,一直等到 1.7 的后期 update 版本才逐渐走入我们的视野。
G1 相比较于 CMS 的优势:
G1 是一种适用于服务器端的垃圾回收器,很好的平衡了吞吐量和响应能力。
对于内存的划分方法不同,Eden, Survivor, Old 区域不再固定,使用内存会更高效。G1 通过对内存进行 Region 的划分,有效避免了内存碎片问题。
G1 可以指定 GC 时可用于暂停线程的时间(不保证严格遵守)。而 CMS 并不提供可控选项。
CMS 只有在 FullGC 之后会重新合并压缩内存,而 G1 把回收和合并集合在一起。
CMS 只能使用在 Old 区,在清理 Young 时一般是配合使用 ParNew,而 G1 可以统一两类分区的回收算法。
G1 的适用场景:
JVM 占用内存较大(At least 4G)
应用本身频繁申请、释放内存,进而产生大量内存碎片时。
对于 GC 时间较为敏感的应用。
接下来,我们来总结一下 Kafka 本身可能会对性能产生影响的配置项。
Broker
num.network.threads:3
用于接收并处理网络请求的线程数,默认为 3。其内部实现是采用 Selector 模型。启动一个线程作为 Acceptor 来负责建立连接,再配合启动 num.network.threads 个线程来轮流负责从 Sockets 里读取请求,一般无需改动,除非上下游并发请求量过大。
num.partitions:1
Partition 的数量选取也会直接影响到 Kafka 集群的吞吐性能。例如我写过 MapReduce 任务从 Kafka 中读取数据,每个 Partition 对应一个 Mapper 去消费数据,如果 Partition 数量太少,则任务会因为 Mapper 数不足而非常慢。此外,当 Partition 数量相对于流入流出的数据量显得较少,或由于业务逻辑和 Partition 数量没有匹配好造成个别 Partition 读写数据量大,大量的读写请求集中落在一台或几台机器上时,很容易就会打满 NIC 的全部流量。不难想象这时不仅这一个 Partition 的读写会出现性能瓶颈,同 Broker 上的其他 Partition 或服务都会陷入一个网络资源匮乏的情况。
queued.max.requests:500
这个参数是指定用于缓存网络请求的队列的最大容量,这个队列达到上限之后将不再接收新请求。一般不会成为瓶颈点,除非 I / O 性能太差,这时需要配合 num.io.threads 等配置一同进行调整。
相关阅读:
分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm
Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm
Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm
Apache kafka 原理与特性(0.8V) http://www.linuxidc.com/Linux/2014-09/107388.htm
Kafka 部署与代码实例 http://www.linuxidc.com/Linux/2014-09/107387.htm
Kafka 介绍和集群环境搭建 http://www.linuxidc.com/Linux/2014-09/107382.htm
Replica 相关配置:
replica.lag.time.max.ms:10000replica.lag.max.messages:4000num.replica.fetchers:1
上篇文章已经简单介绍过上两项配置的含义,这里不再重复,重点说一下第三项配置。对于任意 (Broker, Leader) 元组,都会有 replication.factor- 1 个 Broker 作为 Replica,在 Replica 上会启动若干 Fetch 线程把对应的数据同步到本地,而 num.replica.fetchers 这个参数是用来控制 Fetch 线程的数量。
一般来说如果发现 Partition 的 ISR 当中只有自己一个 Partition,且长时间没有新的 Replica 增加进来时,就可以考虑适当的增大这个参数加快复制进度。其内部实现上,每个 Fetch 就对应了一个 SimpleConsumer,对于任意一台其他机器上需要 Catch-up 的 Leader,会创建 num.replica.fetchers 个 SimpleConsumer 来拉取 Log。
当初刚知道这块设计的时候还蛮疑惑的,在 Kafka 文档开篇的时候就郑重介绍过,同一个 ConsumerGroup 内的 Consumer 和 Partition 在同一时间内必须保证是一对一的消费关系,而这么简单地增加 SimpleConsumer 就可以提高效率又是什么原因呢?
查看源码,在 AbstractFetcherThread.scala 里可以看到,Fetch 启动的多线程其实就是一个个的 SimpleConsumer。
首先,getFetcherId()利用 numFetcher 来控制 FetchId 的范围,进而控制 Consumer 数量。partitionsPerFetcher 结构则是一个从 Partition 到 Partition 上启动的 Fetchers 的 Mapping。
上面为每个 Partition 启动的多个 Fetcher(也就是 SimpleConsumer)之间通过 partitionMap: mutable.HashMap[TopicAndPartition, Long]来共享 offset,达到并行 Fetch 数据的目的。因此,通过共享 offset 既保证了同一时间内 Consumer 和 Partition 之间的一对一关系,又允许我们通过增多 Fetch 线程来提高效率。
default.replication.factor:1
这个参数指新创建一个 topic 时,默认的 Replica 数量。当 Producer 中的 acks!=0 && acks!= 1 时,Replica 的大小可能会导致在 Produce 数据时的性能表现有很大不同。Replica 过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在 2~3 为宜。
fetch.purgatory.purge.interval.requests:1000producer.purgatory.purge.interval.requests:1000
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2016-03/129066p2.htm