Skip to content

Commit 991af5e

Browse files
committed
[ISSUE #5542] Fix ConsumerProcessor lockBatchMQ future allOf data race issue
1 parent d781271 commit 991af5e

File tree

1 file changed

+5
-15
lines changed

1 file changed

+5
-15
lines changed

proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java

+5-15
Original file line numberDiff line numberDiff line change
@@ -313,19 +313,15 @@ public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, Set<Me
313313
Set<MessageQueue> successSet = new CopyOnWriteArraySet<>();
314314
Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(mqSet);
315315
Map<String, List<AddressableMessageQueue>> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet);
316-
List<CompletableFuture<Set<MessageQueue>>> futureList = new ArrayList<>();
316+
List<CompletableFuture<Void>> futureList = new ArrayList<>();
317317
messageQueueSetMap.forEach((k, v) -> {
318318
LockBatchRequestBody requestBody = new LockBatchRequestBody();
319319
requestBody.setConsumerGroup(consumerGroup);
320320
requestBody.setClientId(clientId);
321321
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
322-
CompletableFuture<Set<MessageQueue>> future0 = new CompletableFuture<>();
323-
try {
324-
future0 = serviceManager.getMessageService().lockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis);
325-
future0.thenAccept(successSet::addAll);
326-
} catch (Throwable t) {
327-
future0.completeExceptionally(t);
328-
}
322+
CompletableFuture<Void> future0 = serviceManager.getMessageService()
323+
.lockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis)
324+
.thenAccept(successSet::addAll);
329325
futureList.add(FutureUtils.addExecutor(future0, this.executor));
330326
});
331327
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> {
@@ -348,13 +344,7 @@ public CompletableFuture<Void> unlockBatchMQ(ProxyContext ctx, Set<MessageQueue>
348344
requestBody.setConsumerGroup(consumerGroup);
349345
requestBody.setClientId(clientId);
350346
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
351-
CompletableFuture<Void> future0 = new CompletableFuture<>();
352-
try {
353-
future0 = serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis);
354-
future0.complete(null);
355-
} catch (Throwable t) {
356-
future0.completeExceptionally(t);
357-
}
347+
CompletableFuture<Void> future0 = serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis);
358348
futureList.add(FutureUtils.addExecutor(future0, this.executor));
359349
});
360350
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> {

0 commit comments

Comments
 (0)