共计 35827 个字符,预计需要花费 90 分钟才能阅读完成。
这段时间主要是学习设计模式,一直想写一系列的设计模式相关博文,但是设计模式理解起来说难也不难,说简单也不简单,但要想能深入理解并在项目架构中运用自如的话,还是得花点时间好好积累的。扯得有点远了~
今天咱们来聊一聊消息队列(MQ),以 RabbitMQ 为例。
1. 定义
消息队列:在消息的传输过程中保存消息的的容器。
这是一个较为经典的消费 - 生产者模型,说起来比较抽象,打个比方:A 线程需要给 B 线程发送消息(A、B 线程不一定是在同一台机器上的),A 线程先把消息发送到消息队列服务器上,然后 B 线程去读取或是订阅消息服务器上消息队列中的消息,线程 A 和 B 之间并没有进行直接通信。MQ 服务器在中间起到中继的作用。
2. 适用的应用场景
比较适合异步传输,这里解释一下什么是异步和同步。
异步:发送方不关心消息有没有发送成功,只发送消息,不去获取消息是否发送成功。
同步:发送方关心消息是否发送成功,发送消息后,会等待接收方返回状态码,根据状态码来判断是否发送成功,然后执行相对于的动作。
下边以 Http 中的同步和异步为例:
如:普通的 B / S 架构客户端和服务器端之间的通信就是同步的,即提交请求 —> 等待服务器处理完毕返回消息 —> 拿到服务器返回的消息,处理完毕。
如:Ajax 技术就是异步的,请求通过事件触发 —> 服务器处理(浏览器不用等待,仍可以做其他的事情)—> 处理完毕。
有人可能会好奇说应用场景怎么说到了同步和异步,那说明你还不是很理解技术和应用场景之间的紧密联系。
3. RabbitMQ
在消息队列中有很多类似的产品,以后我会独立发一篇博文来经行说明。我们这里以 RabbitMQ 为例。
RabbitMQ 是 AMQP(高级消息队列协议)的一个标准实现,关于它的快速入门,可以在这里查看:http://www.rabbitmq.com/getstarted.html。
3.1. 角色概念
每个开源项目都有自己的设计方法以及模块角色,RabbitMQ 也不例外。结构图如下:
Broker:即消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把 exchange 和 queue 按照路由规则绑定起来。
Routing Key:路由关键字,exchange 根据这个关键字进行消息投递。
vhost:虚拟主机,一个 broker 里可以开设多个 vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel 代表一个会话任务。
3.2. 工作过程
- 生产者客户端:
- 客户端连接到 RabbitMQ 服务器上,打开一个消息通道(channel);
- 客户端声明一个消息交换机(exchange),并设置相关属性。
- 客户端声明一个消息队列(queue),并设置相关属性。
- 客户端使用 routing key 在消息交换机(exchange)和消息队列(queue)中建立好绑定关系。
- 客户端投递消息都消息交换机(exchange)上
- 客户端关闭消息通道(channel)以及和服务器的连接。
- 服务器端:
exchange 接收到消息后,根据消息的 key(这个 key 的产生规则暂时没研究,有知道的小伙伴可以留言告诉我)和以及设置的 binding,进行消息路由,将消息投递到一个或多个消息队列中。
关于 exchange 也有几个类型:
(1). Direct 交换机:完全根据 key 进行投递。例如,绑定时设置了 routing key 为 abc,客户端提交信息提交信息时只有设置了 key 为 abc 的才会投递到队列;
(2).Topic 交换机:在 key 进行模式匹配后进行投递。例如:符号”#”匹配一个或多个字符,符号”*”匹配一串连续的字母字符,例如”abc.#”可以匹配”abc.def.ghi”,而”abc.*”只可以匹配”abc.def”。
(3).Fanout 交换机:它采取广播模式,消息进来时,将会被投递到与改交换机绑定的所有队列中。
- 消费者客户端:
(暂时未研究,后续更新)
CentOS 5.6 安装 RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm
RabbitMQ 客户端 C ++ 安装详细记录 http://www.linuxidc.com/Linux/2012-02/53521.htm
用 Python 尝试 RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm
RabbitMQ 集群环境生产实例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm
Ubuntu 下 PHP + RabbitMQ 使用 http://www.linuxidc.com/Linux/2010-07/27309.htm
在 CentOS 上安装 RabbitMQ 流程 http://www.linuxidc.com/Linux/2011-12/49610.htm
RabbitMQ 概念及环境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm
RabbitMQ 入门教程 http://www.linuxidc.com/Linux/2015-02/113983.htm
3.3. RabbitMQ 的消息持久化
RabbitMQ 支持数据持久化,也就是把数据写在磁盘上,可以增加数据的安全性。消息队列持久化包括三个部分:
- 消息交换机(exchange)持久化,在声明时指定 durable 为 1
- 消息队列(queue)持久化,在声明时指定 durable 为 1
- 消息持久化,在投递时指定 delivery_mode 为 2(1 是非持久化)
如果消息交换机(exchange)和消息队列(queue)都是持久化的话,那么他们之间的绑定(Binding)也是持久化的。如果消息交换机和消息队列之间一个持久化、一个非持久化,那么就不允许绑定。
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2016-07/133178p2.htm
1、相关概念
RabbitMQ 是一个消息代理,事实上,它接收生产者产生的消息,然后将消息传递给消费者。在这个过程中,它可以路由,可以缓冲,或者更具你设定的规则来将消息持久化。RabbitMQ 和消息传输过程中一般会用一些术语:
- 生产者(Producing ):
意思无非是指发送消息的那一端,如果一个程序发送消息,那么它就将被称为 生产者,这里用大写的 P 来表示。
- 队列(queue ):
相当于邮箱的名字,它活动在 RabbitMQ 服务器里边。虽然消息流会通过 RabbitMQ 和你的应用程序,但是只会被存储在队列中。队列是不受任何限制的,它可以尽可能多的去存储你需要存储的消息(本质上来说它是个无限缓冲)。可以多个生产者向同一个消息队列发送消息,也可以多个消费者同时从一个消息队列中来接收消息。消息队列可以如下图模型。
- 消费者(Consuming ):
即接收端,消费者主要是等待接收消息的程序,用下图表示:
注意:在大多数应用场景中,生产者、消费者以及 RabbitMQ 服务是不会同时运行在一台机器上的。
下边将会实现两个 Java 程序,一个只发送生产者一条消息的生产者,一个接收消息、并打印消息的消费者。
在下边的对话中,”P”是我们的生产者,”C”是我们的消费者,中间的是矩形是队列(BabbitMQ 维护的消息缓冲)
2、Java 客户端的包
RabbitMQ 遵守 AMQP 协议,AMQP 协议是一个开放、通用的消息协议。关于 AMQP 协议的客户端,有多种语言的实现版本。本文使用的是 RabbitMQ 提供的 Java 客户端。
可在 http://www.rabbitmq.com/java-client.html 这里下载,然后将相对应的 Jar 包拷贝你的工作目录下。RabbitMQ 的 Java 客户端在 Maven 库中也有,groupId 是 com.rabbitmq,artifactId 是 amqp-client。
3、发送端
我们将消息发送器起名为 Send,消息接收器起名为 Recv。发送器将会连接 RabbitMQ,发送一条消息,然后退出。
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Send {private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws java.io.IOException {/*连接可以抽象为 socket 连接,为我们维护协议版本信息和协议证书等。这里我们连接
上了本机的消息服务器实体(localhost)。如果我们想连接其它主机上的 RabbitMQ 服务,只需要修改一下主机名或是 IP 就可以了*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/*接下创建 channel(信道),这是绝大多数 API 都能用到的。为了发送消息,你必须要声明一个消息消息队列,然后向该队列里推送消息*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent'" + message + "'");
/*声明一个幂等的队列(只有在该队列不存在时,才会被创建)。消息的上下文是一个
字节数组,你可以指定它的编码。*/
channel.close();
connection.close();}
}
注:若 RabbitMQ 服务磁盘空间不足的话,运行会出错,参数设置为:disk_free_limit,更多的参数配置可在这里找到 http://www.rabbitmq.com/configure.html#config-items
4、接收端
RabbitMQ 会往接收器上推消息,与只发送一条消息的发送端不同,这里我们将监听消息并将消息打印出来。
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {/*这里怎么打开连接和信道,以及声明用于接收消息的队列,这些步骤与发送端基本上是一样的*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/*确保这里的队列是存在的*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("[*] Waiting for messages. To exit press CTRL+C");
/*这里用到了额外的类 QueueingConsumer 来缓存服务器将要推过来的消息。我们通知服务器向接收端推送消息,
然后服务器将会向客户端异步推送消息,这里提供了一个可以回调的对象来缓存消息,直到我们做好准备来使用
它,这个类就是 QueueingConsumer*/
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received'" + message + "'");
}
}
}
消息接收完后 QueueingConsumer.nextDelivery()将会发生阻塞,暂停运行,直到有其他的消息推过来。
如果你需要检查和验证队列的话,需要使用 rabbitmqctl list_queues。
参考链接:http://www.rabbitmq.com/tutorials/tutorial-one-java.html
这段时间主要是学习设计模式,一直想写一系列的设计模式相关博文,但是设计模式理解起来说难也不难,说简单也不简单,但要想能深入理解并在项目架构中运用自如的话,还是得花点时间好好积累的。扯得有点远了~
今天咱们来聊一聊消息队列(MQ),以 RabbitMQ 为例。
1. 定义
消息队列:在消息的传输过程中保存消息的的容器。
这是一个较为经典的消费 - 生产者模型,说起来比较抽象,打个比方:A 线程需要给 B 线程发送消息(A、B 线程不一定是在同一台机器上的),A 线程先把消息发送到消息队列服务器上,然后 B 线程去读取或是订阅消息服务器上消息队列中的消息,线程 A 和 B 之间并没有进行直接通信。MQ 服务器在中间起到中继的作用。
2. 适用的应用场景
比较适合异步传输,这里解释一下什么是异步和同步。
异步:发送方不关心消息有没有发送成功,只发送消息,不去获取消息是否发送成功。
同步:发送方关心消息是否发送成功,发送消息后,会等待接收方返回状态码,根据状态码来判断是否发送成功,然后执行相对于的动作。
下边以 Http 中的同步和异步为例:
如:普通的 B / S 架构客户端和服务器端之间的通信就是同步的,即提交请求 —> 等待服务器处理完毕返回消息 —> 拿到服务器返回的消息,处理完毕。
如:Ajax 技术就是异步的,请求通过事件触发 —> 服务器处理(浏览器不用等待,仍可以做其他的事情)—> 处理完毕。
有人可能会好奇说应用场景怎么说到了同步和异步,那说明你还不是很理解技术和应用场景之间的紧密联系。
3. RabbitMQ
在消息队列中有很多类似的产品,以后我会独立发一篇博文来经行说明。我们这里以 RabbitMQ 为例。
RabbitMQ 是 AMQP(高级消息队列协议)的一个标准实现,关于它的快速入门,可以在这里查看:http://www.rabbitmq.com/getstarted.html。
3.1. 角色概念
每个开源项目都有自己的设计方法以及模块角色,RabbitMQ 也不例外。结构图如下:
Broker:即消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把 exchange 和 queue 按照路由规则绑定起来。
Routing Key:路由关键字,exchange 根据这个关键字进行消息投递。
vhost:虚拟主机,一个 broker 里可以开设多个 vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel 代表一个会话任务。
3.2. 工作过程
- 生产者客户端:
- 客户端连接到 RabbitMQ 服务器上,打开一个消息通道(channel);
- 客户端声明一个消息交换机(exchange),并设置相关属性。
- 客户端声明一个消息队列(queue),并设置相关属性。
- 客户端使用 routing key 在消息交换机(exchange)和消息队列(queue)中建立好绑定关系。
- 客户端投递消息都消息交换机(exchange)上
- 客户端关闭消息通道(channel)以及和服务器的连接。
- 服务器端:
exchange 接收到消息后,根据消息的 key(这个 key 的产生规则暂时没研究,有知道的小伙伴可以留言告诉我)和以及设置的 binding,进行消息路由,将消息投递到一个或多个消息队列中。
关于 exchange 也有几个类型:
(1). Direct 交换机:完全根据 key 进行投递。例如,绑定时设置了 routing key 为 abc,客户端提交信息提交信息时只有设置了 key 为 abc 的才会投递到队列;
(2).Topic 交换机:在 key 进行模式匹配后进行投递。例如:符号”#”匹配一个或多个字符,符号”*”匹配一串连续的字母字符,例如”abc.#”可以匹配”abc.def.ghi”,而”abc.*”只可以匹配”abc.def”。
(3).Fanout 交换机:它采取广播模式,消息进来时,将会被投递到与改交换机绑定的所有队列中。
- 消费者客户端:
(暂时未研究,后续更新)
CentOS 5.6 安装 RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm
RabbitMQ 客户端 C ++ 安装详细记录 http://www.linuxidc.com/Linux/2012-02/53521.htm
用 Python 尝试 RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm
RabbitMQ 集群环境生产实例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm
Ubuntu 下 PHP + RabbitMQ 使用 http://www.linuxidc.com/Linux/2010-07/27309.htm
在 CentOS 上安装 RabbitMQ 流程 http://www.linuxidc.com/Linux/2011-12/49610.htm
RabbitMQ 概念及环境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm
RabbitMQ 入门教程 http://www.linuxidc.com/Linux/2015-02/113983.htm
3.3. RabbitMQ 的消息持久化
RabbitMQ 支持数据持久化,也就是把数据写在磁盘上,可以增加数据的安全性。消息队列持久化包括三个部分:
- 消息交换机(exchange)持久化,在声明时指定 durable 为 1
- 消息队列(queue)持久化,在声明时指定 durable 为 1
- 消息持久化,在投递时指定 delivery_mode 为 2(1 是非持久化)
如果消息交换机(exchange)和消息队列(queue)都是持久化的话,那么他们之间的绑定(Binding)也是持久化的。如果消息交换机和消息队列之间一个持久化、一个非持久化,那么就不允许绑定。
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2016-07/133178p2.htm
目录
RabbitMQ 学习总结 第一篇:理论篇
RabbitMQ 学习总结 第二篇:快速入门 HelloWorld
RabbitMQ 学习总结 第三篇:工作队列 Work Queue
RabbitMQ 学习总结 第四篇:发布 / 订阅 Publish/Subscribe
RabbitMQ 学习总结 第五篇:路由 Routing
RabbitMQ 学习总结 第六篇:Topic 类型的 exchange
RabbitMQ 学习总结 第七篇:RCP(远程过程调用协议)
在上篇中我们实现了程序来从一个已经命名的队列里发送和接收消息。本篇博文中我们将要创建工作队列用来把一些比较耗时的任务分配给多个 worker。
工作队列的主要思想就是避开立刻处理某个资源消耗交大的任务并且需要等待它执行完成。取而代之的是我们可以将它加入计划列表,并在后边执行这些任务。我们将任务分装成一个消息,并发送到队列中。后台的工作程序在接收到消息后将会立刻执行任务。当运行多个执行器时,任务将会在他们之间共享。
这个概念在 web 应用程序中是比较实用的,对于一些在一个短的 http 请求里无法完成的复杂任务。
1、准备
上篇博文中是发送一个包含”Hello World“的消息。现在我们来发送一条代表复杂任务的字符串。我们这里没有一个真实存在的任务,例如修改图片大小和渲染 pdf 文件这类的任务,这里我们模拟一个任务繁忙的场景(使用 Thread.sleep()函数)。这里我们使用字符串类的点号个数来代表任务的复杂性,每一个点号都占用一秒钟的处理时间。例如,一个用”Hello…”来描述的伪造的任务将会占用三秒时间。
我们稍微修改一下上篇博文中的 Send.Java 代码,可以从客户端发送任意消息。这个程序将会指定任务到我们的工作列表中,命名为 NewTask.java:
发送消息部分如下:
String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent'" + message + "'");
从运行参数中拿到消息类容:
private static String getMessage(String[] strings){if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);
}
return words.toString();}
旧的接收端也要做稍微的修改:消息体里的一个逗号代表一个一秒钟的任务,接收端会接收到消息,然后执行任务。这里重新命名为 Work.java:
while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received'" + message + "'");
doWork(message);
System.out.println(" [x] Done");
}
然后模拟执行任务消耗时间:
private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);
}
}
2、轮询调度
任务队列的一个较大的优势就是能够很方便的安排工作。如果后台队列里正在积压一些工作一直没有被执行的话,通过添加更多的 worker 就可以解决了。
首先,让我们来同时运行两个 worker 实例(C1 和 C2),他们将会同时从队列里拿到消息,具体的详情见下:
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
然后发布任务(运行发送端):
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....
然后查看我们的 worker 执行了什么任务:
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'
默认情况下,RabbitMQ 会把每个消息以此轮询发到各个消费者那,把消息平均的发到各个消费者那。这种分配管理的方式叫轮询,还可以测试多个 worker 的情形。
3、消息应答机制
完成一个任务需要花费几秒钟。你一定很好奇,如果某个消费者开始执行某个任务花费了很长的时间并且在执行到某个部分时崩溃了那会怎么样。在我们现在的代码中,在向消费者推送某条消息后,RabbitMQ 会立刻删除掉这条消息。这样的话,如果我们 kill 掉某个 worker 的话,那么我们将会流失掉该 worker 正在处理任务的消息(改任务未处理完成),我们也会丢失所有被发送到这个消费者且未处理完成的消息。
但是,我们不想丢失这部分消息,我们希望这类消息可以再次被发送到其它 worker 那。
为了保证永远不会丢失消息,RabbitMQ 支持消息应答机制。当消费者接收到消息并完成任务后会往 RabbitMQ 服务器发送一条确认的命令,然后 RabbitMQ 才会将消息删除。
如果某个消费者在还有发送确认信息就挂了,RabbitMQ 将会视为服务没有执行完成,然后把执行消息的服务再发给另外一个消费者。这种方式下,即时某个 worker 挂了,也不会使得消息丢失。
这里不是用超时来判断的,只有在某个消费者连接断开时,RabbitMQ 才会把重新发送该消费者没有返回确认的消息到其它消费者那。即时处理某条任务花费了很长的时间,在这里也是没有问题的。
消息应答机制默认是打开的,在上边例子中我们明确的关闭了它(autoAck=true),那么现在应该如下修改程序:
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//...
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
这样就可以保证即时你 kill 掉了 worker 也不会出现信息丢失的现象,worker 被 kill 掉之后,所有的未确认消息将会被重新发送。
易错点:
很多人都会忘记调用 basicAck 方法,虽然这是一个很简单的错误,但往往却是致命。消费者退出后消息将会被重发,但是由于一些未能被确认消息不能被释放,RabbitMQ 将会消耗掉越来越多的内存。
为了能够调试这种错误,你可以使用 rabbitmqctl 来打印出 messages_unacknowledged 字段。
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
4、消息的持久化
我们已经学习了在发生消费者挂掉或是任务被 kill 掉时的容错机制,下边将来看看当 RabbitMQ 服务被停止后,怎么保证消息不丢失。
当 RabbitMQ 退出或是宕机时会丢失队列和消息,当然有两个地方需要注意才能解决这类问题的发生:将队列和消息都持久化存储。
首先,我们要确保 RabbitMQ 永远不会丢失消息队列,那就需要声明它为持久化存储:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
虽然这里的操作是正确的,但在这里依然不会生效,因为命名为“hello”的队列在之前已经被创建(非持久化),现在已经存在了。RabbitMQ 不允许你重新定义一个已经存在的消息队列,如果你尝试着去修改它的某些属性的话,那么你的程序将会报错。所以,这里你需要更换一个消息队列名称:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
生产者和消费者都需要使用 queueDeclare 方法来指定持久化属性。
现在我们可以确保即使 RabbitMQ 重启了,任务队列也不会丢失。下边我就来实现消息持久化(通过设置属性 MessageProperties. PERSISTENT_TEXT_PLAIN,其中 MessageProperties 实现了 BasicProperties 接口)。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
标记消息持久化并不能百分百的保证消息一定不会被丢失,虽然 RabbitMQ 会把消息写到磁盘上,但是从 RabbitMQ 接收到消息到写到磁盘上,这个短时间的过程中发生的 RabbitMQ 重启依然会使得为写入到磁盘的消息被丢失。事实上是这样的,RabbitMQ 接收到消息后,首先会把该消息写到内存缓冲区中,并不是直接把单条消息实时写到磁盘上的。消息的持久化不是健壮的,但是对于简单的任务队列是够用了。如果你需要一套很健壮的持久化方案,那么你可以使用 publisher confirms(稍后会更新详细的使用方法)。
5、公平的任务分发策略
你可能会注意到有的时候 RabbitMQ 不能像你预想中的那样分发消息。例如有两个 worker,第奇数个消息对应的任务都很耗时,第偶数个消息对应的任务都很快就能执行完。这样的话其中有个 worker 就会一直都很繁忙,另外一个 worker 几乎不做任务。RabbitMQ 不会去对这种现象做任何处理,依然均匀的去推送消息。
这是因为 RabbitMQ 在消息被生产者推送过来后就被推送到消费者端,它不会去查看未接收到消费者确认的消息数量。它只会把 N 个消息均与的分发到 N 个消费者那。
为了能解决这个问题,我们可以使用 basicQos 放来来设置消费者最多会同时接收多少个消息。这里设置为 1,表示 RabbitMQ 同一时间发给消费者的消息不超过一条。这样就能保证消费者在处理完某个任务,并发送确认信息后,RabbitMQ 才会向它推送新的消息,在此之间若是有新的消息话,将会被推送到其它消费者,若所有的消费者都在处理任务,那么就会等待。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意消息队列的大小:
如果所有的 worker 都处于较忙的状态下,你的消息队列有可能会太长(出现内存或磁盘瓶颈)。需要尽量多的关注这些信息,出现的时候可以适当的添加 worker。
6、代码的最后实现
发送端:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//指定队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = getMessage(argv);
//指定消息持久化
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println(" [x] Sent'" + message + "'");
channel.close();
connection.close();}
//...
}
接收端:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//指定队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//指定该消费者同时只接收一条消息
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
//打开消息应答机制
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received'" + message + "'");
doWork(message);
System.out.println(" [x] Done" );
//返回接收到消息的确认信息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
//...
}
使用消息应答机制和 prefetchCount 可以实现一个工作队列了。持久化的选项可以使任务即使队列和消息即使在 RabbitMQ 重启后,依然不会丢失。
关于 Channel 和 MessageProperties 的更多应用可以参考 Java 官方 API 文档:
http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/
最后总结:
1、消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息。
2、服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证 RabbitMQ 重启,队列和消息也不会。
3、指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题。
参考链接:http://www.rabbitmq.com/tutorials/tutorial-two-java.html
目录
RabbitMQ 学习总结 第一篇:理论篇
RabbitMQ 学习总结 第二篇:快速入门 HelloWorld
RabbitMQ 学习总结 第三篇:工作队列 Work Queue
RabbitMQ 学习总结 第四篇:发布 / 订阅 Publish/Subscribe
RabbitMQ 学习总结 第五篇:路由 Routing
RabbitMQ 学习总结 第六篇:Topic 类型的 exchange
RabbitMQ 学习总结 第七篇:RCP(远程过程调用协议)
上篇中我们实现了 Work Queue 的创建,在 Work Queue 背后,其实是 rabbitMQ 把每条任务消息只发给一个消费者。本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为 publish/subscribe(发布 / 订阅)。
为了说明这个模式,我们将会构建一个简单的日志系统。这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们。
在我们的日志系统里,每个运行的消费者程序都能接收到消息。这样我就运行一个 receiver 并把日志写到磁盘上,同时我们再运行另外一个消费者来把日志消息打印到屏幕上。
从本质上来说,是把日志消息推送到了所有的消费者端。
1、消息交换机
上篇中我们往 Queue 里发送消息,并从 Queue 里取出消息。现在我们来介绍 RabbitMQ 的完全消息模型。
我们来快速回顾一下之前博文中的内容:
- 一个生产者者应用程序发送消息;
- 一个消息队列用来存储和缓存消息;
- 一个消费者程序接收消息
RabbitMQ 的消息发送模型核心思想是生产者不直接把消息发送到消息队列中。事实上,生产者不知道自己的消息将会被缓存到哪个队列中。
其实生产者者可以把消息发送到 exchange(消息交换机)上。exchange 是一个很简单的事情,它一边接收生产者的消息,另一边再把消息推送到消息队列中。Exchange 必须知道在它接收到一条消息时应该怎么去处理。应该把这条消息推送到指定的消息队列中?还是把消息推送到所有的队列中?或是把消息丢掉?这些规则都可以用 exchange 类型来定义。
有一些可用的 exchange 类型:direct, topic, headers 和 fanout。这里我们主要看最后一个:fanout,这里我们创建一个名字为 logs、类型为 fanout 的 exchange:
channel.exchangeDeclare(“logs”, “fanout”);
fanout 类型的 exchange 是很简单的。就是它把它能接收到的所有消息广播到它知道的所有队列中。这正是我们的日志系统所需要的。
列出 exchange:
可以在服务器上使用 rabbitmqctl 命令来列出 RabbitMQ 服务器上的所有消息 exchange:
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
在这个列表中有一些形如 amp.* 的 exchange,还有默认(未命名)的交换机。这些都是被默认创建的,但这些已经被默认创建的都不是你现在需要用到的。
没有名字的 exchange:
在之前的博文里没有使用都 exchange 的相关知识,但是任然能够发送消息。之所以能发送成功是因为我们使用一个默认 exchange,我们使用(””)来标识的。
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数就是 exchange 的名字。空字符串的符号指的是默认的或没有命名的 exchange:消息会根据 routingKey 被路由到指定的消息队列中。
现在我们来吧消息推送到已命名的 exchange 上:
channel.basicPublish("logs", "", null, message.getBytes());
2、临时队列
如果你看过之前几篇的博文,应该会发现我们都是使用了一个指定名字的消息队列(hello 和 task_queue)。对应的生产者和消费者之间都要使用相同的消息队列名称,这在很重要的。
但是在我们的 log 系统中却不是这样,我们希望能够接收到所有的 log 消息,不只是其中的一部分。我们只要处理当前的 log 消息,不用管过去的历史 log。为了实现,我们需要做以下两步:
- 无论什么时候我们和 RabbitMQ 建立连接时,我们都要刷新、清空 Queue。为了达到这一的目的,我们可以用一个随机的名字(随机性可由自己来定义)来创建 Queue,也可以让服务器来自动建立一个随见的 Queue。
- 当消费者断开连接时,Queue 能自动被删除。
使用 Java 客户端时,我们使用无参数的 queueDeclare 方法,就可以创建一个已经生成名字的、排他性的且会自动删除的 Queue:
String queueName = channel.queueDeclare().getQueue();
这是就拿到了一个随机名字的 queue,形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg
3、绑定(bindings)
我们已经创建了一个 fanout 类型的 exchange 和一个队列。现在我们需要让 exchange 向我们的 queue 里发送消息。Exchange 和 queue 之间关系被称为 binding(绑定)。
channel.queueBind(queueName, "logs", "");
现在开始,名字为 logs 的 exchange 就会忘我们的 queue 里退消息了。
查看 binding列表:
使用 rabbitmqctl list_bindings 命令来看已经存在的所有的 binding。
4、最终实现
发送日志消息的生产者程序和之前的程序没有太多的差别。最大的区别就是我们把消息推送到一个命名的 exchange 上,而不是之前未命名的默认 exchange。在我们发送消息时需要提供一个 routingKey,但对于 fanout 类型的 exchange 可以忽略。下边是生产者的代码 EmitLog.java:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLog {private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
throws java.io.IOException {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明 exchange 名字以及类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// getMessage 的实现请见上篇博文
String message = getMessage(argv);
//指定 exchange 的名字
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent'" + message + "'");
channel.close();
connection.close();}
//...
}
正如你所见,在建立连接后我们声明了 exchange。这一步是必须的,因为禁止向一个不存在的 exchange 推送消息。
如果没有向 exchange 负责的 queue,那么消息将会被丢失,这是没有问题的;如果没有消费者监听的话,我们会安全的丢掉这些消息。
ReceiveLogs.java 的代码如下:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明消息路由的名称和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明一个随机消息队列
String queueName = channel.queueDeclare().getQueue();
//绑定消息队列和消息路由
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
//启动一个消费者
channel.basicConsume(queueName, true, consumer);
while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received'" + message + "'");
}
}
}
- 编译文件:
javac -cp .:amqp-client-3.3.5.jar ReceiveLogs.java EmitLog.java
- 把日志存到文件里:
java -cp .:amqp-client-3.3.5.jar ReceiveLogs > logs_from_rabbit.log
然后监听该日志文件:
tail -10f logs_from_rabbit.log
- 往屏幕上打印日志消息:
java -cp .:amqp-client-3.3.5.jar ReceiveLogs
- 启动生产者:
java -cp .:amqp-client-3.3.5.jar EmitLog
日志输出到文件中:
日志消息打印到了屏幕上:
在运行 ReceiveLogs的时候,使用 rabbitmqctl list_bindings命令来查看 RabbitMQ中的 exchange:
leo@leocook:~$ sudo rabbitmqctl list_bindings
Listing bindings ...
exchange amq.gen-1Zuyn_44c8IWsdJWrI42Og queue amq.gen-1Zuyn_44c8IWsdJWrI42Og []
exchange amq.gen-rSrGSPWLNTuq1dfXipPfAA queue amq.gen-rSrGSPWLNTuq1dfXipPfAA []
exchange task_queue queue task_queue []
logs exchange amq.gen-1Zuyn_44c8IWsdJWrI42Og queue []
logs exchange amq.gen-rSrGSPWLNTuq1dfXipPfAA queue []
...done.
总结:
1、在生产者和消费者的信道中声明 exchange 名字以及类型
2、在生产者的信道中指定发送目标的 exchange
3、在消费者端的信道中声明一个随机的消息队列,并拿到这个队列名称;然后在信道上绑定该消息队列和消息路由
下篇咱们来讨论,消费者端怎么才能拿到生产者发送消息中的部分消息。
参考链接:http://www.rabbitmq.com/tutorials/tutorial-three-java.html
目录
RabbitMQ 学习总结 第一篇:理论篇
RabbitMQ 学习总结 第二篇:快速入门 HelloWorld
RabbitMQ 学习总结 第三篇:工作队列 Work Queue
RabbitMQ 学习总结 第四篇:发布 / 订阅 Publish/Subscribe
RabbitMQ 学习总结 第五篇:路由 Routing
RabbitMQ 学习总结 第六篇:Topic 类型的 exchange
RabbitMQ 学习总结 第七篇:RCP(远程过程调用协议)
上一篇中我们构建了一个简单的日志系统,我们可以把日志消息广播给多个接受者。
这篇中我们将来添加一个特性只接收部分消息。例如我只将一些错误 log 存到文件中,把所有的 log 都打印到控制台里。
1、绑定(Bindings)
在上篇博文中,我们已经创建了一个 binding,代码如下:
channel.queueBind(queueName, EXCHANGE_NAME, "");
一个 binding 就是 exchange 和 Queue 之间的一个关系。可以简单的理解为:这个 Queue 对其相对于的 exchange 的消息感兴趣(原文是 the queue is interested in messages from this exchange,能理解什么意思,但总觉得怪怪的)。
Binding 可以使用一个已经存在的 routingKey 参数。为了避免和 basic_publish 参数混淆,我们称之为 binding key。下边就是我们怎么用 key 来创建一个 binding:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
binding key 的意义有时候取决于 exchange 的类型。对于 Fanout 类型的 exchange,会忽略 binding key。
2、Direct 类型的 exchange
我们上篇博文中的日志系统会把所有的 log 消息广播给所有的消费者。我们想扩展来根据他们的日志级别来过滤 log 消息。例如:我们只想把 error 级别的日志写到磁盘文件中,而其它级别的日志消息则过滤掉。
我们之前使用的 fanout 类型的 exchange,但这样就不会有太多的灵活性。
在这里我们将要使用 direct 类型的 exchange。Direct 类型 exchange 的路由算法是很简单的:要想一个消息能到达这个队列,需要 binding key 和 routing key 正好能匹配得上。
为了说明这个道理,可以看看下边的描述:
在这样的结构中,我们可以看到 direct 类型的 exchange X,有两个 queue 绑定到它。第一个 queue 是以 orange 为 binding key 绑定到 exchange X 上的,第二个 queue 是由两个 binding key(black 和 green)绑定到 exchange X 的。
在这样的设置中,一条消息被推送到 exchange,如果使用的 routing key 是 orange,那么消息就会被路由到 Q1 中;如果使用的 routing key 是 black 或 green,那么该消息将会被路由到 Q2 中。其它的消息都将会被丢弃掉。
3、多重绑定(Multiple bindings)
用同一个 binding 来把多个 queue 绑定到同一个 exchange 也是可行的。例如在之前例子的基础上,在 X 和 Q1 之间添加 binding key 名字为 black,这样的话,这里的 direct 类型的 exchange 就和 fanout 类型的一样了,可以把消息推送给所有的 queue。带有 routing key 为 black 的消息将会被推送到 Q1 和 Q2 中。
4、发送日志(Emitting logs)
我们将会使用这种模型,不使用 fanout 类型的 exchange,而是使用 direct 类型的。我们使用日志级别做为 routing key,接收端根据设置的日志级别做为 binding key 来接收消息。首先来看看发射日志:
如之前一样,首先来创建一个 exchange:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
然后准备发送消息;
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
这里的”severity”可以是”info”、“warning”、”error”等。
5、订阅(Subscribing)
这里接收消息和上篇博文中的一样,只是有一点例外:我们将会为每一个感兴趣的日志级别进行绑定。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
6、最终实现
- EmitLogDirect.Java的代码:
public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv)
throws java.io.IOException {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明 direct 类型的 exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//拿到日志级别
String severity = getSeverity(argv);
//拿到日志消息
String message = getMessage(argv);
//指定 routing key,发送消息
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent'" + severity + "':'" + message + "'");
channel.close();
connection.close();}
//..
}
- ReceiveLogsDirect.java的代码:
public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明 direct 类型的 exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
//绑定我们需要接收的日志级别
for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received'" + routingKey + "':'" + message + "'");
}
}
}
- 运行三个日志接收器:
接收 error 和 info 级别的日志:
接收 error 级别的日志:
接收 info 级别的日志:
- 运行两个日志发生器:
产生 error 级别的日志:
、
产生 info 级别的日志:
- 观察接收器端的变化:
接收 error 级别的接收器,只接收 error 级别的日志:
接收 info 级别的接收器,只接收 info 级别的日志:
Error 和 info 级别日志都接收的接收器,info 和 error 级别的日志都接收:
7、总结:
要记住生产者端的 routing key,那么在消费者端设置 binding key 和之前的 routing key 一样,就可以用 direct 类型的 exchange 了,以此来获取到自己需要的消息。
参考链接:http://www.rabbitmq.com/tutorials/tutorial-four-java.html
目录
RabbitMQ 学习总结 第一篇:理论篇
RabbitMQ 学习总结 第二篇:快速入门 HelloWorld
RabbitMQ 学习总结 第三篇:工作队列 Work Queue
RabbitMQ 学习总结 第四篇:发布 / 订阅 Publish/Subscribe
RabbitMQ 学习总结 第五篇:路由 Routing
RabbitMQ 学习总结 第六篇:Topic 类型的 exchange
RabbitMQ 学习总结 第七篇:RCP(远程过程调用协议)
在上篇博文中,我们对之前的日志系统做了稍许的完善。没有使用 fanout 类型的 exchange 来广播,而是使用了 direct 类型的 exchange 来选择性的接收日志消息。
尽管使用了 direct 类型的 exchange 对日志系统有所提升,但还是有一些限制(消息不能够基于多重因素来路由)。
在我们的日志系统中,希望不仅仅能够根据日志级别来订阅,还可以根据指定的 routing key 来订阅。你应该可以理解的,就如 unix 的系统日志工具,日志消息路由规则不仅仅基于日志级别(info/warn/crit…),还可以基于设备(auth/cron/kern…)。
这样大大的提高的灵活性,例如我们可以只监听 kern 推送出来的 error 级别的日志。
为了在我们的日志记录系统中实现这样的功能,我们需要了解更多关于 topic 类型的 exchange。
1、Topic 类型的 exchange
消息发送到 topic 类型的 exchange 上时不能随意指定 routing_key(一定是指由一系列由点号连接单词的字符串,单词可以是任意的,但一般都会与消息或多或少的有些关联)。Routing key 的长度不能超过 255 个字节。
Binding key 也一定要是同样的方式。Topic 类型的 exchange 就像一个直接的交换:一个由生产者指定了确定 routing key 的消息将会被推送给所有 Binding key 能与之匹配的消费者。然而这种绑定有两种特殊的情况:
- *(星号):可以替代一个单词(一串连续的字母串)
- #(井号):可以匹配一个或个字符
下边来举个例子:
在这个例子中,我们将会发送一些描述动物的消息。Routing key 的第一个单词是描述速度的,第二个单词是描述颜色的,第三个是描述物种的:“<speed>.<colour>.<species>”。
这里我们创建三个 Binding:Binding key 为”*.orange.*”的 Q1,和 binding key 为”*.*.rabbit”和”lazy.#”的 Q2。
这些 binding 可以总结为:
- Q1 对所有橘色的(orange)的动物感兴趣;
- Q2 希望能拿到所有兔子的(rabbit)信息,还有比较懒惰的(lazy.#)动物信息。
一条以”quick.orange.rabbit”为 routing key 的消息将会推送到 Q1 和 Q2 两个 queue 上,routing key 为“lazy.orange.elephant”的消息同样会被推送到 Q1 和 Q2 上。但如果 routing key 为”quick.orange.fox”的话,消息只会被推送到 Q1 上;routing key 为”lazy.brown.fox”的消息会被推送到 Q2 上,routing key 为 ”lazy.pink.rabbit”的消息也会被推送到 Q2 上,但同一条消息只会被推送到 Q2 上一次。
如果在发送消息时所指定的 exchange 和 routing key 在消费者端没有对应的 exchange 和 binding key 与之绑定的话,那么这条消息将会被丢弃掉。例如:”orange” 和 ”quick.orange.male.rabbit”。但是 routing 为”lazy.orange.male.rabbit”的消息,将会被推到 Q2 上。
Topic类型的 exchange:
Topic 类型的 exchange 是很强大的,也可以实现其它类型的 exchange。
- 当一个队列被绑定为 binding key 为”#”时,它将会接收所有的消息,此时和 fanout 类型的 exchange 很像。
- 当 binding key 不包含”*”和”#”时,这时候就很像 direct 类型的 exchange。
2、最终实现
我们准备在日志系统中使用 topic 类型的 exchange。开始我们准备 routing keys 使用两个单词:”<facility>.<severity>”。代码和上篇博文里的差不多,EmitLogTopic.Java:
public class EmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv)
throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//指定一个 topic 类型的 exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//这里拿到 routing key
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent'" + routingKey + "':'" + message + "'");
connection.close();}
//...
}
ReceiveLogsTopic.java 的代码:
public class ReceiveLogsTopic {private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv)
throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//指定一个 topic 类型的 exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
//绑定 binding key
for(String bindingKey : argv){channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received'" + routingKey + "':'" + message + "'");
}
}
}
运行情况如下:
3、总结
在上边的基础上,只是丰富了 routing key 和 binding key 的写法。
参考链接:http://www.rabbitmq.com/tutorials/tutorial-five-java.html
目录
RabbitMQ 学习总结 第一篇:理论篇
RabbitMQ 学习总结 第二篇:快速入门 HelloWorld
RabbitMQ 学习总结 第三篇:工作队列 Work Queue
RabbitMQ 学习总结 第四篇:发布 / 订阅 Publish/Subscribe
RabbitMQ 学习总结 第五篇:路由 Routing
RabbitMQ 学习总结 第六篇:Topic 类型的 exchange
RabbitMQ 学习总结 第七篇:RCP(远程过程调用协议)
在
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2016-07/133178p2.htm
这篇博文中我们实现了怎么去使用 work queue 来把比较耗时的任务分散给多个 worker。
但是,如果我们想在远程的机器上的一个函数并等待它返回结果,我们应该怎么办呢?这就是另外一种模式了,它被称为 RPC(Remote procedure call)。
本篇博文中我们来实现怎么用 RabbitMQ 来构建一个 RPC 系统:一个 client(客户端)和一个可扩展的 RPC server(服务端)。这里我们来模拟一个返回斐波拉契数的 RPC 服务。
1、Client 端接口
为了说明一个 RPC 服务时怎么工作的,我们来创建一个简单的 client 类。这里来实现一个名字为 call 的方法来发送 RPC 请求,并发生阻塞,直到接收到回复:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println("fib(4) is " + result);
RPC注意事项:
虽然 RPC 是一种常用的模式,但它也有一些缺陷。当无法确定使用本地调用还是使用 RPC 时,问题就出来了。有的时候不确定程序的运行环境,这样来做会给程序的调试增加了一定的复杂度。使用 RPC 并不能够让代码变得更简洁,滥用的话只会让代码变得更不方便维护。
伴随着上边的问题,咱们来看看下边的建议:
- 确定能很明显的分辨的出哪些调用是本地调用,哪些是远程调用。
- 完善系统的文档。清楚的标记出,模块间的依赖关系。
- 处理错误情况。当 RPC 服务挂了之后,客户端应该怎么去处理呢?
当有疑问时避免使用 RPC。如果可以的话,你可以使用异步管道(不用 RPC- 阻塞),结果被异步推送到下一个计算环节。
2、回调队列(Callback queue)
一般用 RabbitMQ 来实现 RPC 是很简单的。客户端发送一个请求消息然后服务器端回应一个响应消息。为了接收服务端的响应消息,我们需要在请求中发送一个 callback queue 地址。我们也可以使用一个默认的 queue(Java 客户端独有的)。如下:
callbackQueueName = channel.queueDeclare().getQueue();
//绑定 callback queue
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
消息属性:
AMQP 协议在发送消息时,预定义了 14 个属性连同消息一起发送出去。很多属性都是很少用到的,除了下边的这些:
消息的投递模型(deliveryMode):使消息持久化,和 work queue 里的设置一样。
上下文类型(contentType):用来描述媒体类型(mime-type)。例如常用的 JSON 格式,它的 mime-type 是 application/json。
我们需要导包:
import com.rabbitmq.client.AMQP.BasicProperties;
3、Correlation Id
在上边的方法中建议我们为每个 RPC 请求都创建一个 call queue,这样效率很低。我们有更好的办法,为每一个 client 创建一个 call queue。
这样处理的话又出现了一个新的问题,无法确定接收到的响应是对应哪个请求的。这时候就需要 correlationId 属性,我们为每一个请求都设置一个 correlationId 属性。当我们从 callback queue 中接收到一条消息之后,我们将会查看 correlationId 属性,这样就可以用一个请求来与之匹配了。如果从 callback queue 接收到了一条消息后,发现其中的 correlationId 未能找到与之匹配的请求,那么将把这条消息丢掉。
你可能会问我们为什么要要在 callback queue 里忽略掉不知道的 message,而不是报错呢?这是因为服务器端可能会出现的一种情况,虽然可能性很小,但还是有可能性的,有可能在 RPC 发送了响应之后,在发送确认完成任务的信息之前服务器重启了。如果这种情况发生了的话,重启了 RPC 服务之后,它将会再次接收到之前的请求,这样的话 client 将会重复处理响应,RPC 服务应该是等幂的。
4、总结
我们的 RPC 工作原理如下:
- 当 Client 启动时,它将会创建一个匿名的 callback queue。
- 对于一次 RPC 请求,client 会发送一条含有两个属性的消息:replyTo和correlationId。Reply 是设置的 callback queue,correlationId 是设置的当前请求的标示符。
- 请求将会被发送到 rpc_queue 里。
- RPC 的 worker(RPC server)等待 queue 中的请求。当出现一个请求之后,他将会处理任务,并向 replyTo 队列中发送消息。
- 客户端会等待 callback queue 上的消息。当消息出现时,它将会检查 correlationId 属性是否能与之前发送请求时的属性一直,若一致的话,client 将会处理回复的消息。
5、最终实现
斐波拉契任务:
private static int fib(int n) throws Exception {if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
这里定义计算斐波拉契数的方法,假设传进去的整数都是正整数。
RPC 服务端的代码实现如下 RPCServer.java:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCServer {private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {if (n ==0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
public static void main(String[] argv) {Connection connection = null;
Channel channel = null;
try {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
//一次只接收一条消息
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
//开启消息应答机制
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {String response = null;
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//拿到 correlationId 属性
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
try {String message = new String(delivery.getBody(),"UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response = "" + fib(n);
} catch (Exception e){System.out.println(" [.] " + e.toString());
response = "";
}
finally {//拿到 replyQueue,并绑定为 routing key,发送消息
channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
//返回消息确认信息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
catch (Exception e) {e.printStackTrace();
}
finally {if (connection != null) {try {connection.close();
}
catch (Exception ignore) {}}
}
}
}
服务器端代码实现很简单的:
- 建立连接,信道,声明队列
- 为了能把任务压力平均的分配到各个 worker 上,我们在方法 channel.basicQos 里设置 prefetchCount 的值。
- 我们使用 basicConsume 来接收消息,并等待任务处理,然后发送响应。
RPC客户端代码实现 RPCClient.java:
参考链接:http://www.rabbitmq.com/tutorials/tutorial-six-java.html
RabbitMQ 的详细介绍:请点这里
RabbitMQ 的下载地址:请点这里
本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-07/133178.htm