共计 13653 个字符,预计需要花费 35 分钟才能阅读完成。
kafka 作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka 的部署包括 zookeeper 环境 /kafka 环境,同时还需要进行一些配置操作. 接下来介绍如何使用 kafka。
我们使用 3 个 zookeeper 实例构建 zk 集群,使用 2 个 kafka broker 构建 kafka 集群。
其中 kafka 为 0.8V,zookeeper 为 3.4.5V
————————————– 分割线 ————————————–
分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm
Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm
Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm
Kafka 使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm
————————————– 分割线 ————————————–
test-kafka.zip (5.3 KB) 下载
免费下载地址在 http://linux.linuxidc.com/
用户名与密码都是 www.linuxidc.com
具体下载目录在 /2014 年资料 / 9 月 /29 日 /Kafka 部署与代码实例
下载方法见 http://www.linuxidc.com/Linux/2013-07/87684.htm
————————————– 分割线 ————————————–
一.Zookeeper 集群构建
我们有 3 个 zk 实例,分别为 zk-0,zk-1,zk-2; 如果你仅仅是测试使用,可以使用 1 个 zk 实例.
1) zk-0
调整配置文件:
clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
## 只需要修改上述配置,其他配置保留默认值
启动 zookeeper
./zkServer.sh start
2) zk-1
调整配置文件 (其他配置和 zk- 0 一只):
clientPort=2182
## 只需要修改上述配置,其他配置保留默认值
启动 zookeeper
./zkServer.sh start
3) zk-2
调整配置文件 (其他配置和 zk- 0 一只):
clientPort=2183
## 只需要修改上述配置,其他配置保留默认值
启动 zookeeper
./zkServer.sh start
二. Kafka 集群构建
因为 Broker 配置文件涉及到 zookeeper 的相关约定,因此我们先展示 broker 配置文件. 我们使用 2 个 kafka broker 来构建这个集群环境,分别为 kafka-0,kafka-1.
1) kafka-0
在 config 目录下修改配置文件为:
broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
##replication 机制, 让每个 topic 的 partitions 在 kafka-cluster 中备份 2 个
## 用来提高 cluster 的容错能力..
default.replication.factor=1
log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000
因为 kafka 用 scala 语言编写,因此运行 kafka 需要首先准备 scala 相关环境。
> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency
其中最后一条指令执行有可能出现异常,暂且不管。启动 kafka broker:
> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &
因为 zookeeper 环境已经正常运行了,我们无需通过 kafka 来挂载启动 zookeeper. 如果你的一台机器上部署了多个 kafka broker,你需要声明 JMS_PORT.
2) kafka-1
broker.id=1
port=9093
## 其他配置和 kafka- 0 保持一致
然后和 kafka- 0 一样执行打包命令,然后启动此 broker.
> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &
仍然可以通过如下指令查看 topic 的 ”partition”/”replicas” 的分布和存活情况.
> bin/kafka-list-topic.sh –zookeeper localhost:2181
topic: my-replicated-topic partition: 0 leader: 2 replicas: 1,2,0 isr: 2
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
到目前为止环境已经 OK 了, 那我们就开始展示编程实例吧。[配置参数详解] http://www.linuxidc.com/Linux/2014-09/107388.htm
更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-09/107387p2.htm
三. 项目准备
项目基于 maven 构建,不得不说 kafka Java 客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下 pom.xml; 其中各个依赖包必须版本协调一致。如果 kafka client 的版本和 kafka server 的版本不一致, 将会有很多异常, 比如 ”broker id not exists” 等; 因为 kafka 从 0.7 升级到 0.8 之后 (正名为 2.8.0),client 与 server 通讯的 protocol 已经改变.
<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
<version>0.8.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
</dependencies>
四.Producer 端代码
1) producer.properties 文件:此文件放在 /resources 目录下
#partitioner.class=
##broker 列表可以为 kafka server 的子集, 因为 producer 需要从 broker 中获取 metadata
## 尽管每个 broker 都可以提供 metadata, 此处还是建议, 将所有 broker 都列举出来
## 此值, 我们可以在 spring 中注入过来
##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
##,127.0.0.1:9093
## 同步, 建议为 async
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
## 在 producer.type=async 时有效
#batch.num.messages=100
2) KafkaProducerClient.java 代码样例
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* User: guanqing-liu
*/
public class KafkaProducerClient {
private Producer<String, String> inner;
private String brokerList;//for metadata discovery,spring setter
private String location = “kafka-producer.properties”;//spring setter
private String defaultTopic;//spring setter
public void setBrokerList(String brokerList) {
this.brokerList = brokerList;
}
public void setLocation(String location) {
this.location = location;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
public KafkaProducerClient(){}
public void init() throws Exception {
Properties properties = new Properties();
properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
if(brokerList != null) {
properties.put(“metadata.broker.list”, brokerList);
}
ProducerConfig config = new ProducerConfig(properties);
inner = new Producer<String, String>(config);
}
public void send(String message){
send(defaultTopic,message);
}
public void send(Collection<String> messages){
send(defaultTopic,messages);
}
public void send(String topicName, String message) {
if (topicName == null || message == null) {
return;
}
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
inner.send(km);
}
public void send(String topicName, Collection<String> messages) {
if (topicName == null || messages == null) {
return;
}
if (messages.isEmpty()) {
return;
}
List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
int i= 0;
for (String entry : messages) {
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
kms.add(km);
i++;
if(i % 20 == 0){
inner.send(kms);
kms.clear();
}
}
if(!kms.isEmpty()){
inner.send(kms);
}
}
public void close() {
inner.close();
}
/**
* @param args
*/
public static void main(String[] args) {
KafkaProducerClient producer = null;
try {
producer = new KafkaProducerClient();
//producer.setBrokerList(“”);
int i = 0;
while (true) {
producer.send(“test-topic”, “this is a sample” + i);
i++;
Thread.sleep(2000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (producer != null) {
producer.close();
}
}
}
}
3) spring 配置
<bean id=”kafkaProducerClient” class=”com.test.kafka.KafkaProducerClient” init-method=”init” destroy-method=”close”>
<property name=”zkConnect” value=”${zookeeper_cluster}”></property>
<property name=”defaultTopic” value=”${kafka_topic}”></property>
</bean>
kafka 作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka 的部署包括 zookeeper 环境 /kafka 环境,同时还需要进行一些配置操作. 接下来介绍如何使用 kafka。
我们使用 3 个 zookeeper 实例构建 zk 集群,使用 2 个 kafka broker 构建 kafka 集群。
其中 kafka 为 0.8V,zookeeper 为 3.4.5V
————————————– 分割线 ————————————–
分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm
Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm
Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm
Kafka 使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm
————————————– 分割线 ————————————–
test-kafka.zip (5.3 KB) 下载
免费下载地址在 http://linux.linuxidc.com/
用户名与密码都是 www.linuxidc.com
具体下载目录在 /2014 年资料 / 9 月 /29 日 /Kafka 部署与代码实例
下载方法见 http://www.linuxidc.com/Linux/2013-07/87684.htm
————————————– 分割线 ————————————–
一.Zookeeper 集群构建
我们有 3 个 zk 实例,分别为 zk-0,zk-1,zk-2; 如果你仅仅是测试使用,可以使用 1 个 zk 实例.
1) zk-0
调整配置文件:
clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
## 只需要修改上述配置,其他配置保留默认值
启动 zookeeper
./zkServer.sh start
2) zk-1
调整配置文件 (其他配置和 zk- 0 一只):
clientPort=2182
## 只需要修改上述配置,其他配置保留默认值
启动 zookeeper
./zkServer.sh start
3) zk-2
调整配置文件 (其他配置和 zk- 0 一只):
clientPort=2183
## 只需要修改上述配置,其他配置保留默认值
启动 zookeeper
./zkServer.sh start
二. Kafka 集群构建
因为 Broker 配置文件涉及到 zookeeper 的相关约定,因此我们先展示 broker 配置文件. 我们使用 2 个 kafka broker 来构建这个集群环境,分别为 kafka-0,kafka-1.
1) kafka-0
在 config 目录下修改配置文件为:
broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
##replication 机制, 让每个 topic 的 partitions 在 kafka-cluster 中备份 2 个
## 用来提高 cluster 的容错能力..
default.replication.factor=1
log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000
因为 kafka 用 scala 语言编写,因此运行 kafka 需要首先准备 scala 相关环境。
> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency
其中最后一条指令执行有可能出现异常,暂且不管。启动 kafka broker:
> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &
因为 zookeeper 环境已经正常运行了,我们无需通过 kafka 来挂载启动 zookeeper. 如果你的一台机器上部署了多个 kafka broker,你需要声明 JMS_PORT.
2) kafka-1
broker.id=1
port=9093
## 其他配置和 kafka- 0 保持一致
然后和 kafka- 0 一样执行打包命令,然后启动此 broker.
> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &
仍然可以通过如下指令查看 topic 的 ”partition”/”replicas” 的分布和存活情况.
> bin/kafka-list-topic.sh –zookeeper localhost:2181
topic: my-replicated-topic partition: 0 leader: 2 replicas: 1,2,0 isr: 2
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
到目前为止环境已经 OK 了, 那我们就开始展示编程实例吧。[配置参数详解] http://www.linuxidc.com/Linux/2014-09/107388.htm
更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-09/107387p2.htm
五.Consumer 端
1) consumer.properties: 文件位于 /resources 目录下
## 此值可以配置, 也可以通过 spring 注入
##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
##,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=true
auto.commit.interval.ms=60000
2) KafkaConsumerClient.Java 代码样例
package com.test.kafka;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
/**
* User: guanqing-liu
*/
public class KafkaConsumerClient {
private String groupid; //can be setting by spring
private String zkConnect;//can be setting by spring
private String location = “kafka-consumer.properties”;// 配置文件位置
private String topic;
private int partitionsNum = 1;
private MessageExecutor executor; //message listener
private ExecutorService threadPool;
private ConsumerConnector connector;
private Charset charset = Charset.forName(“utf8”);
public void setGroupid(String groupid) {
this.groupid = groupid;
}
public void setZkConnect(String zkConnect) {
this.zkConnect = zkConnect;
}
public void setLocation(String location) {
this.location = location;
}
public void setTopic(String topic) {
this.topic = topic;
}
public void setPartitionsNum(int partitionsNum) {
this.partitionsNum = partitionsNum;
}
public void setExecutor(MessageExecutor executor) {
this.executor = executor;
}
public KafkaConsumerClient() {}
//init consumer,and start connection and listener
public void init() throws Exception {
if(executor == null){
throw new RuntimeException(“KafkaConsumer,exectuor cant be null!”);
}
Properties properties = new Properties();
properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
if(groupid != null){
properties.put(“groupid”, groupid);
}
if(zkConnect != null){
properties.put(“zookeeper.connect”, zkConnect);
}
ConsumerConfig config = new ConsumerConfig(properties);
connector = Consumer.createJavaConsumerConnector(config);
Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put(topic, partitionsNum);
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
threadPool = Executors.newFixedThreadPool(partitionsNum * 2);
//start
for (KafkaStream<byte[], byte[]> partition : partitions) {
threadPool.execute(new MessageRunner(partition));
}
}
public void close() {
try {
threadPool.shutdownNow();
} catch (Exception e) {
//
} finally {
connector.shutdown();
}
}
class MessageRunner implements Runnable {
private KafkaStream<byte[], byte[]> partition;
MessageRunner(KafkaStream<byte[], byte[]> partition) {
this.partition = partition;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = partition.iterator();
while (it.hasNext()) {
// connector.commitOffsets(); 手动提交 offset, 当 autocommit.enable=false 时使用
MessageAndMetadata<byte[], byte[]> item = it.next();
try{
executor.execute(new String(item.message(),charset));// UTF-8, 注意异常
}catch(Exception e){
//
}
}
}
public String getContent(Message message){
ByteBuffer buffer = message.payload();
if (buffer.remaining() == 0) {
return null;
}
CharBuffer charBuffer = charset.decode(buffer);
return charBuffer.toString();
}
}
public static interface MessageExecutor {
public void execute(String message);
}
/**
* @param args
*/
public static void main(String[] args) {
KafkaConsumerClient consumer = null;
try {
MessageExecutor executor = new MessageExecutor() {
public void execute(String message) {
System.out.println(message);
}
};
consumer = new KafkaConsumerClient();
consumer.setTopic(“test-topic”);
consumer.setPartitionsNum(2);
consumer.setExecutor(executor);
consumer.init();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(consumer != null){
consumer.close();
}
}
}
}
3) spring 配置 (略)
需要提醒的是, 上述 LogConsumer 类中, 没有太多的关注异常情况, 必须在 MessageExecutor.execute() 方法中抛出异常时的情况.
在测试时,建议优先启动 consumer,然后再启动 producer,这样可以实时的观测到最新的消息。
Kafka 的详细介绍 :请点这里
Kafka 的下载地址 :请点这里