共计 20939 个字符,预计需要花费 53 分钟才能阅读完成。
一、Kafka 概述
Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像 Hadoop 的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。
分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm
Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm
Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm
Apache kafka 原理与特性(0.8V) http://www.linuxidc.com/Linux/2014-09/107388.htm
Kafka 部署与代码实例 http://www.linuxidc.com/Linux/2014-09/107387.htm
Kafka 介绍和集群环境搭建 http://www.linuxidc.com/Linux/2014-09/107382.htm
二、Kafka 相关术语
- BrokerKafka 集群包含一个或多个服务器,这种服务器被称为 broker
- Topic每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
- PartitionPartition 是物理上的概念,每个 Topic 包含一个或多个 Partition.
- Producer负责发布消息到 Kafka broker
- Consumer消息消费者,向 Kafka broker 读取消息的客户端。
- Consumer Group每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。
二、Kafka 下载及安装
1、下载
1 | wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz |
2、安装
1 2 | tar zxvf kafka_2. 11 - 0.9 . 0.1 .tgz cd kafka_2. 11 - 0.9 . 0.1 |
3、集群配置
设定有两台服务器 192.168.1.237、192.168.1.238,两台服务器各安装有两 zookeeper, 端口都为 2181(zookeeper 不再说明),每个服务器都为 Kafka 配置 3 个 broker。
3.1、server.properties 配置
1 2 3 4 5 6 | broker.id = 10 port = 9090 host.name= 192.168 . 1.237 advertised.host.name= 192.168 . 1.237 log.dirs=/tmp/kafka-logs/server0 zookeeper.connect= 192.168 . 1.237 : 2181 , 192.168 . 1.238 : 2181 |
说明:host.name\advertised.host.name 两个参数还是要配置为 IP,否则会有各种各样的问题。
3.2、server1.properties 配置
1 | cp config/servier.properties config/server1.properties<br>vim config/server1.properties |
1 2 3 4 5 6 | broker.id = 11 port = 9091 host.name= 192.168 . 1.237 advertised.host.name= 192.168 . 1.237 log.dirs=/tmp/kafka-logs/server1 zookeeper.connect= 192.168 . 1.237 : 2181 , 192.168 . 1.238 : 2181 |
3.3、server2.properties 配置
1 2 | cp config/servier.properties config/server2.properties vim config/server2.properties |
1 2 3 4 5 6 | broker.id = 12 port = 9092 host.name= 192.168 . 1.237 advertised.host.name= 192.168 . 1.237 log.dirs=/tmp/kafka-logs/server2 zookeeper.connect= 192.168 . 1.237 : 2181 , 192.168 . 1.238 : 2181 |
说明:同一台服务器 port、log.dirs 不能相同,不同的服务器 broker.id 只要在一个集群中都不能相同。
3.4、同理 另一台服务器的 server.properties,server1.properties,server2.properties 的 broker.id 分别为:20、21、22,port 分别为:9090、9091、9092 其它:host.name=192.168.1.238、advertised.host.name=192.168.1.238
3.5、启动
1 2 3 | bin/kafka-server-start.sh config/server.properties & bin/kafka-server-start.sh config/server1.properties & bin/kafka-server-start.sh config/server2.properties & |
3.6、监控端口
1 2 3 4 | netstat -tunpl |grep 2181 netstat -tunpl |grep 9090 netstat -tunpl |grep 9091 netstat -tunpl |grep 9092 |
看一下这 4 个端口起来没有,并看一下 iptables 有没有加入这 4 个 IP 的启动,或要把 iptables 相关,否则 Java 连接不进来。
四、测试
4.1、创建 Topic
1 | bin/kafka-topics.sh --create --zookeeper 192.168 . 1.237 : 2181 --replication-factor 3 --partitions 1 --topic testTopic |
4.2、查看创建情况
1 | bin/kafka-topics.sh --describe --zookeeper 192.168 . 1.237 : 2181 --topic testTopic |
4.3、生产者发送消息
1 | bin/kafka-console-producer.sh --broker-list 192.168 . 1.237 : 9090 --topic testTopic |
4.4、消费都接收消息
1 | bin/kafka-console-consumer.sh --zookeeper 192.168 . 1.237 : 2181 --from-beginning --topic testTopic |
4.5、检查 consumer offset 位置
1 | bin/kafka-run- class .sh kafka.tools.ConsumerOffsetChecker --zkconnect 192.168 . 1.237 : 2181 --group testTopic |
五、遇到的问题
1、运行一段时间报错
1 2 3 4 5 6 | # # There is insufficient memory for the Java Runtime Environment to continue . # Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory. # An error report file with more information is saved as: # //hs_err_pid6500.log OpenJDK 64 -Bit Server VM warning: INFO: os::commit_memory( 0x00000000bad30000 , 986513408 , 0 ) failed; error= 'Cannot allocate memory' (errno= 12 ) |
解决:
you can adjust the JVM heap size by editing kafka-server-start.sh
, zookeeper-server-start.sh
and so on:
1 | export KAFKA_HEAP_OPTS= "-Xmx1G -Xms1G" |
The -Xms
parameter specifies the minimum heap size. To get your server to at least start up, try changing it to use less memory. Given that you only have 512M, you should change the maximum heap size (-Xmx
) too:
1 | export KAFKA_HEAP_OPTS= "-Xmx256M -Xms128M" |
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2016-06/132067p2.htm
一、概述
Spring Integration Kafka 是基于 Apache Kafka 和 Spring Integration 来集成 Kafka,对开发配置提供了方便。
二、配置
1、spring-kafka-consumer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | <?xml version= "1.0" encoding= "UTF-8" ?> <beans xmlns= "http://www.springframework.org/schema/beans" xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance" xmlns: int = "http://www.springframework.org/schema/integration" xmlns: int -kafka= "http://www.springframework.org/schema/integration/kafka" xmlns:task= "http://www.springframework.org/schema/task" xsi:schemaLocation="http: //www.springframework.org/schema/integration/kafka http: //www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd http: //www.springframework.org/schema/integration http: //www.springframework.org/schema/integration/spring-integration.xsd http: //www.springframework.org/schema/beans http: //www.springframework.org/schema/beans/spring-beans.xsd http: //www.springframework.org/schema/task http: //www.springframework.org/schema/task/spring-task.xsd"> <!-- topic test conf --> < int :channel id= "inputFromKafka" > < int :dispatcher task-executor= "kafkaMessageExecutor" /> </ int :channel> <!-- zookeeper 配置 可以配置多个 --> < int -kafka:zookeeper-connect id= "zookeeperConnect" zk-connect= "192.168.1.237:2181" zk-connection-timeout= "6000" zk-session-timeout= "6000" zk-sync-time= "2000" /> <!-- channel 配置 auto-startup= "true" 否则接收不发数据 --> < int -kafka:inbound-channel-adapter id= "kafkaInboundChannelAdapter" kafka-consumer-context-ref= "consumerContext" auto-startup= "true" channel= "inputFromKafka" > < int :poller fixed-delay= "1" time-unit= "MILLISECONDS" /> </ int -kafka:inbound-channel-adapter> <task:executor id= "kafkaMessageExecutor" pool-size= "8" keep-alive= "120" queue-capacity= "500" /> <bean id= "kafkaDecoder" class = "org.springframework.integration.kafka.serializer.common.StringDecoder" /> <bean id= "consumerProperties" class = "org.springframework.beans.factory.config.PropertiesFactoryBean" > <property name= "properties" > <props> <prop key= "auto.offset.reset" >smallest</prop> <prop key= "socket.receive.buffer.bytes" > 10485760 </prop> <!-- 10M --> <prop key= "fetch.message.max.bytes" > 5242880 </prop> <prop key= "auto.commit.interval.ms" > 1000 </prop> </props> </property> </bean> <!-- 消息接收的 BEEN --> <bean id= "kafkaConsumerService" class = "com.sunney.service.impl.KafkaConsumerService" /> <!-- 指定接收的方法 --> < int :outbound-channel-adapter channel= "inputFromKafka" ref= "kafkaConsumerService" method= "processMessage" /> < int -kafka:consumer-context id= "consumerContext" consumer-timeout= "1000" zookeeper-connect= "zookeeperConnect" consumer-properties= "consumerProperties" > < int -kafka:consumer-configurations> < int -kafka:consumer-configuration group-id= "default1" value-decoder= "kafkaDecoder" key-decoder= "kafkaDecoder" max-messages= "5000" > <!-- 两个 TOPIC 配置 --> < int -kafka:topic id= "mytopic" streams= "4" /> < int -kafka:topic id= "sunneytopic" streams= "4" /> </ int -kafka:consumer-configuration> </ int -kafka:consumer-configurations> </ int -kafka:consumer-context> </beans> |
2、spring-kafka-producer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | <?xml version= "1.0" encoding= "UTF-8" ?> <beans xmlns= "http://www.springframework.org/schema/beans" xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance" xmlns: int = "http://www.springframework.org/schema/integration" xmlns: int -kafka= "http://www.springframework.org/schema/integration/kafka" xmlns:task= "http://www.springframework.org/schema/task" xsi:schemaLocation="http: //www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd http: //www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http: //www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http: //www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <!-- commons config --> <bean id= "stringSerializer" class = "org.apache.kafka.common.serialization.StringSerializer" /> <bean id= "kafkaEncoder" class = "org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder" > <constructor-arg value= "Java.lang.String" /> </bean> <bean id= "producerProperties" class = "org.springframework.beans.factory.config.PropertiesFactoryBean" > <property name= "properties" > <props> <prop key= "topic.metadata.refresh.interval.ms" > 3600000 </prop> <prop key= "message.send.max.retries" > 5 </prop> <prop key= "serializer.class" >kafka.serializer.StringEncoder</prop> <prop key= "request.required.acks" > 1 </prop> </props> </property> </bean> <!-- topic test config --> < int :channel id= "kafkaTopicTest" > < int :queue /> </ int :channel> < int -kafka:outbound-channel-adapter id= "kafkaOutboundChannelAdapterTopicTest" kafka-producer-context-ref= "producerContextTopicTest" auto-startup= "true" channel= "kafkaTopicTest" order= "3" > < int :poller fixed-delay= "1000" time-unit= "MILLISECONDS" receive-timeout= "1" task-executor= "taskExecutor" /> </ int -kafka:outbound-channel-adapter> <task:executor id= "taskExecutor" pool-size= "5" keep-alive= "120" queue-capacity= "500" /> <!-- <bean id= "kafkaEncoder" class = "org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder" > <constructor-arg value= "com.company.AvroGeneratedSpecificRecord" /> </bean> --> < int -kafka:producer-context id= "producerContextTopicTest" producer-properties= "producerProperties" > < int -kafka:producer-configurations> <!-- 多个 topic 配置 --> < int -kafka:producer-configuration broker-list= "192.168.1.237:9090,192.168.1.237:9091,192.168.1.237:9092" key-serializer= "stringSerializer" value- class -type= "java.lang.String" value-serializer= "stringSerializer" topic= "mytopic" /> < int -kafka:producer-configuration broker-list= "192.168.1.237:9090,192.168.1.237:9091,192.168.1.237:9092" key-serializer= "stringSerializer" value- class -type= "java.lang.String" value-serializer= "stringSerializer" topic= "sunneytopic" /> </ int -kafka:producer-configurations> </ int -kafka:producer-context> </beans> |
3、发消息接口 KafkaService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | package com.sunney.service; /** * 类 KafkaService.java 的实现描述:发消息接口类 * @author Sunney 2016 年 4 月 30 日 上午 11:30:53 */ public interface KafkaService { /** * 发消息 * @param topic 主题 * @param obj 发送内容 */ public void sendUserInfo(String topic, Object obj); } |
4、发消息实现类 KafkaServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | package com.sunney.service.impl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.integration.kafka.support.KafkaHeaders; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.stereotype.Service; import com.sunney.service.KafkaService; /** * 类 KafkaServiceImpl.java 的实现描述:发消息实现类 * @author Sunney 2016 年 4 月 30 日 上午 11:31:13 */ @Service ( "kafkaService" ) public class KafkaServiceImpl implements KafkaService{ @Autowired @Qualifier ( "kafkaTopicTest" ) MessageChannel channel; public void sendUserInfo(String topic, Object obj) { channel.send(MessageBuilder.withPayload(obj) .setHeader(KafkaHeaders.TOPIC,topic) .build()); } } |
5、消费接收类 KafkaConsumerService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | package com.sunney.service.impl; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; import com.sunney.service.UserDto; /** * 类 KafkaConsumerService.java 的实现描述:消费接收类 * * @author Sunney 2016 年 4 月 30 日 上午 11:46:14 */ public class KafkaConsumerService { static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService. class ); public void processMessage(Map<String, Map<Integer, String>> msgs) { logger.info( "===============processMessage===============" ); for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) { logger.info( "============Topic:" + entry.getKey()); LinkedHashMap<Integer, String> messages = (LinkedHashMap<Integer, String>) entry.getValue(); Set<Integer> keys = messages.keySet(); for (Integer i : keys) logger.info( "======Partition:" + i); Collection<String> values = messages.values(); for (Iterator<String> iterator = values.iterator(); iterator.hasNext();) { String message = "[" +iterator.next()+ "]" ; logger.info( "=====message:" + message); List<UserDto> userList = JSON.parseArray(message, UserDto. class ); logger.info( "=====userList.size:" + userList.size()); } } } } |
6、pom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kafka</artifactId> <version> 1.3 . 0 .RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version> 4.11 </version> <scope>test</scope> </dependency> <dependency> <groupId> org.apache.avro</groupId> <artifactId>avro</artifactId> <version> 1.7 . 7 </version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version> 1.2 . 7 </version> </dependency> </dependencies> |
六、源代码地址:https://github.com/sunney2010/kafka-demo
七、遇到的问题
1、消费端口收不到消息
spring-kafka-consumer.xml 的 auto-startup 设置为 true
一、Kafka 概述
Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像 Hadoop 的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。
分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm
Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm
Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm
Apache kafka 原理与特性(0.8V) http://www.linuxidc.com/Linux/2014-09/107388.htm
Kafka 部署与代码实例 http://www.linuxidc.com/Linux/2014-09/107387.htm
Kafka 介绍和集群环境搭建 http://www.linuxidc.com/Linux/2014-09/107382.htm
二、Kafka 相关术语
- BrokerKafka 集群包含一个或多个服务器,这种服务器被称为 broker
- Topic每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
- PartitionPartition 是物理上的概念,每个 Topic 包含一个或多个 Partition.
- Producer负责发布消息到 Kafka broker
- Consumer消息消费者,向 Kafka broker 读取消息的客户端。
- Consumer Group每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。
二、Kafka 下载及安装
1、下载
1 | wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz |
2、安装
1 2 | tar zxvf kafka_2. 11 - 0.9 . 0.1 .tgz cd kafka_2. 11 - 0.9 . 0.1 |
3、集群配置
设定有两台服务器 192.168.1.237、192.168.1.238,两台服务器各安装有两 zookeeper, 端口都为 2181(zookeeper 不再说明),每个服务器都为 Kafka 配置 3 个 broker。
3.1、server.properties 配置
1 2 3 4 5 6 | broker.id = 10 port = 9090 host.name= 192.168 . 1.237 advertised.host.name= 192.168 . 1.237 log.dirs=/tmp/kafka-logs/server0 zookeeper.connect= 192.168 . 1.237 : 2181 , 192.168 . 1.238 : 2181 |
说明:host.name\advertised.host.name 两个参数还是要配置为 IP,否则会有各种各样的问题。
3.2、server1.properties 配置
1 | cp config/servier.properties config/server1.properties<br>vim config/server1.properties |
1 2 3 4 5 6 | broker.id = 11 port = 9091 host.name= 192.168 . 1.237 advertised.host.name= 192.168 . 1.237 log.dirs=/tmp/kafka-logs/server1 zookeeper.connect= 192.168 . 1.237 : 2181 , 192.168 . 1.238 : 2181 |
3.3、server2.properties 配置
1 2 | cp config/servier.properties config/server2.properties vim config/server2.properties |
1 2 3 4 5 6 | broker.id = 12 port = 9092 host.name= 192.168 . 1.237 advertised.host.name= 192.168 . 1.237 log.dirs=/tmp/kafka-logs/server2 zookeeper.connect= 192.168 . 1.237 : 2181 , 192.168 . 1.238 : 2181 |
说明:同一台服务器 port、log.dirs 不能相同,不同的服务器 broker.id 只要在一个集群中都不能相同。
3.4、同理 另一台服务器的 server.properties,server1.properties,server2.properties 的 broker.id 分别为:20、21、22,port 分别为:9090、9091、9092 其它:host.name=192.168.1.238、advertised.host.name=192.168.1.238
3.5、启动
1 2 3 | bin/kafka-server-start.sh config/server.properties & bin/kafka-server-start.sh config/server1.properties & bin/kafka-server-start.sh config/server2.properties & |
3.6、监控端口
1 2 3 4 | netstat -tunpl |grep 2181 netstat -tunpl |grep 9090 netstat -tunpl |grep 9091 netstat -tunpl |grep 9092 |
看一下这 4 个端口起来没有,并看一下 iptables 有没有加入这 4 个 IP 的启动,或要把 iptables 相关,否则 Java 连接不进来。
四、测试
4.1、创建 Topic
1 | bin/kafka-topics.sh --create --zookeeper 192.168 . 1.237 : 2181 --replication-factor 3 --partitions 1 --topic testTopic |
4.2、查看创建情况
1 | bin/kafka-topics.sh --describe --zookeeper 192.168 . 1.237 : 2181 --topic testTopic |
4.3、生产者发送消息
1 | bin/kafka-console-producer.sh --broker-list 192.168 . 1.237 : 9090 --topic testTopic |
4.4、消费都接收消息
1 | bin/kafka-console-consumer.sh --zookeeper 192.168 . 1.237 : 2181 --from-beginning --topic testTopic |
4.5、检查 consumer offset 位置
1 | bin/kafka-run- class .sh kafka.tools.ConsumerOffsetChecker --zkconnect 192.168 . 1.237 : 2181 --group testTopic |
五、遇到的问题
1、运行一段时间报错
1 2 3 4 5 6 | # # There is insufficient memory for the Java Runtime Environment to continue . # Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory. # An error report file with more information is saved as: # //hs_err_pid6500.log OpenJDK 64 -Bit Server VM warning: INFO: os::commit_memory( 0x00000000bad30000 , 986513408 , 0 ) failed; error= 'Cannot allocate memory' (errno= 12 ) |
解决:
you can adjust the JVM heap size by editing kafka-server-start.sh
, zookeeper-server-start.sh
and so on:
1 | export KAFKA_HEAP_OPTS= "-Xmx1G -Xms1G" |
The -Xms
parameter specifies the minimum heap size. To get your server to at least start up, try changing it to use less memory. Given that you only have 512M, you should change the maximum heap size (-Xmx
) too:
1 | export KAFKA_HEAP_OPTS= "-Xmx256M -Xms128M" |
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2016-06/132067p2.htm
一、概述
Kafka 在雅虎内部被很多团队使用, 媒体团队用它做实时分析流水线, 可以处理高达 20Gbps(压缩数据)的峰值带宽。
为了简化开发者和服务工程师维护 Kafka 集群的工作,构建了一个叫做 Kafka 管理器的基于 Web 工具,叫做 Kafka Manager。这个管理工具可以很容易地发现分布在集群中的哪些 topic 分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建 Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具。
该软件是用 Scala 语言编写的。目前 (2015 年 02 月 03 日) 雅虎已经开源了 Kafka Manager 工具。这款 Kafka 集群管理工具主要支持以下几个功能:
1、管理几个不同的集群;
2、很容易地检查集群的状态 (topics, brokers, 副本的分布, 分区的分布);
3、选择副本;
4、产生分区分配(Generate partition assignments) 基于集群的当前状态;
5、重新分配分区。
二、Kafka Manager 下载及安装
项目地址:https://github.com/yahoo/kafka-manager
这个项目比 https://github.com/claudemamo/kafka-web-console 要好用一些,显示的信息更加丰富,kafka-manager 本身可以是一个集群。
不过 kafka-manager 也没有权限管理功能。
下载:
1 | git clone git @github .com:yahoo/kafka-manager.git |
下载完后,只能源代码你什么也做不了,我们要把项目编译打包,该软件是用 Scala 语言编写,把有编译打包很麻烦,他依赖于 sbt。sbt 比较难安装。
三、sbt 安装
1、下载 sbt-0.13.11
我的服务器是 CentOS 自动安装几次没有成功,我还是选择手动安装。
请自己到 http://www.scala-sbt.org/download.html 下载最新版本,我的版本是 sbt-0.13.11
2、建立目录,解压文件到所建立目录
1 2 | $ sudo mkdir /opt/scala/sbt $ sudo tar zxvf sbt- 0.13 . 11 .tgz -C /opt/scala/ |
3、建立启动 sbt 的脚本文件
1 2 3 4 5 6 7 | /* 选定一个位置,建立启动 sbt 的脚本文本文件,如 /opt/scala/sbt/ 目录下面新建文件名为 sbt 的文本文件 */ $ cd /opt/scala/sbt/ $ vim sbt /* 在 sbt 文本文件中添加 BT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M" Java $SBT_OPTS -jar /opt/scala/sbt/bin/sbt-launch.jar "$@" 然后按 esc 键 输入 :wq 保存退出,注意红色字体中的路径可以是绝对路径也可以是相对路径,只要能够正确的定位到解压的 sbt 文件包中的 sbt-launch.jar 文件即可 */ |
并修改 sbt 文件权限
1 | $ chmod u+x sbt |
4、配置 PATH 环境变量,保证在控制台中可以使用 sbt 命令
1 2 3 | $ vim /etc/profile /* 在文件尾部添加如下代码后,保存退出 */ export PATH=/opt/scala/sbt/:$PATH |
1 2 | /* 使配置文件立刻生效 */ $ source /etc/profile |
5、测试 sbt 是否安装成功
第一次执行时,会下载一些文件包,然后才能正常使用,要确保联网了,下载的过程分很慢。安装成功后显示如下
1 | sbt sbt-version<br>[info] Set current project to sbt (in build file:/opt/scala/sbt/)<br>[info] 0.13 . 11 |
四、编绎打包
1 2 | cd kafka-manager sbt clean dist |
生成的包会在 kafka-manager/target/universal 下面。生成的包只需要 java 环境就可以运行了,在部署的机器上不需要安装 sbt。
如果打包会很慢的要有点耐心呀,还有可能打包失败,可以考虑配置代理。
四、Kafka Manager 部署
1、打好包好,在部署机器上解压,修改好配置文件,就可以运行了 – 解压
1 | unzip kafka-manager- 1.0 -SNAPSHOT.zip |
2、修改 conf/application.conf,把 kafka-manager.zkhosts 改为自己的 zookeeper 服务器地址
1 | kafka-manager.zkhosts= "192.168.1.237:2181" |
3、启动
1 2 | cd kafka-manager- 1.0 -SNAPSHOT/bin ./kafka-manager -Dconfig.file=../conf/application.conf |
4、查看帮助 和 后台运行
1 2 | ./kafka-manager -h nohup ./kafka-manager -Dconfig.file=../conf/application.conf >/dev/ null 2 >& 1 & |
说明:正常来说,play 框架应该会自动加载 conf/application.conf 配置里的内容,但是貌似这个不起作用,要显式指定才行。
参考:https://github.com/yahoo/kafka-manager/issues/16
5、默认 http 端口是 9000,可以修改配置文件里的 http.port 的值,或者通过命令行参数传递:
1 | ./kafka-manager -Dhttp.port= 9001 |
五、sbt 配置代理
sbt 的配置 http 代理的参考文档:http://www.scala-sbt.org/0.12.1/docs/Detailed-Topics/Setup-Notes.html#http-proxy
通过 - D 设置叁数即可:
1 | java -Dhttp.proxyHost=myproxy -Dhttp.proxyPort= 8080 -Dhttp.proxyUser=username -Dhttp.proxyPassword=mypassword |
也可以用下面这种方式,设置一下 SBT_OPTS 的环境变量即可:
1 | export SBT_OPTS= "$SBT_OPTS -Dhttp.proxyHost=myproxy -Dhttp.proxyPort=myport" |
注意:myproxy,这个值里不要带 http 前缀,也不要带端口号。
比如,你的代理是 http://localhost:8123,那么应该这样配置:
1 | export SBT_OPTS= "$SBT_OPTS -Dhttp.proxyHost=localhost -Dhttp.proxyPort=8123" |
本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-06/132067.htm