阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

如何在项目中引入MetaQ消息收发机制

188次阅读
没有评论

共计 1608 个字符,预计需要花费 5 分钟才能阅读完成。

当需要异步发送和接收大量消息时,需要在 Crystal 项目中引入 MetaQ 消息收发机制。

关于 MetaQ 使用的官方例子可参考:https://github.com/killme2008/Metamorphosis/wiki/%E7%AE%80%E5%8D%95%E4%BE%8B%E5%AD%90

Crystal 框架将 MetaQ 进行封装,简化 MetaQ 的使用,具体如下:

消息生产端

  1. 引入 crystal-metaq-producer 项目最为依赖:

    <dependency>
        <groupId>com.gsoft.crystal</groupId>
        <artifactId>crystal-metaq-producer</artifactId>
    </dependency>
  2. 调用消息发送对象,发送指定消息:

    @Resource
    private MessageProducer mp;
    @Resource
    private MessageConsumer mc;
     
    @Test
    public void testProducer() {String topic = "test";
        final String msg = "test message !";
         
        mp.publish(topic);
        Message message = new Message(topic, msg.getBytes());
        try {mp.sendMessage(message);
        } catch (MetaClientException e) {e.printStackTrace();
        } catch (InterruptedException e) {e.printStackTrace();
        }

    其中,topic 必须是 MetaQ 服务器中定义的主题之一。

    在发送消息前,必须先 mp.publish(topic),与指定主题关联。

     

消息消费端

  1. 引入 crystal-metaq-consumer 项目最为依赖:

    <dependency>
        <groupId>com.gsoft.crystal</groupId>
        <artifactId>crystal-metaq-consumer</artifactId>
    </dependency>
  2. 调用消息消费对象,注册监听器:

    @Resource
    private MessageConsumer mc;
     
    @Test
    public void testProducer() {String topic = "test";
         
        try {mc.subscribe(topic, 1024*1024, new MessageListener() {
                 
                @Override
                public void recieveMessages(Message message) throws InterruptedException {String str = new String(message.getData());
                    System.out.println("Recived Message:" + str);
                    Assert.assertEquals(msg, str);
                }
                 
                @Override
                public Executor getExecutor() {return null;
                }
            });
            mc.completeSubscribe();} catch (MetaClientException e1) {e1.printStackTrace();
        }

    其中,mc.subscribe()方法可执行多次,最后需执行 mc.completeSubscribe() 方法。

    另,上述方法中的 1024*1024 参数为接收的消息内容最大字节数,可自行调整以优化性能(不了解具体如何优化情况下,建议不要调整)。

    监听器中的 recieveMessages 方法即为消息消费方法,getExecutor 方法返回线程池的执行器,如返回 null,则不采用线程池。

本文永久更新链接地址 :http://www.linuxidc.com/Linux/2016-12/138427.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计1608字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中