Skip to content

Commit 14b63ee

Browse files
authored
[RIP-28] light message queue(LMQ) (#3694)
1 parent 8d28d3f commit 14b63ee

34 files changed

+2466
-283
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,13 @@
4747
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
4848
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
4949
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
50+
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
5051
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
5152
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
5253
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
5354
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
5455
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
56+
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
5557
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
5658
import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
5759
import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
@@ -64,7 +66,9 @@
6466
import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
6567
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
6668
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
69+
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
6770
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
71+
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
6872
import org.apache.rocketmq.broker.topic.TopicConfigManager;
6973
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
7074
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
@@ -106,6 +110,7 @@
106110
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
107111
import org.apache.rocketmq.store.stats.BrokerStats;
108112
import org.apache.rocketmq.store.stats.BrokerStatsManager;
113+
import org.apache.rocketmq.store.stats.LmqBrokerStatsManager;
109114

110115
public class BrokerController {
111116
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -180,18 +185,18 @@ public BrokerController(
180185
this.nettyServerConfig = nettyServerConfig;
181186
this.nettyClientConfig = nettyClientConfig;
182187
this.messageStoreConfig = messageStoreConfig;
183-
this.consumerOffsetManager = new ConsumerOffsetManager(this);
184-
this.topicConfigManager = new TopicConfigManager(this);
188+
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
189+
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
185190
this.pullMessageProcessor = new PullMessageProcessor(this);
186-
this.pullRequestHoldService = new PullRequestHoldService(this);
191+
this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
187192
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
188193
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
189194
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
190195
this.consumerFilterManager = new ConsumerFilterManager(this);
191196
this.producerManager = new ProducerManager();
192197
this.clientHousekeepingService = new ClientHousekeepingService(this);
193198
this.broker2Client = new Broker2Client(this);
194-
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
199+
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
195200
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
196201
this.filterServerManager = new FilterServerManager(this);
197202

@@ -207,7 +212,8 @@ public BrokerController(
207212
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
208213
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
209214

210-
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
215+
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
216+
211217
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
212218

213219
this.brokerFastFailure = new BrokerFastFailure(this);

broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public static String getConsumerOffsetPath(final String rootDir) {
3939
return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
4040
}
4141

42+
public static String getLmqConsumerOffsetPath(final String rootDir) {
43+
return rootDir + File.separator + "config" + File.separator + "lmqConsumerOffset.json";
44+
}
45+
4246
public static String getSubscriptionGroupPath(final String rootDir) {
4347
return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
4448
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.broker.longpolling;
18+
19+
import org.apache.rocketmq.broker.BrokerController;
20+
import org.apache.rocketmq.common.MixAll;
21+
import org.apache.rocketmq.common.constant.LoggerName;
22+
import org.apache.rocketmq.logging.InternalLogger;
23+
import org.apache.rocketmq.logging.InternalLoggerFactory;
24+
25+
26+
public class LmqPullRequestHoldService extends PullRequestHoldService {
27+
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
28+
29+
public LmqPullRequestHoldService(BrokerController brokerController) {
30+
super(brokerController);
31+
}
32+
33+
@Override
34+
public String getServiceName() {
35+
return LmqPullRequestHoldService.class.getSimpleName();
36+
}
37+
38+
@Override
39+
public void checkHoldRequest() {
40+
for (String key : pullRequestTable.keySet()) {
41+
int idx = key.lastIndexOf(TOPIC_QUEUEID_SEPARATOR);
42+
if (idx <= 0 || idx >= key.length() - 1) {
43+
pullRequestTable.remove(key);
44+
continue;
45+
}
46+
String topic = key.substring(0, idx);
47+
int queueId = Integer.parseInt(key.substring(idx + 1));
48+
final long offset = brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
49+
try {
50+
this.notifyMessageArriving(topic, queueId, offset);
51+
} catch (Throwable e) {
52+
LOGGER.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
53+
}
54+
if (MixAll.isLmq(topic)) {
55+
ManyPullRequest mpr = pullRequestTable.get(key);
56+
if (mpr == null || mpr.getPullRequestList() == null || mpr.getPullRequestList().isEmpty()) {
57+
pullRequestTable.remove(key);
58+
}
59+
}
60+
}
61+
}
62+
}

broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java

+4
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,8 @@ public synchronized List<PullRequest> cloneListAndClear() {
3939

4040
return null;
4141
}
42+
43+
public ArrayList<PullRequest> getPullRequestList() {
44+
return pullRequestList;
45+
}
4246
}

broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131

3232
public class PullRequestHoldService extends ServiceThread {
3333
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
34-
private static final String TOPIC_QUEUEID_SEPARATOR = "@";
35-
private final BrokerController brokerController;
34+
protected static final String TOPIC_QUEUEID_SEPARATOR = "@";
35+
protected final BrokerController brokerController;
3636
private final SystemClock systemClock = new SystemClock();
37-
private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
37+
protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
3838
new ConcurrentHashMap<String, ManyPullRequest>(1024);
3939

4040
public PullRequestHoldService(final BrokerController brokerController) {
@@ -93,7 +93,7 @@ public String getServiceName() {
9393
return PullRequestHoldService.class.getSimpleName();
9494
}
9595

96-
private void checkHoldRequest() {
96+
protected void checkHoldRequest() {
9797
for (String key : this.pullRequestTable.keySet()) {
9898
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
9999
if (2 == kArray.length) {

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@
3535

3636
public class ConsumerOffsetManager extends ConfigManager {
3737
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
38-
private static final String TOPIC_GROUP_SEPARATOR = "@";
38+
protected static final String TOPIC_GROUP_SEPARATOR = "@";
3939

40-
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
40+
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
4141
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
4242

43-
private transient BrokerController brokerController;
43+
protected transient BrokerController brokerController;
4444

4545
public ConsumerOffsetManager() {
4646
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.broker.offset;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
23+
import org.apache.rocketmq.broker.BrokerController;
24+
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
25+
import org.apache.rocketmq.common.MixAll;
26+
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
27+
28+
public class LmqConsumerOffsetManager extends ConsumerOffsetManager {
29+
private ConcurrentHashMap<String, Long> lmqOffsetTable = new ConcurrentHashMap<>(512);
30+
31+
public LmqConsumerOffsetManager(BrokerController brokerController) {
32+
super(brokerController);
33+
}
34+
35+
@Override
36+
public long queryOffset(final String group, final String topic, final int queueId) {
37+
if (!MixAll.isLmq(group)) {
38+
return super.queryOffset(group, topic, queueId);
39+
}
40+
// topic@group
41+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
42+
Long offset = lmqOffsetTable.get(key);
43+
if (offset != null) {
44+
return offset;
45+
}
46+
return -1;
47+
}
48+
49+
@Override
50+
public Map<Integer, Long> queryOffset(final String group, final String topic) {
51+
if (!MixAll.isLmq(group)) {
52+
return super.queryOffset(group, topic);
53+
}
54+
Map<Integer, Long> map = new HashMap<>();
55+
// topic@group
56+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
57+
Long offset = lmqOffsetTable.get(key);
58+
if (offset != null) {
59+
map.put(0, offset);
60+
}
61+
return map;
62+
}
63+
64+
@Override
65+
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
66+
final long offset) {
67+
if (!MixAll.isLmq(group)) {
68+
super.commitOffset(clientHost, group, topic, queueId, offset);
69+
return;
70+
}
71+
// topic@group
72+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
73+
lmqOffsetTable.put(key, offset);
74+
}
75+
76+
@Override
77+
public String encode() {
78+
return this.encode(false);
79+
}
80+
81+
@Override
82+
public String configFilePath() {
83+
return BrokerPathConfigHelper.getLmqConsumerOffsetPath(brokerController.getMessageStoreConfig().getStorePathRootDir());
84+
}
85+
86+
@Override
87+
public void decode(String jsonString) {
88+
if (jsonString != null) {
89+
LmqConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, LmqConsumerOffsetManager.class);
90+
if (obj != null) {
91+
super.offsetTable = obj.offsetTable;
92+
this.lmqOffsetTable = obj.lmqOffsetTable;
93+
}
94+
}
95+
}
96+
97+
@Override
98+
public String encode(final boolean prettyFormat) {
99+
return RemotingSerializable.toJson(this, prettyFormat);
100+
}
101+
102+
public ConcurrentHashMap<String, Long> getLmqOffsetTable() {
103+
return lmqOffsetTable;
104+
}
105+
106+
public void setLmqOffsetTable(ConcurrentHashMap<String, Long> lmqOffsetTable) {
107+
this.lmqOffsetTable = lmqOffsetTable;
108+
}
109+
}

0 commit comments

Comments
 (0)