|
| 1 | +# Light message queue (LMQ) |
| 2 | + |
| 3 | + |
| 4 | +## 一、broker启动配置 |
| 5 | + |
| 6 | + |
| 7 | +broker.conf文件需要增加以下的配置项,开启LMQ开关,这样就可以识别LMQ相关属性的消息,进行原子分发消息到LMQ队列 |
| 8 | +```properties |
| 9 | +enableLmq = true |
| 10 | +enableMultiDispatch = true |
| 11 | +``` |
| 12 | +## 二、发送消息 |
| 13 | +发送消息的时候通过设置 INNER_MULTI_DISPATCH 属性,LMQ queue使用逗号分割,queue前缀必须是 %LMQ%,这样broker就可以识别LMQ queue. |
| 14 | +```java |
| 15 | +DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); |
| 16 | +producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); |
| 17 | +producer.start(); |
| 18 | + |
| 19 | + |
| 20 | +/* |
| 21 | +* Create a message instance, specifying topic, tag and message body. |
| 22 | +*/ |
| 23 | +Message msg = new Message("TopicTest" /* Topic */, |
| 24 | + "TagA" /* Tag */, |
| 25 | + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ |
| 26 | + ); |
| 27 | +/* |
| 28 | +* INNER_MULTI_DISPATCH property and PREFIX must start as "%LMQ%", |
| 29 | +* If it is multiple LMQ, need to use “,” split |
| 30 | +*/ |
| 31 | +message.putUserProperty("INNER_MULTI_DISPATCH", "%LMQ%123,%LMQ%456"); |
| 32 | +/* |
| 33 | +* Call send message to deliver message to one of brokers. |
| 34 | +*/ |
| 35 | +SendResult sendResult = producer.send(msg); |
| 36 | +``` |
| 37 | +## 三、拉取消息 |
| 38 | +LMQ queue在每个broker上只有一个queue,也即queueId为0, 指明轻量级的MessageQueue,就可以拉取消息进行消费。 |
| 39 | +```java |
| 40 | +DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(); |
| 41 | +defaultMQPullConsumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); |
| 42 | +defaultMQPullConsumer.setVipChannelEnabled(false); |
| 43 | +defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST"); |
| 44 | +defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST"); |
| 45 | +defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Arrays.asList("TopicTest"))); |
| 46 | +defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(2000); |
| 47 | +defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(3000); |
| 48 | +defaultMQPullConsumer.start(); |
| 49 | + |
| 50 | +String brokerName = "set broker Name"; |
| 51 | +MessageQueue mq = new MessageQueue("%LMQ%123", brokerName, 0); |
| 52 | + |
| 53 | +Long offset = defaultMQPullConsumer.maxOffset(mq); |
| 54 | + |
| 55 | +defaultMQPullConsumer.pullBlockIfNotFound( |
| 56 | + mq, "*", offset, 32, |
| 57 | + new PullCallback() { |
| 58 | + @Override |
| 59 | + public void onSuccess(PullResult pullResult) { |
| 60 | + List<MessageExt> list = pullResult.getMsgFoundList(); |
| 61 | + if (list == null || list.isEmpty()) { |
| 62 | + return; |
| 63 | + } |
| 64 | + for (MessageExt messageExt : list) { |
| 65 | + |
| 66 | + } |
| 67 | + } |
| 68 | + @Override |
| 69 | + public void onException(Throwable e) { |
| 70 | + |
| 71 | + } |
| 72 | +}); |
| 73 | +``` |
| 74 | + |
| 75 | + |
0 commit comments