Skip to content

Commit af43a3e

Browse files
authored
Fix exception when pop messages with multiple LMQ indexes (#7863)
1 parent 04dddec commit af43a3e

File tree

2 files changed

+101
-15
lines changed

2 files changed

+101
-15
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

+20-15
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Properties;
3131
import java.util.Set;
3232
import java.util.concurrent.atomic.AtomicInteger;
33+
import org.apache.commons.lang3.ArrayUtils;
3334
import org.apache.commons.lang3.StringUtils;
3435
import org.apache.rocketmq.client.ClientConfig;
3536
import org.apache.rocketmq.client.Validators;
@@ -1155,15 +1156,18 @@ private PopResult processPopResponse(final String brokerName, final RemotingComm
11551156
final Long msgQueueOffset;
11561157
if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty(
11571158
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
1158-
// process LMQ, LMQ topic has only 1 queue, which queue id is 0
1159+
// process LMQ
1160+
String[] queues = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)
1161+
.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
1162+
String[] queueOffsets = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)
1163+
.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
1164+
long offset = Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues, topic)]);
1165+
// LMQ topic has only 1 queue, which queue id is 0
11591166
queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, MixAll.LMQ_QUEUE_ID);
1160-
queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, Long.parseLong(
1161-
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
1162-
index = sortMap.get(queueIdKey).indexOf(
1163-
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
1167+
queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, offset);
1168+
index = sortMap.get(queueIdKey).indexOf(offset);
11641169
msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);
1165-
if (msgQueueOffset != Long.parseLong(
1166-
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))) {
1170+
if (msgQueueOffset != offset) {
11671171
log.warn("Queue offset[%d] of msg is strange, not equal to the stored in msg, %s",
11681172
msgQueueOffset, messageExt);
11691173
}
@@ -1217,14 +1221,15 @@ private static Map<String, List<Long>> buildQueueOffsetSortedMap(String topic, L
12171221
final String key;
12181222
if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0
12191223
&& StringUtils.isNotEmpty(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
1220-
// process LMQ, LMQ topic has only 1 queue, which queue id is 0
1221-
key = ExtraInfoUtil.getStartOffsetInfoMapKey(
1222-
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH), 0);
1223-
if (!sortMap.containsKey(key)) {
1224-
sortMap.put(key, new ArrayList<>(4));
1225-
}
1226-
sortMap.get(key).add(
1227-
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
1224+
// process LMQ
1225+
String[] queues = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)
1226+
.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
1227+
String[] queueOffsets = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)
1228+
.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
1229+
// LMQ topic has only 1 queue, which queue id is 0
1230+
key = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, MixAll.LMQ_QUEUE_ID);
1231+
sortMap.putIfAbsent(key, new ArrayList<>(4));
1232+
sortMap.get(key).add(Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues, topic)]));
12281233
continue;
12291234
}
12301235
// Value of POP_CK is used to determine whether it is a pop retry,

client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java

+81
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.rocketmq.common.PlainAccessConfig;
4242
import org.apache.rocketmq.common.TopicConfig;
4343
import org.apache.rocketmq.common.message.Message;
44+
import org.apache.rocketmq.common.message.MessageAccessor;
4445
import org.apache.rocketmq.common.message.MessageConst;
4546
import org.apache.rocketmq.common.message.MessageDecoder;
4647
import org.apache.rocketmq.common.message.MessageExt;
@@ -570,6 +571,86 @@ public void onException(Throwable e) {
570571
done.await();
571572
}
572573

574+
@Test
575+
public void testPopMultiLmqMessage_async() throws Exception {
576+
final long popTime = System.currentTimeMillis();
577+
final int invisibleTime = 10 * 1000;
578+
final String lmqTopic = MixAll.LMQ_PREFIX + "lmq1";
579+
final String lmqTopic2 = MixAll.LMQ_PREFIX + "lmq2";
580+
final String multiDispatch = String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, lmqTopic, lmqTopic2);
581+
final String multiOffset = String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, "0", "0");
582+
doAnswer(new Answer<Void>() {
583+
@Override
584+
public Void answer(InvocationOnMock mock) throws Throwable {
585+
InvokeCallback callback = mock.getArgument(3);
586+
RemotingCommand request = mock.getArgument(1);
587+
ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
588+
RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
589+
response.setCode(ResponseCode.SUCCESS);
590+
response.setOpaque(request.getOpaque());
591+
592+
PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
593+
responseHeader.setInvisibleTime(invisibleTime);
594+
responseHeader.setPopTime(popTime);
595+
responseHeader.setReviveQid(0);
596+
responseHeader.setRestNum(1);
597+
StringBuilder startOffsetInfo = new StringBuilder(64);
598+
ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 0L);
599+
responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
600+
StringBuilder msgOffsetInfo = new StringBuilder(64);
601+
ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, Collections.singletonList(0L));
602+
responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
603+
response.setRemark("FOUND");
604+
response.makeCustomHeaderToNet();
605+
606+
MessageExt message = new MessageExt();
607+
message.setQueueId(0);
608+
message.setFlag(0);
609+
message.setQueueOffset(10L);
610+
message.setCommitLogOffset(10000L);
611+
message.setSysFlag(0);
612+
message.setBornTimestamp(System.currentTimeMillis());
613+
message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
614+
message.setStoreTimestamp(System.currentTimeMillis());
615+
message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
616+
message.setBody("body".getBytes());
617+
message.setTopic(topic);
618+
MessageAccessor.putProperty(message, MessageConst.PROPERTY_INNER_MULTI_DISPATCH, multiDispatch);
619+
MessageAccessor.putProperty(message, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, multiOffset);
620+
response.setBody(MessageDecoder.encode(message, false));
621+
responseFuture.setResponseCommand(response);
622+
callback.operationSucceed(responseFuture.getResponseCommand());
623+
return null;
624+
}
625+
}).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
626+
final CountDownLatch done = new CountDownLatch(1);
627+
final PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
628+
requestHeader.setTopic(lmqTopic);
629+
mqClientAPI.popMessageAsync(brokerName, brokerAddr, requestHeader, 10 * 1000, new PopCallback() {
630+
@Override
631+
public void onSuccess(PopResult popResult) {
632+
assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND);
633+
assertThat(popResult.getRestNum()).isEqualTo(1);
634+
assertThat(popResult.getInvisibleTime()).isEqualTo(invisibleTime);
635+
assertThat(popResult.getPopTime()).isEqualTo(popTime);
636+
assertThat(popResult.getMsgFoundList()).size().isEqualTo(1);
637+
assertThat(popResult.getMsgFoundList().get(0).getTopic()).isEqualTo(lmqTopic);
638+
assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
639+
.isEqualTo(multiDispatch);
640+
assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))
641+
.isEqualTo(multiOffset);
642+
done.countDown();
643+
}
644+
645+
@Override
646+
public void onException(Throwable e) {
647+
Assertions.fail("want no exception but got one", e);
648+
done.countDown();
649+
}
650+
});
651+
done.await();
652+
}
653+
573654
@Test
574655
public void testAckMessageAsync_Success() throws Exception {
575656
doAnswer(new Answer<Void>() {

0 commit comments

Comments
 (0)