Skip to content

Commit b86059c

Browse files
authored
Support LMQ dispatch in case if Consume Queue Store is RocksDB-based (#8842)
* feat: support LMQ dispatch Signed-off-by: Li Zhanhui <[email protected]> * fix: introduce group-commit for batch insertion of RocksDB KV pairs Signed-off-by: Li Zhanhui <[email protected]> * fix: propagate store error to broker module Signed-off-by: Li Zhanhui <[email protected]> * chore: fix all Bazel warning and errors Signed-off-by: Zhanhui Li <[email protected]> * fix: remove unnecessary batch-ops when writing RocksDB using atomic flush Signed-off-by: Li Zhanhui <[email protected]> * fix: find a writable directory for RocksDB logs Signed-off-by: Li Zhanhui <[email protected]> * chore: clean up ConfigHelperTest Signed-off-by: Li Zhanhui <[email protected]> * fix: truncate consume queues in case commit log records are truncated Signed-off-by: Li Zhanhui <[email protected]> * fix: truncate LMQ max offsets Signed-off-by: Li Zhanhui <[email protected]> * fix: correct truncate boundary of consume queues Signed-off-by: Li Zhanhui <[email protected]> * fix: correct MessageExt encoding Signed-off-by: Li Zhanhui <[email protected]> * chore: remove unused import Signed-off-by: Li Zhanhui <[email protected]> --------- Signed-off-by: Li Zhanhui <[email protected]> Signed-off-by: Zhanhui Li <[email protected]>
1 parent d2fd068 commit b86059c

File tree

68 files changed

+1440
-814
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+1440
-814
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ bazel-out
1717
bazel-bin
1818
bazel-rocketmq
1919
bazel-testlogs
20-
.vscode
20+
.vscode
21+
MODULE.bazel.lock

MODULE.bazel

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
###############################################################################
18+
# Bazel now uses Bzlmod by default to manage external dependencies.
19+
# Please consider migrating your external dependencies from WORKSPACE to MODULE.bazel.
20+
#
21+
# For more details, please check https://github.com/bazelbuild/bazel/issues/18958
22+
###############################################################################

WORKSPACE

+2
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ maven_install(
112112
"com.alipay.sofa:hessian:3.3.6",
113113
"io.netty:netty-tcnative-boringssl-static:2.0.48.Final",
114114
"org.mockito:mockito-junit-jupiter:4.11.0",
115+
"com.alibaba.fastjson2:fastjson2:2.0.43",
116+
"org.junit.jupiter:junit-jupiter-api:5.9.1",
115117
],
116118
fetch_sources = True,
117119
repositories = [

broker/BUILD.bazel

+3
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ java_library(
9191
"@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
9292
"@maven//:org_powermock_powermock_core",
9393
"@maven//:io_opentelemetry_opentelemetry_api",
94+
"@maven//:com_googlecode_concurrentlinkedhashmap_concurrentlinkedhashmap_lru",
95+
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
96+
"@maven//:commons_collections_commons_collections",
9497
],
9598
)
9699

broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.rocketmq.logging.org.slf4j.Logger;
3838
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3939
import org.apache.rocketmq.remoting.common.RemotingHelper;
40+
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
4041
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
4142
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
4243
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -49,6 +50,7 @@
4950
import org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHeader;
5051
import org.apache.rocketmq.remoting.protocol.header.NotifyConsumerIdsChangedRequestHeader;
5152
import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
53+
import org.apache.rocketmq.store.exception.ConsumeQueueException;
5254

5355
public class Broker2Client {
5456
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -100,13 +102,12 @@ public void notifyConsumerIdsChanged(
100102
}
101103
}
102104

103-
104-
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) {
105+
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) throws RemotingCommandException {
105106
return resetOffset(topic, group, timeStamp, isForce, false);
106107
}
107108

