阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

集成JMS

29次阅读
没有评论

共计 10105 个字符,预计需要花费 26 分钟才能阅读完成。

JMS 即 Java Message Service,是 JavaEE 的消息服务接口。JMS 主要有两个版本:1.1 和 2.0。2.0 和 1.1 相比,主要是简化了收发消息的代码。

所谓消息服务,就是两个进程之间,通过消息服务器传递消息:

┌────────┐    ┌──────────────┐    ┌────────┐
│Producer│───▶│Message Server│───▶│Consumer│
└────────┘    └──────────────┘    └────────┘

使用消息服务,而不是直接调用对方的 API,它的好处是:

  • 双方各自无需知晓对方的存在,消息可以异步处理,因为消息服务器会在 Consumer 离线的时候自动缓存消息;
  • 如果 Producer 发送的消息频率高于 Consumer 的处理能力,消息可以积压在消息服务器,不至于压垮 Consumer;
  • 通过一个消息服务器,可以连接多个 Producer 和多个 Consumer。

因为消息服务在各类应用程序中非常有用,所以 JavaEE 专门定义了 JMS 规范。注意到 JMS 是一组接口定义,如果我们要使用 JMS,还需要选择一个具体的 JMS 产品。常用的 JMS 服务器有开源的 ActiveMQ,商业服务器如 WebLogic、WebSphere 等也内置了 JMS 支持。这里我们选择开源的 ActiveMQ 作为 JMS 服务器,因此,在开发 JMS 之前我们必须首先安装 ActiveMQ。

现在问题来了:从官网下载 ActiveMQ 时,蹦出一个页面,让我们选择 ActiveMQ Classic 或者 ActiveMQ Artemis,这两个是什么关系,又有什么区别?

实际上 ActiveMQ Classic 原来就叫 ActiveMQ,是 Apache 开发的基于 JMS 1.1 的消息服务器,目前稳定版本号是 5.x,而 ActiveMQ Artemis 是由 RedHat 捐赠的 HornetQ 服务器代码的基础上开发的,目前稳定版本号是 2.x。和 ActiveMQ Classic 相比,Artemis 版的代码与 Classic 完全不同,并且,它支持 JMS 2.0,使用基于 Netty 的异步 IO,大大提升了性能。此外,Artemis 不仅提供了 JMS 接口,它还提供了 AMQP 接口,STOMP 接口和物联网使用的 MQTT 接口。选择 Artemis,相当于一鱼四吃。

所以,我们这里直接选择 ActiveMQ Artemis。从官网下载最新的 2.x 版本,解压后设置环境变量 ARTEMIS_HOME,指向 Artemis 根目录,例如C:\Apps\artemis,然后,把ARTEMIS_HOME/bin 加入 PATH 环境变量:

  • Windows 下添加 %ARTEMIS_HOME%\bin 到 Path 路径;
  • Mac 和 Linux 下添加 $ARTEMIS_HOME/bin 到 PATH 路径。

Artemis 有个很好的设计,就是它把程序和数据完全分离了。我们解压后的 ARTEMIS_HOME 目录是程序目录,要启动一个 Artemis 服务,还需要创建一个数据目录。我们把数据目录直接设定在项目 spring-integration-jmsjms-data目录下。执行命令artemis create jms-data

$ pwd
/Users/liaoxuefeng/workspace/spring-integration-jms

$ artemis create jms-data
Creating ActiveMQ Artemis instance at: /Users/liaoxuefeng/workspace/spring-integration-jms/jms-data

--user: is a mandatory property!
Please provide the default username:
admin

--password: is mandatory with this configuration:
Please provide the default password:
********

--allow-anonymous | --require-login: is a mandatory property!
Allow anonymous access?, valid values are Y,N,True,False
N

Auto tuning journal ...
done! Your system can make 0.09 writes per millisecond, your journal-buffer-timeout will be 11392000

You can now start the broker by executing:  

   "/Users/liaoxuefeng/workspace/spring-integration-jms/jms-data/bin/artemis" run

Or you can run the broker in the background using:

   "/Users/liaoxuefeng/workspace/spring-integration-jms/jms-data/bin/artemis-service" start

在创建过程中,会要求输入连接用户和口令,这里我们设定 adminpassword,以及是否允许匿名访问(这里选择N)。

此数据目录 jms-data 不仅包含消息数据、日志,还自动创建了两个启动服务的命令 bin/artemisbin/artemis-service,前者在前台启动运行,按 Ctrl+ C 结束,后者会一直在后台运行。

