阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

RocketMQ 架构简析

193次阅读
没有评论

共计 6973 个字符,预计需要花费 18 分钟才能阅读完成。

RocketMQ 架构简析Apache RocketMQ 是阿里开源的一款高性能、高吞吐量的分布式消息中间件。

整体架构 RocketMQ 架构简析

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。 每个 Broker 可以存储多个 Topic 的消息,每个 Topic 的消息也可以分片存储于集群中的不同的 Broker Group。
                                                                                        Namesrv
说道 Namesrv 首先会想到服务注册与发现。分布式服务 SOA 架构体系中会有服务注册与发现中心。主要作用是指导服务调用方找到服务提供者提供的服务实例。RocketMQ 体系中 Namesrv 主要作用是:为 producer 和 consumer 提供关于 topic 的路由信息。 管理 broker 节点:监控更新 broker 的实时状态。路由注册、路由删除(故障剔除)。
Namesrv 充当路由消息的提供者。Namesrv 是一个几乎无状态节点,多个 Namesrv 实例组成集群,但相互独立,没有信息交换。
  1. 路由元信息
  • topicQueueTable:topic 消息队列路由信息。
  • brokerAddrTable:broker 基础信息。包含 broker name,所属集群名称,主 broker 地址等。
  • clusterAddrTable:broker 集群信息,存储集群中所有 broker 的名称。
  • brokerLiveTable:broker 状态信息。
  • filterServerTable:broker 上的 filterServer 列表。filterServer 用于消息过滤。
  1. 路由注册  RocketMQ 路由注册是通过 broker 与 Namesrv 的心跳功能实现的。broker 启动时向集群中所有 Namesrv 发送心跳包,之后每隔 30 秒向集群中所有 Namesrv 发送心跳包。心跳包中包含:broker 集群信息、broker 信息、topic 配置信息、broker 关联的 FilterServer 列表等。如果 brokerA 为 Master。并且 brokerA 上的 topic1 的配置信息发生变化或初次注册,Namesrv 会根据报文创建或更新 Topic 路由元数据,填充 topicQueueTable。
  2. 路由删除  Namesrv 收到 brokerA 的心跳包会更新 brokerLiveTable 中的 brokerA 对应的 BrokerLiveInfo 中的 lastUpdateTimestamp。Namesrv 每隔 10 秒扫描 brokerLiveTable 一次。如果 brokerA 对应的 BrokerLiveInfo 中 lastUpdateTimestamp 距当前时间超过 120 秒,Namesrv 认为 brokerA 失效,会将 brokerA 的路由信息移除并关闭与 broker 的 socket 连接。更新:topicQueueInfo、brokerAddrTable、brokerLiveTable、filterServerTable 等。
  3. 路由发现  RocketMQ 路由发现是非实时的。当 Topic 路由信息发生变化是,Namesrv 不会主动推送给客户端(Producer、Consumer)。而是由客户端定时到 Namesrv 拉去最新的路由信息并缓存(包含 Topic 路由信息)。

与 kafka 对比
kafka 由 zookeeper 集群提供命名服务(Naming Service)。
Kafka 通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer g

Broker

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Broker 是以 group 为单位提供服务。一个 group 里面分 Master 和 Slave。Master 和 Slave 存储的数据一样,slave 从 master 同步数据(同步双写或异步复制看配置)。一个 Master 可以对应多个 Slave,一个 Slave 只能对应一个 Master。Master 与 Slave 的对应关系通过指定相同的 BrokerName、不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。broker 不必须是物理机或虚拟机:
RocketMQ 架构简析
每个 Broker 与 Namesrv 集群中的所有节点建立长连接,定时发送心跳包到所有 Namesrv,更新 broker 信息、topic 路由信息等。一个 Topic 的不同 queue(分区)可分布到集群中不同的 broker group 上。
与 kafka 对比:
kafka 和 RocketMQ 的 broker 都可以容纳多个一个或多个分区数据(kafka 分区:partition;RocketMQ 分区:queue)。
kafka 基于 partition(分区)做备份 / 高可用(partition follower)。
RocketMQ 增加了 broker group 的概念,基于 broker(可能包含多个分区)。
                                                                                   Producer
(消息)生产者。Producer 与 Namesrv 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 broker master 建立长连接,且定时向 broker master 发送心跳。Producer 完全无状态,可集群部署。
Producer 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。
                                                                                     Consumer
(消息)消费者 Consumer 与 Namesrv 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
Consumer 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
集群模式下:相同 Consumer Group 的每个 Consumer 实例平均分摊消息。一个条消息仅能被一个 Consumer Group 消费一次。、
Producer、Consumer 都只需要和集群中一个 Namesrv 建立长连接。Broker 需要向集群中所有的 Namesrv 发送心跳包。
其实很好理解:
Namesrv 集群提供高可用的命名服务。
Producer、Consumer 只需要从其中一台定期同步路由信息。
如果 Broker 只随机调一台发送心跳包。那么不同的 Namesrv 保存的路由信息会出现

消费者类型:

  1. 拉取式消费(Pull Consumer)Consumer 消费的一种类型,应用通常主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。Pull 方式里,取消息的过程需要用户自己写(包括提交 offset 等操作)。
  2. 推动式消费(Push Consumer)Consumer 消费的一种类型,该模式下 Broker 收到数据后会主动推送给消费端,该消费模式一般实时性较高。Push Consumer 原理上也是采取 pull 模式。实际上就是长轮询的 pull 模式。