108109
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
109-
boolean isC) {
110+
boolean isC) throws RemotingCommandException {
110111
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
111112

112113
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
@@ -135,8 +136,11 @@ public RemotingCommand resetOffset(String topic, String group, long timeStamp, b
135136

136137
long timeStampOffset;
137138
if (timeStamp == -1) {
138-
139-
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
139+
try {
140+
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
141+
} catch (ConsumeQueueException e) {
142+
throw new RemotingCommandException("Failed to get max offset in queue", e);
143+
}
140144
} else {
141145
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
142146
}

broker/src/main/java/org/apache/rocketmq/broker/longpolling/LmqPullRequestHoldService.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.rocketmq.logging.org.slf4j.Logger;
2323
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
2424

25-
2625
public class LmqPullRequestHoldService extends PullRequestHoldService {
2726
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
2827

@@ -48,8 +47,8 @@ public void checkHoldRequest() {
4847
}
4948
String topic = key.substring(0, idx);
5049
int queueId = Integer.parseInt(key.substring(idx + 1));
51-
final long offset = brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
5250
try {
51+
final long offset = brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
5352
this.notifyMessageArriving(topic, queueId, offset);
5453
} catch (Throwable e) {
5554
LOGGER.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);

broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.rocketmq.logging.org.slf4j.Logger;
2929
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3030
import org.apache.rocketmq.store.ConsumeQueueExt;
31+
import org.apache.rocketmq.store.exception.ConsumeQueueException;
3132

3233
public class PullRequestHoldService extends ServiceThread {
3334
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -103,8 +104,8 @@ protected void checkHoldRequest() {
103104
if (2 == kArray.length) {
104105
String topic = kArray[0];
105106
int queueId = Integer.parseInt(kArray[1]);
106-
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
107107
try {
108+
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
108109
this.notifyMessageArriving(topic, queueId, offset);
109110
} catch (Throwable e) {
110111
log.error(
@@ -131,7 +132,12 @@ public void notifyMessageArriving(final String topic, final int queueId, final l
131132
for (PullRequest request : requestList) {
132133
long newestOffset = maxOffset;
133134
if (newestOffset <= request.getPullFromThisOffset()) {
134-
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
135+
try {
136+
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
137+
} catch (ConsumeQueueException e) {
138+
log.error("Failed tp get max offset in queue", e);
139+
continue;
140+
}
135141
}
136142

137143
if (newestOffset > request.getPullFromThisOffset()) {

broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java

+63-35
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
4949
import org.apache.rocketmq.store.DefaultMessageFilter;
5050
import org.apache.rocketmq.store.MessageStore;
51+
import org.apache.rocketmq.store.exception.ConsumeQueueException;
5152

5253
public class ConsumerLagCalculator {
5354
private final BrokerConfig brokerConfig;
@@ -212,45 +213,61 @@ public void calculateLag(Consumer<CalculateLagResult> lagRecorder) {
212213

213214
CalculateLagResult result = new CalculateLagResult(info.group, info.topic, false);
214215

215-
Pair<Long, Long> lag = getConsumerLagStats(info.group, info.topic, info.isPop);
216-
if (lag != null) {
217-
result.lag = lag.getObject1();
218-
result.earliestUnconsumedTimestamp = lag.getObject2();
216+
try {
217+
Pair<Long, Long> lag = getConsumerLagStats(info.group, info.topic, info.isPop);
218+
if (lag != null) {
219+
result.lag = lag.getObject1();
220+
result.earliestUnconsumedTimestamp = lag.getObject2();
221+
}
222+
lagRecorder.accept(result);
223+
} catch (ConsumeQueueException e) {
224+
LOGGER.error("Failed to get lag stats", e);
219225
}
220-
lagRecorder.accept(result);
221226

222227
if (info.isPop) {
223-
Pair<Long, Long> retryLag = getConsumerLagStats(info.group, info.retryTopic, true);
228+
try {
229+
Pair<Long, Long> retryLag = getConsumerLagStats(info.group, info.retryTopic, true);
224230

225-
result = new CalculateLagResult(info.group, info.topic, true);
226-
if (retryLag != null) {
227-
result.lag = retryLag.getObject1();
228-
result.earliestUnconsumedTimestamp = retryLag.getObject2();
231+
result = new CalculateLagResult(info.group, info.topic, true);
232+
if (retryLag != null) {
233+
result.lag = retryLag.getObject1();
234+
result.earliestUnconsumedTimestamp = retryLag.getObject2();
235+
}
236+
lagRecorder.accept(result);
237+
} catch (ConsumeQueueException e) {
238+
LOGGER.error("Failed to get lag stats", e);
229239
}
230-
lagRecorder.accept(result);
231240
}
232241
});
233242
}
234243

235244
public void calculateInflight(Consumer<CalculateInflightResult> inflightRecorder) {
236245
processAllGroup(info -> {
237246
CalculateInflightResult result = new CalculateInflightResult(info.group, info.topic, false);
238-
Pair<Long, Long> inFlight = getInFlightMsgStats(info.group, info.topic, info.isPop);
239-
if (inFlight != null) {
240-
result.inFlight = inFlight.getObject1();
241-
result.earliestUnPulledTimestamp = inFlight.getObject2();
247+
try {
248+
Pair<Long, Long> inFlight = getInFlightMsgStats(info.group, info.topic, info.isPop);
249+
if (inFlight != null) {
250+
result.inFlight = inFlight.getObject1();
251+
result.earliestUnPulledTimestamp = inFlight.getObject2();
252+
}
253+
inflightRecorder.accept(result);
254+
} catch (ConsumeQueueException e) {
255+
LOGGER.error("Failed to get inflight message stats", e);
242256
}
243-
inflightRecorder.accept(result);
244257

245258
if (info.isPop) {
246-
Pair<Long, Long> retryInFlight = getInFlightMsgStats(info.group, info.retryTopic, true);
259+
try {
260+
Pair<Long, Long> retryInFlight = getInFlightMsgStats(info.group, info.retryTopic, true);
247261

248-
result = new CalculateInflightResult(info.group, info.topic, true);
249-
if (retryInFlight != null) {
250-
result.inFlight = retryInFlight.getObject1();
251-
result.earliestUnPulledTimestamp = retryInFlight.getObject2();
262+
result = new CalculateInflightResult(info.group, info.topic, true);
263+
if (retryInFlight != null) {
264+
result.inFlight = retryInFlight.getObject1();
265+
result.earliestUnPulledTimestamp = retryInFlight.getObject2();
266+
}
267+
inflightRecorder.accept(result);
268+
} catch (ConsumeQueueException e) {
269+
LOGGER.error("Failed to get inflight message stats", e);
252270
}
253-
inflightRecorder.accept(result);
254271
}
255272
});
256273
}
@@ -259,20 +276,28 @@ public void calculateAvailable(Consumer<CalculateAvailableResult> availableRecor
259276
processAllGroup(info -> {
260277
CalculateAvailableResult result = new CalculateAvailableResult(info.group, info.topic, false);
261278

262-
result.available = getAvailableMsgCount(info.group, info.topic, info.isPop);
263-
availableRecorder.accept(result);
279+
try {
280+
result.available = getAvailableMsgCount(info.group, info.topic, info.isPop);
281+
availableRecorder.accept(result);
282+
} catch (ConsumeQueueException e) {
283+
LOGGER.error("Failed to get available message count", e);
284+
}
264285

265-
if (info.isPop) {
266-
long retryAvailable = getAvailableMsgCount(info.group, info.retryTopic, true);
267286

268-
result = new CalculateAvailableResult(info.group, info.topic, true);
269-
result.available = retryAvailable;
270-
availableRecorder.accept(result);
287+
if (info.isPop) {
288+
try {
289+
long retryAvailable = getAvailableMsgCount(info.group, info.retryTopic, true);
290+
result = new CalculateAvailableResult(info.group, info.topic, true);
291+
result.available = retryAvailable;
292+
availableRecorder.accept(result);
293+
} catch (ConsumeQueueException e) {
294+
LOGGER.error("Failed to get available message count", e);
295+
}
271296
}
272297
});
273298
}
274299

275-
public Pair<Long, Long> getConsumerLagStats(String group, String topic, boolean isPop) {
300+
public Pair<Long, Long> getConsumerLagStats(String group, String topic, boolean isPop) throws ConsumeQueueException {
276301
long total = 0L;
277302
long earliestUnconsumedTimestamp = Long.MAX_VALUE;
278303

@@ -298,7 +323,8 @@ public Pair<Long, Long> getConsumerLagStats(String group, String topic, boolean
298323
return new Pair<>(total, earliestUnconsumedTimestamp);
299324
}
300325

301-
public Pair<Long, Long> getConsumerLagStats(String group, String topic, int queueId, boolean isPop) {
326+
public Pair<Long, Long> getConsumerLagStats(String group, String topic, int queueId, boolean isPop)
327+
throws ConsumeQueueException {
302328
long brokerOffset = messageStore.getMaxOffsetInQueue(topic, queueId);
303329
if (brokerOffset < 0) {
304330
brokerOffset = 0;
@@ -329,7 +355,7 @@ public Pair<Long, Long> getConsumerLagStats(String group, String topic, int queu
329355
return new Pair<>(lag, consumerStoreTimeStamp);
330356
}
331357

332-
public Pair<Long, Long> getInFlightMsgStats(String group, String topic, boolean isPop) {
358+
public Pair<Long, Long> getInFlightMsgStats(String group, String topic, boolean isPop) throws ConsumeQueueException {
333359
long total = 0L;
334360
long earliestUnPulledTimestamp = Long.MAX_VALUE;
335361

@@ -355,7 +381,8 @@ public Pair<Long, Long> getInFlightMsgStats(String group, String topic, boolean
355381
return new Pair<>(total, earliestUnPulledTimestamp);
356382
}
357383

358-
public Pair<Long, Long> getInFlightMsgStats(String group, String topic, int queueId, boolean isPop) {
384+
public Pair<Long, Long> getInFlightMsgStats(String group, String topic, int queueId, boolean isPop)
385+
throws ConsumeQueueException {
359386
if (isPop) {
360387
long inflight = popInflightMessageCounter.getGroupPopInFlightMessageNum(topic, group, queueId);
361388
long pullOffset = popBufferMergeService.getLatestOffset(topic, group, queueId);
@@ -384,7 +411,7 @@ public Pair<Long, Long> getInFlightMsgStats(String group, String topic, int queu
384411
return new Pair<>(inflight, pullStoreTimeStamp);
385412
}
386413

387-
public long getAvailableMsgCount(String group, String topic, boolean isPop) {
414+
public long getAvailableMsgCount(String group, String topic, boolean isPop) throws ConsumeQueueException {
388415
long total = 0L;
389416

390417
if (group == null || topic == null) {
@@ -403,7 +430,8 @@ public long getAvailableMsgCount(String group, String topic, boolean isPop) {
403430
return total;
404431
}
405432

406-
public long getAvailableMsgCount(String group, String topic, int queueId, boolean isPop) {
433+
public long getAvailableMsgCount(String group, String topic, int queueId, boolean isPop)
434+
throws ConsumeQueueException {
407435
long brokerOffset = messageStore.getMaxOffsetInQueue(topic, queueId);
408436
if (brokerOffset < 0) {
409437
brokerOffset = 0;

broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java

+18-6
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@
3939
import org.apache.rocketmq.common.metrics.NopLongCounter;
4040
import org.apache.rocketmq.common.metrics.NopLongHistogram;
4141
import org.apache.rocketmq.store.PutMessageStatus;
42+
import org.apache.rocketmq.store.exception.ConsumeQueueException;
4243
import org.apache.rocketmq.store.pop.AckMsg;
4344
import org.apache.rocketmq.store.pop.PopCheckPoint;
45+
import org.apache.rocketmq.logging.org.slf4j.Logger;
46+
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
4447

4548
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
4649
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
@@ -57,6 +60,7 @@
5760
import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_REVIVE_MESSAGE_TYPE;
5861

5962
public class PopMetricsManager {
63+
private static final Logger log = LoggerFactory.getLogger(PopMetricsManager.class);
6064
public static Supplier<AttributesBuilder> attributesBuilderSupplier;
6165

6266
private static LongHistogram popBufferScanTimeConsume = new NopLongHistogram();
@@ -138,19 +142,27 @@ private static void calculatePopReviveLatency(BrokerController brokerController,
138142
ObservableLongMeasurement measurement) {
139143
PopReviveService[] popReviveServices = brokerController.getAckMessageProcessor().getPopReviveServices();
140144
for (PopReviveService popReviveService : popReviveServices) {
141-
measurement.record(popReviveService.getReviveBehindMillis(), newAttributesBuilder()
142-
.put(LABEL_QUEUE_ID, popReviveService.getQueueId())
143-
.build());
145+
try {
146+
measurement.record(popReviveService.getReviveBehindMillis(), newAttributesBuilder()
147+
.put(LABEL_QUEUE_ID, popReviveService.getQueueId())
148+
.build());
149+
} catch (ConsumeQueueException e) {
150+
log.error("Failed to get revive behind duration", e);
151+
}
144152
}
145153
}
146154

147155
private static void calculatePopReviveLag(BrokerController brokerController,
148156
ObservableLongMeasurement measurement) {
149157
PopReviveService[] popReviveServices = brokerController.getAckMessageProcessor().getPopReviveServices();
150158
for (PopReviveService popReviveService : popReviveServices) {
151-
measurement.record(popReviveService.getReviveBehindMessages(), newAttributesBuilder()
152-
.put(LABEL_QUEUE_ID, popReviveService.getQueueId())
153-
.build());
159+
try {
160+
measurement.record(popReviveService.getReviveBehindMessages(), newAttributesBuilder()
161+
.put(LABEL_QUEUE_ID, popReviveService.getQueueId())
162+
.build());
163+
} catch (ConsumeQueueException e) {
164+
log.error("Failed to get revive behind message count", e);
165+
}
154166
}
155167
}
156168

0 commit comments

Comments
 (0)