我们把目录切换到 jms-data/bin,直接运行artemis run 即可启动 Artemis 服务:

$ ./artemis run
     _        _               _
    / \  ____| |_  ___ __  __(_) _____
   / _ \|  _ \ __|/ _ \  \/  | |/  __/
  / ___ \ | \/ |_/  __/ |\/| | |\___ \
 /_/   \_\|   \__\____|_|  |_|_|/___ /
 Apache ActiveMQ Artemis 2.13.0

...

2020-06-02 07:50:21,718 INFO  [org.apache.activemq.artemis] AMQ241001: HTTP Server started at http://localhost:8161
2020-06-02 07:50:21,718 INFO  [org.apache.activemq.artemis] AMQ241002: Artemis Jolokia REST API available at http://localhost:8161/console/jolokia
2020-06-02 07:50:21,719 INFO  [org.apache.activemq.artemis] AMQ241004: Artemis Console available at http://localhost:8161/console

启动成功后,Artemis 提示可以通过 URLhttp://localhost:8161/console访问管理后台。注意 不要关闭命令行窗口

注意

如果 Artemis 启动时显示警告:AMQ222212: Disk Full! … Clients will report blocked. 这是因为磁盘空间不够,可以在 etc/broker.xml 配置中找到 并改为 99。

在编写 JMS 代码之前,我们首先得理解 JMS 的消息模型。JMS 把生产消息的一方称为 Producer,处理消息的一方称为 Consumer。有两种类型的消息通道,一种是 Queue:

┌────────┐    ┌────────┐    ┌────────┐
│Producer│───▶│ Queue  │───▶│Consumer│
└────────┘    └────────┘    └────────┘

一种是 Topic:

                            ┌────────┐
                         ┌─▶│Consumer│
                         │  └────────┘
┌────────┐    ┌────────┐ │  ┌────────┐
│Producer│───▶│ Topic  │─┼─▶│Consumer│
└────────┘    └────────┘ │  └────────┘
                         │  ┌────────┐
                         └─▶│Consumer│
                            └────────┘

它们的区别在于,Queue 是一种一对一的通道,如果 Consumer 离线无法处理消息时,Queue 会把消息存起来,等 Consumer 再次连接的时候发给它。设定了持久化机制的 Queue 不会丢失消息。如果有多个 Consumer 接入同一个 Queue,那么它们等效于以集群方式处理消息,例如,发送方发送的消息是 A,B,C,D,E,F,两个 Consumer 可能分别收到 A,C,E 和 B,D,F,即每个消息只会交给其中一个 Consumer 处理。

Topic 则是一种一对多通道。一个 Producer 发出的消息,会被多个 Consumer 同时收到,即每个 Consumer 都会收到一份完整的消息流。那么问题来了:如果某个 Consumer 暂时离线,过一段时间后又上线了,那么在它离线期间产生的消息还能不能收到呢?

这取决于消息服务器对 Topic 类型消息的持久化机制。如果消息服务器不存储 Topic 消息,那么离线的 Consumer 会丢失部分离线时期的消息,如果消息服务器存储了 Topic 消息,那么离线的 Consumer 可以收到自上次离线时刻开始后产生的所有消息。JMS 规范通过 Consumer 指定一个持久化订阅可以在上线后收取所有离线期间的消息,如果指定的是非持久化订阅,那么离线期间的消息会全部丢失。

细心的童鞋可以看出来,如果一个 Topic 的消息全部都持久化了,并且只有一个 Consumer,那么它和 Queue 其实是一样的。实际上,很多消息服务器内部都只有 Topic 类型的消息架构,Queue 可以通过 Topic“模拟”出来。

无论是 Queue 还是 Topic,对 Producer 没有什么要求。多个 Producer 也可以写入同一个 Queue 或者 Topic,此时消息服务器内部会自动排序确保消息总是有序的。

以上是消息服务的基本模型。具体到某个消息服务器时,Producer 和 Consumer 通常是通过 TCP 连接消息服务器,在编写 JMS 程序时,又会遇到 ConnectionFactoryConnectionSession 等概念,其实这和 JDBC 连接是类似的:

  • ConnectionFactory:代表一个到消息服务器的连接池,类似 JDBC 的 DataSource;
  • Connection:代表一个到消息服务器的连接,类似 JDBC 的 Connection;
  • Session:代表一个经过认证后的连接会话;
  • Message:代表一个消息对象。

