共计 5807 个字符,预计需要花费 15 分钟才能阅读完成。
我们在前面已经介绍了 JMS 和 AMQP,JMS 是 JavaEE 的标准消息接口,Artemis 是一个 JMS 实现产品,AMQP 是跨语言的一个标准消息接口,RabbitMQ 是一个 AMQP 实现产品。
Kafka 也是一个消息服务器,它的特点一是快,二是有巨大的吞吐量,那么 Kafka 实现了什么标准消息接口呢?
Kafka 没有实现任何标准的消息接口,它自己提供的 API 就是 Kafka 的接口。
哥没有实现任何标准,哥自己就是标准。
—— Kafka
Kafka 本身是 Scala 编写的,运行在 JVM 之上。Producer 和 Consumer 都通过 Kafka 的客户端使用网络来与之通信。从逻辑上讲,Kafka 设计非常简单,它只有一种类似 JMS 的 Topic 的消息通道:
┌──────────┐
┌──▶│Consumer-1│
│ └──────────┘
┌────────┐ ┌─────┐ │ ┌──────────┐
│Producer│───▶│Topic│──┼──▶│Consumer-2│
└────────┘ └─────┘ │ └──────────┘
│ ┌──────────┐
└──▶│Consumer-3│
└──────────┘
那么 Kafka 如何支持十万甚至百万的并发呢?答案是分区。Kafka 的一个 Topic 可以有一个至多个 Partition,并且可以分布到多台机器上:
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
Topic
│ │
┌───────────┐ ┌──────────┐
│┌─▶│Partition-1│──┐│┌──▶│Consumer-1│
│ └───────────┘ │ │ └──────────┘
┌────────┐ ││ ┌───────────┐ │││ ┌──────────┐
│Producer│───┼─▶│Partition-2│──┼─┼──▶│Consumer-2│
└────────┘ ││ └───────────┘ │││ └──────────┘
│ ┌───────────┐ │ │ ┌──────────┐
│└─▶│Partition-3│──┘│└──▶│Consumer-3│
└───────────┘ └──────────┘
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
Kafka 只保证在一个 Partition 内部,消息是有序的,但是,存在多个 Partition 的情况下,Producer 发送的 3 个消息会依次发送到 Partition-1、Partition- 2 和 Partition-3,Consumer 从 3 个 Partition 接收的消息并不一定是 Producer 发送的顺序,因此,多个 Partition 只能保证接收消息大概率按发送时间有序,并不能保证完全按 Producer 发送的顺序。这一点在使用 Kafka 作为消息服务器时要特别注意,对发送顺序有严格要求的 Topic 只能有一个 Partition。
Kafka 的另一个特点是消息发送和接收都尽量使用批处理,一次处理几十甚至上百条消息,比一次一条效率要高很多。
最后要注意的是消息的持久性。Kafka 总是将消息写入 Partition 对应的文件,消息保存多久取决于服务器的配置,可以按照时间删除(默认 3 天),也可以按照文件大小删除,因此,只要 Consumer 在离线期内的消息还没有被删除,再次上线仍然可以接收到完整的消息流。这一功能实际上是客户端自己实现的,客户端会存储它接收到的最后一个消息的 offsetId,再次上线后按上次的 offsetId 查询。offsetId 是 Kafka 标识某个 Partion 的每一条消息的递增整数,客户端通常将它存储在 ZooKeeper 中。
有了 Kafka 消息设计的基本概念,我们来看看如何在 Spring Boot 中使用 Kafka。
安装 Kafka
首先从 Kafka 官网下载最新版 Kafaka,解压后在 bin
目录找到两个文件:
zookeeper-server-start.sh
:启动 ZooKeeper(已内置在 Kafka 中);kafka-server-start.sh
:启动 Kafka。
先启动 ZooKeeper:
$ ./zookeeper-server-start.sh ../config/zookeeper.properties
再启动 Kafka:
./kafka-server-start.sh ../config/server.properties
看到如下输出表示启动成功:
... INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
如果要关闭 Kafka 和 ZooKeeper,依次按 Ctrl- C 退出即可。注意这是在本地开发时使用 Kafka 的方式,线上 Kafka 服务推荐使用云服务厂商托管模式(AWS 的 MSK,阿里云的消息队列 Kafka 版)。
使用 Kafka
在 Spring Boot 中使用 Kafka,首先要引入依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
注意这个依赖是 spring-kafka
项目提供的。
然后,在 application.yml
中添加 Kafka 配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
auto-offset-reset: latest
max-poll-records: 100
max-partition-fetch-bytes: 1000000
除了 bootstrap-servers
必须指定外,consumer
相关的配置项均为调优选项。例如,max-poll-records
表示一次最多抓取 100 条消息。配置名称去哪里看?IDE 里定义一个 KafkaProperties.Consumer
的变量:
KafkaProperties.Consumer c = null;
然后按住 Ctrl 查看源码即可。
发送消息
Spring Boot 自动为我们创建一个 KafkaTemplate
用于发送消息。注意到这是一个泛型类,而默认配置总是使用 String
作为 Kafka 消息的类型,所以注入 KafkaTemplate<String, String>
即可:
@Component
public class MessagingService {@Autowired ObjectMapper objectMapper;
@Autowired KafkaTemplate<String, String> kafkaTemplate;
public void sendRegistrationMessage(RegistrationMessage msg) throws IOException {send("topic_registration", msg);
}
public void sendLoginMessage(LoginMessage msg) throws IOException {send("topic_login", msg);
}
private void send(String topic, Object msg) throws IOException {ProducerRecord<String, String> pr = new ProducerRecord<>(topic, objectMapper.writeValueAsString(msg));
pr.headers().add("type", msg.getClass().getName().getBytes(StandardCharsets.UTF_8));
kafkaTemplate.send(pr);
}
}
发送消息时,需指定 Topic 名称,消息正文。为了发送一个 JavaBean,这里我们没有使用 MessageConverter
来转换 JavaBean,而是直接把消息类型作为 Header 添加到消息中,Header 名称为type
,值为 Class 全名。消息正文是序列化的 JSON。
接收消息
接收消息可以使用 @KafkaListener
注解:
@Component
public class TopicMessageListener {private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
ObjectMapper objectMapper;
@KafkaListener(topics = "topic_registration", groupId = "group1")
public void onRegistrationMessage(@Payload String message, @Header("type") String type) throws Exception {RegistrationMessage msg = objectMapper.readValue(message, getType(type));
logger.info("received registration message: {}", msg);
}
@KafkaListener(topics = "topic_login", groupId = "group1")
public void onLoginMessage(@Payload String message, @Header("type") String type) throws Exception {LoginMessage msg = objectMapper.readValue(message, getType(type));
logger.info("received login message: {}", msg);
}
@KafkaListener(topics = "topic_login", groupId = "group2")
public void processLoginMessage(@Payload String message, @Header("type") String type) throws Exception {LoginMessage msg = objectMapper.readValue(message, getType(type));
logger.info("process login message: {}", msg);
}
@SuppressWarnings("unchecked")
private static <T> Class<T> getType(String type) {// TODO: use cache:
try {return (Class<T>) Class.forName(type);
} catch (ClassNotFoundException e) {throw new RuntimeException(e);
}
}
}
在接收消息的方法中,使用 @Payload
表示传入的是消息正文,使用 @Header
可传入消息的指定 Header,这里传入@Header("type")
,就是我们发送消息时指定的 Class 全名。接收消息时,我们需要根据 Class 全名来反序列化获得 JavaBean。
上述代码一共定义了 3 个 Listener,其中有两个方法监听的是同一个 Topic,但它们的 Group ID 不同。假设 Producer 发送的消息流是 A、B、C、D,Group ID 不同表示这是两个不同的 Consumer,它们将分别收取完整的消息流,即各自均收到 A、B、C、D。Group ID 相同的多个 Consumer 实际上被视作一个 Consumer,即如果有两个 Group ID 相同的 Consumer,那么它们各自收到的很可能是 A、C 和 B、D。
运行应用程序,注册新用户后,观察日志输出:
... c.i.learnjava.service.UserService : try register by [email protected]...
... c.i.learnjava.web.UserController : user registered: [email protected]
... c.i.l.service.TopicMessageListener : received registration message: [RegistrationMessage: [email protected], name=Bob, timestamp=1594637517458]
用户登录后,观察日志输出:
... c.i.learnjava.service.UserService : try login by [email protected]...
... c.i.l.service.TopicMessageListener : received login message: [LoginMessage: [email protected], name=Bob, success=true, timestamp=1594637523470]
... c.i.l.service.TopicMessageListener : process login message: [LoginMessage: [email protected], name=Bob, success=true, timestamp=1594637523470]
因为 Group ID 不同,同一个消息被两个 Consumer 分别独立接收。如果把 Group ID 改为相同,那么同一个消息只会被两者之一接收。
有细心的童鞋可能会问,在 Kafka 中是如何创建 Topic 的?又如何指定某个 Topic 的分区数量?
实际上开发使用的 Kafka 默认允许自动创建 Topic,创建 Topic 时默认的分区数量是 2,可以通过 server.properties
修改默认分区数量。
在生产环境中通常会关闭自动创建功能,Topic 需要由运维人员先创建好。和 RabbitMQ 相比,Kafka 并不提供网页版管理后台,管理 Topic 需要使用命令行,比较繁琐,只有云服务商通常会提供更友好的管理后台。
练习
在 Spring Boot 中使用 Kafka。
下载练习
小结
Spring Boot 通过 KafkaTemplate
发送消息,通过 @KafkaListener
接收消息;
配置 Consumer 时,指定 Group ID 非常重要。