一些概念

  • 主题(Topic)表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。每个 topic 可分为若干个分区(queue)。
  • 生产者组(Producer Group)同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
  • 消费者组(Consumer Group)同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
  • 普通顺序消息(Normal Ordered Message)普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
  • 严格顺序消息(Strictly Ordered Message)严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
  • 消息(Message)消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。
  • 标签(Tag)为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。

关于消息中间件

消息中间件需要解决的问题:异步化、削峰填谷。
消息中间件应具备的基础能力是:消息发布、订阅、消费。概念相对简单这里不过多描述。
消息中间件的一些重要的机制:

1. 消息优先级(Message Priority;RocketMQ 不支持)

优先级是指在一个消息队列中,每条消息都有不同的优先级,一般用整数来描述,优先级高的消息先投递,如果消息完全在一个内存队列中,那么在投递前可以按照优先级排序,令优先级高的先投递。由于 RocketMQ 所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大,因此 RocketMQ 没有特意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级的队列,将不同优先级发送到不同队列即可。

2. 顺序消息(Message Order)

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建,订单付款,订单完成。消费时,要按照这个顺序消费才能有意义。但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。
  • 投递消息的顺序性:投递消息的顺序性可通过将一组消息投递到同一分区实现。例如:借助 MessageQueueSelector 将对相同订单的操作消息投放到同一分区。
  • 消费消息的顺序性:RoctetMQ 特性保障:特定分区(queue)中的消息不能同时被同一个消费者组中的多个 Consumer 消费,以避免重复消费。通过自定义或使用预置的 AllocateQueueStrategy 可设定分区的分配策略(哪些分区分配给哪个消费者消费)。

3. 高可用、消息可靠性

3.1 消息持久化

RocketMQ、Kafka 以文件记录形式持久化。
RocketMQ 采用了单一的日志文件,即把同 1 个 broker 上面所有 topic 的所有 queue 的消息,存放在一个文件里面,从而避免了随机的磁盘写入。
RocketMQ 架构简析
如上图所示,所有消息都存在一个单一的 CommitLog 文件里面,然后有后台线程异步的同步到 ConsumeQueue,再由 Consumer 进行消费。
TODO 同步、异步刷盘。RocketMQ 架构简析
TODO RocketMQ 充分利用 Linux 文件系统内存 cache 来提高性能。TODO CommitLog index Commitlog segment 的大小与页缓存一致。
RocketMQ 消息存储机制会在后面的文章详细说明。

3.2 broker master/salve

TODO broker group master/salve
TODO Async/Sync Master;

4. 高并发、可扩展 ==> 分布式

提高并发效率 => 提高生产、消费并行度 => 提高分区数量。
RocketMQ、kafka 都支持 topic 数据分区存放、动态扩展。
以 RocketMQ 为例:
topic 创建的时候可以用集群模式去创建(这样集群里面每个 broker 的 queue 的数量相同),也可以用单个 broker 模式去创建(这样每个 broker 的 queue 数量可以不一致)。

4.1 生产并行度

RocketMQ 的生产并行度是由其自身机制及 broker 的数量决定的。这块后面的文章会详细分析。

4.2 消费并行度

广播模式下所有消费者会接受并消费当前 topic 下所有 Queue 的消息。
集群模式下,一个 queue 只分配给一个 consumer 实例:这是由于拉取消息是 consumer 主动控制的,如果多个实例同时消费一个 queue 的消息,会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个 queue 只分给一个 consumer 实例,一个 consumer 实例可以允许同时分到不同的 queue。
Kafka 的消费并行度依赖 Topic 配置的分区数,如分区数为 10,那么最多 10 台机器来并行消费(每台机器只能开启一个线程),或者一台机器消费(10 个线程并行消费)。即消费并行度和分区数一致。RocketMQ 消费并行度分两种情况:顺序消费方式并行度同卡夫卡完全一致;乱序方式并行度取决于 Consumer 的线程数,如 Topic 配置 10 个队列,10 台机器消费,每台机器 100 个线程,那么并行度为 1000。

4.3 消息队列分配策略

Producer 使用 MessageQueueSelector 选择将消息投放到哪个分区 使用 AllocateMessageQueueStrategy 将不同分区分配给 Consumer Group 中的不同 Consumer。一个分区(queue)仅允许分配给同一个 Consumer Group 下的一个 Consumer(防止重复消费)。
MessageQueueSelector

RocketMQ 架构简析

内置实现类:SelectMessageQueueByMachineRoom SelectMessageQueueByHash SelectMessageQueueByRandom
可以通过实现 MessageQueueSelector 接口,来自定义 Producer 投递消息时选择分区的算法。
AllocateMessageQueueStrategy

RocketMQ 架构简析

内置实现类:
AllocateMessageQueueAveragely:平均分配算法 
AllocateMessageQueueAveragelyByCircle:基于环形平均分配算法
AllocateMachineRoomNearby:基于机房临近原则算法
AllocateMessageQueueByMachineRoom:基于机房分配算法
AllocateMessageQueueConsistentHash:基于一致性 hash 算法
AllocateMessageQueueByConfig:基于配置分配算法
可以通过实现 AllocateMessageQueueStrategy 来自定义 queue 分配给特定 Consumer Group 下不同 Consumer 的策略。

参考:

https://github.com/apache/rocketmq/blob/master/docs/cn/
https://juejin.im/post/6844903589819875336
https://jaskey.github.io/blog/2016/12/19/rocketmq-rebalance/
http://objcoding.com/2019/09/13/kafka-partition-and-rmq-queue/
http://www.itmuch.com/books/rocketmq
作者:RyanLee86799 来源:https://juejin.im/post/6844904130822029320

文章转载:JAVA 高级架构

(版权归原作者所有,侵删)

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-12-03发表,共计6973字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中