在 JMS 1.1 中,发送消息的典型代码如下:

try {Connection connection = null;
    try {// 创建连接:
        connection = connectionFactory.createConnection();
        // 创建会话:
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 创建一个 Producer 并关联到某个 Queue:
        MessageProducer messageProducer = session.createProducer(queue);
        // 创建一个文本消息:
        TextMessage textMessage = session.createTextMessage(text);
        // 发送消息:
        messageProducer.send(textMessage);
    } finally {// 关闭连接:
        if (connection != null) {connection.close();
        }
    }
} catch (JMSException ex) {// 处理 JMS 异常
}

JMS 2.0 改进了一些 API 接口,发送消息变得更简单:

try (JMSContext context = connectionFactory.createContext()) {context.createProducer().send(queue, text);
}

JMSContext实现了 AutoCloseable 接口,可以使用 try(resource) 语法,代码更简单。

有了以上预备知识,我们就可以开始开发 JMS 应用了。

首先,我们在 pom.xml 中添加如下依赖:

  • org.springframework:spring-jms:6.0.0
  • org.apache.activemq:artemis-jakarta-client:2.27.0

Artemis 的 Client 接口依赖了jakarta.jms:jakarta.jms-api,因此不必再引入 JMS API 的依赖。

在 AppConfig 中,通过 @EnableJms 让 Spring 自动扫描 JMS 相关的 Bean,并加载 JMS 配置文件jms.properties

@Configuration
@ComponentScan
@EnableWebMvc
@EnableJms // 启用 JMS
@EnableTransactionManagement
@PropertySource({"classpath:/jdbc.properties", "classpath:/jms.properties"})
public class AppConfig {...}

首先要创建的 Bean 是ConnectionFactory,即连接消息服务器的连接池:

@Bean
ConnectionFactory createJMSConnectionFactory(@Value("${jms.uri:tcp://localhost:61616}") String uri,
    @Value("${jms.username:admin}") String username,
    @Value("${jms.password:password}") String password)
{return new ActiveMQJMSConnectionFactory(uri, username, password);
}

因为我们使用的消息服务器是 ActiveMQ Artemis,所以 ConnectionFactory 的实现类就是消息服务器提供的ActiveMQJMSConnectionFactory,它需要的参数均由配置文件读取后传入,并设置了默认值。

我们再创建一个 JmsTemplate,它是 Spring 提供的一个工具类,和JdbcTemplate 类似,可以简化发送消息的代码:

@Bean
JmsTemplate createJmsTemplate(@Autowired ConnectionFactory connectionFactory) {return new JmsTemplate(connectionFactory);
}

下一步要创建的是JmsListenerContainerFactory

@Bean("jmsListenerContainerFactory")
DefaultJmsListenerContainerFactory createJmsListenerContainerFactory(@Autowired ConnectionFactory connectionFactory) {var factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    return factory;
}

除了必须指定 Bean 的名称为 jmsListenerContainerFactory 外,这个 Bean 的作用是处理和 Consumer 相关的 Bean。我们先跳过它的原理,继续编写 MessagingService 来发送消息:

@Component
public class MessagingService {@Autowired ObjectMapper objectMapper;
    @Autowired JmsTemplate jmsTemplate;

    public void sendMailMessage(MailMessage msg) throws Exception {String text = objectMapper.writeValueAsString(msg);
        jmsTemplate.send("jms/queue/mail", new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(text);
            }
        });
    }
}

JMS 的消息类型支持以下几种:

  • TextMessage:文本消息;
  • BytesMessage:二进制消息;
  • MapMessage:包含多个 Key-Value 对的消息;
  • ObjectMessage:直接序列化 Java 对象的消息;
  • StreamMessage:一个包含基本类型序列的消息。

最常用的是发送基于 JSON 的文本消息,上述代码通过 JmsTemplate 创建一个 TextMessage 并发送到名称为 jms/queue/mail 的 Queue。

注意:Artemis 消息服务器默认配置下会自动创建 Queue,因此不必手动创建一个名为 jms/queue/mail 的 Queue,但不是所有的消息服务器都会自动创建 Queue,生产环境的消息服务器通常会关闭自动创建功能,需要手动创建 Queue。

再注意到 MailMessage 是我们自己定义的一个 JavaBean,真正的 JMS 消息是创建的TextMessage,它的内容是 JSON。

当用户注册成功后,我们就调用 MessagingService.sendMailMessage() 发送一条 JMS 消息,此代码十分简单,这里不再贴出。

下面我们要详细讨论的是如何处理消息,即编写 Consumer。从理论上讲,可以创建另一个 Java 进程来处理消息,但对于我们这个简单的 Web 程序来说没有必要,直接在同一个 Web 应用中接收并处理消息即可。

处理消息的核心代码是编写一个 Bean,并在处理方法上标注@JmsListener

@Component
public class MailMessageListener {final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired ObjectMapper objectMapper;
    @Autowired MailService mailService;

    @JmsListener(destination = "jms/queue/mail", concurrency = "10")
    public void onMailMessageReceived(Message message) throws Exception {logger.info("received message:" + message);
        if (message instanceof TextMessage) {String text = ((TextMessage) message).getText();
            MailMessage mm = objectMapper.readValue(text, MailMessage.class);
            mailService.sendRegistrationMail(mm);
        } else {logger.error("unable to process non-text message!");
        }
    }
}

注意到 @JmsListener 指定了 Queue 的名称,因此,凡是发到此 Queue 的消息都会被这个 onMailMessageReceived() 方法处理,方法参数是 JMS 的 Message 接口,我们通过强制转型为 TextMessage 并提取 JSON,反序列化后获得自定义的 JavaBean,也就获得了发送邮件所需的所有信息。

下面问题来了:Spring 处理 JMS 消息的流程是什么?

如果我们直接调用 JMS 的 API 来处理消息,那么编写的代码大致如下:

// 创建 JMS 连接:
Connection connection = connectionFactory.createConnection();
// 创建会话:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个 Consumer:
MessageConsumer consumer = session.createConsumer(queue);
// 为 Consumer 指定一个消息处理器:
consumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {// 在此处理消息... 
    }
});
// 启动接收消息的循环:
connection.start();

我们自己编写的 MailMessageListener.onMailMessageReceived() 相当于消息处理器:

consumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {mailMessageListener.onMailMessageReceived(message); 
    }
});

所以,Spring 根据 AppConfig 的注解 @EnableJms 自动扫描带有 @JmsListener 的 Bean 方法,并为其创建一个 MessageListener 把它包装起来。

注意到前面我们还创建了一个 JmsListenerContainerFactory 的 Bean,它的作用就是为每个 MessageListener 创建 MessageConsumer 并启动消息接收循环。

再注意到 @JmsListener 还有一个 concurrency 参数,10 表示可以最多同时并发处理 10 个消息,5-10表示并发处理的线程可以在 5~10 之间调整。

因此,Spring 在通过 MessageListener 接收到消息后,并不是直接调用 mailMessageListener.onMailMessageReceived(),而是用线程池调用,因此,要时刻牢记,onMailMessageReceived() 方法可能被多线程并发执行,一定要保证线程安全。

我们总结一下 Spring 接收消息的步骤:

通过 JmsListenerContainerFactory 配合 @EnableJms 扫描所有 @JmsListener 方法,自动创建 MessageConsumerMessageListener 以及线程池,启动消息循环接收处理消息,最终由我们自己编写的 @JmsListener 方法处理消息,可能会由多线程同时并发处理。

要验证消息发送和处理,我们注册一个新用户,可以看到如下日志输出:

2020-06-02 08:04:27 INFO  c.i.learnjava.web.UserController - user registered: bob@example.com
2020-06-02 08:04:27 INFO  c.i.l.service.MailMessageListener - received message: ActiveMQMessage[ID:9fc5...]:PERSISTENT/ClientMessageImpl[messageID=983, durable=true, address=jms/queue/mail, ...]]
2020-06-02 08:04:27 INFO  c.i.learnjava.service.MailService - [send mail] sending registration mail to bob@example.com...
2020-06-02 08:04:30 INFO  c.i.learnjava.service.MailService - [send mail] registration mail was sent to bob@example.com.

可见,消息被成功发送到 Artemis,然后在很短的时间内被接收处理了。

使用消息服务对发送 Email 进行改造的好处是,发送 Email 的能力通常是有限的,通过 JMS 消息服务,如果短时间内需要给大量用户发送 Email,可以先把消息堆积在 JMS 服务器上慢慢发送,对于批量发送邮件、短信等尤其有用。

练习

使用 JMS。

下载练习

小结

JMS 是 Java 消息服务,可以通过 JMS 服务器实现消息的异步处理。

消息服务主要解决 Producer 和 Consumer 生产和处理速度不匹配的问题。

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2024-08-05发表,共计10105字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中