Skip to content

Commit 10d2918

Browse files
authored
[ISSUE #4832] Remove innerProducer and innerConsumer in EscapeBridge (#4834)
* remove innerProducer and innerConsumer in Escape #4832 * format code to pass code-style check * Move topic route management logic to TopicRouteInfoManager * Use ThreadPoolExecutor instead of creating a new ForkJoinPool * remove retry logic * remove unnecessary name server list * remove unnecessary invokeId * remove AssignmentManager and make maintenance of topic subscribe data lazy-initialized * simply code * remove unnecessary code
1 parent cf3c1ef commit 10d2918

File tree

10 files changed

+597
-291
lines changed

10 files changed

+597
-291
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

+16-29
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import org.apache.rocketmq.broker.controller.ReplicasManager;
6060
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
6161
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
62-
import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
6362
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
6463
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
6564
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
@@ -95,6 +94,7 @@
9594
import org.apache.rocketmq.broker.topic.TopicConfigManager;
9695
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
9796
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
97+
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
9898
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
9999
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
100100
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
@@ -171,7 +171,6 @@ public class BrokerController {
171171
protected final ConsumerOrderInfoManager consumerOrderInfoManager;
172172
protected final ProducerManager producerManager;
173173
protected final ScheduleMessageService scheduleMessageService;
174-
protected final AssignmentManager assignmentManager;
175174
protected final ClientHousekeepingService clientHousekeepingService;
176175
protected final PullMessageProcessor pullMessageProcessor;
177176
protected final PeekMessageProcessor peekMessageProcessor;
@@ -191,6 +190,7 @@ public class BrokerController {
191190
protected final ConsumerIdsChangeListener consumerIdsChangeListener;
192191
protected final EndTransactionProcessor endTransactionProcessor;
193192
private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
193+
private final TopicRouteInfoManager topicRouteInfoManager;
194194
protected BrokerOuterAPI brokerOuterAPI;
195195
protected ScheduledExecutorService scheduledExecutorService;
196196
protected ScheduledExecutorService syncBrokerMemberGroupExecutorService;
@@ -322,7 +322,6 @@ public BrokerController(
322322

323323
this.filterServerManager = new FilterServerManager(this);
324324

325-
this.assignmentManager = new AssignmentManager(this);
326325
this.queryAssignmentProcessor = new QueryAssignmentProcessor(this);
327326
this.clientManageProcessor = new ClientManageProcessor(this);
328327
this.slaveSynchronize = new SlaveSynchronize(this);
@@ -386,6 +385,8 @@ public boolean online(String instanceId, String group, String topic) {
386385

387386
this.escapeBridge = new EscapeBridge(this);
388387

388+
this.topicRouteInfoManager = new TopicRouteInfoManager(this);
389+
389390
if (this.brokerConfig.isEnableSlaveActingMaster() && !this.brokerConfig.isSkipPreOnline()) {
390391
this.brokerPreOnlineService = new BrokerPreOnlineService(this);
391392
}
@@ -1238,10 +1239,6 @@ protected void shutdownBasicService() {
12381239
this.ackMessageProcessor.shutdownPopReviveService();
12391240
}
12401241

1241-
if (this.assignmentManager != null) {
1242-
this.assignmentManager.shutdown();
1243-
}
1244-
12451242
if (this.notificationProcessor != null) {
12461243
this.notificationProcessor.shutdown();
12471244
}
@@ -1353,6 +1350,10 @@ protected void shutdownBasicService() {
13531350
escapeBridge.shutdown();
13541351
}
13551352

1353+
if (this.topicRouteInfoManager != null) {
1354+
this.topicRouteInfoManager.shutdown();
1355+
}
1356+
13561357
if (this.brokerPreOnlineService != null && !this.brokerPreOnlineService.isStopped()) {
13571358
this.brokerPreOnlineService.shutdown();
13581359
}
@@ -1448,10 +1449,6 @@ protected void startBasicService() throws Exception {
14481449
this.ackMessageProcessor.startPopReviveService();
14491450
}
14501451

1451-
if (this.assignmentManager != null) {
1452-
this.assignmentManager.start();
1453-
}
1454-
14551452
if (this.topicQueueMappingCleanService != null) {
14561453
this.topicQueueMappingCleanService.start();
14571454
}
@@ -1484,6 +1481,10 @@ protected void startBasicService() throws Exception {
14841481
this.escapeBridge.start();
14851482
}
14861483

1484+
if (this.topicRouteInfoManager != null) {
1485+
this.topicRouteInfoManager.start();
1486+
}
1487+
14871488
if (this.brokerPreOnlineService != null) {
14881489
this.brokerPreOnlineService.start();
14891490
}
@@ -1779,16 +1780,6 @@ private boolean needRegister(final String clusterName,
17791780
return needRegister;
17801781
}
17811782

1782-
public String getNameServerList() {
1783-
if (this.brokerConfig.getNamesrvAddr() != null) {
1784-
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
1785-
return this.brokerConfig.getNamesrvAddr();
1786-
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
1787-
return this.brokerOuterAPI.fetchNameServerAddr();
1788-
}
1789-
return null;
1790-
}
1791-
17921783
public void startService(long minBrokerId, String minBrokerAddr) {
17931784
BrokerController.LOG.info("{} start service, min broker id is {}, min broker addr: {}",
17941785
this.brokerConfig.getCanonicalName(), minBrokerId, minBrokerAddr);
@@ -2153,14 +2144,6 @@ public ExecutorService getSendMessageExecutor() {
21532144
return sendMessageExecutor;
21542145
}
21552146

2156-
public AssignmentManager getAssignmentManager() {
2157-
return assignmentManager;
2158-
}
2159-
2160-
public ClientManageProcessor getClientManageProcessor() {
2161-
return clientManageProcessor;
2162-
}
2163-
21642147
public SendMessageProcessor getSendMessageProcessor() {
21652148
return sendMessageProcessor;
21662149
}
@@ -2253,4 +2236,8 @@ public TimerCheckpoint getTimerCheckpoint() {
22532236
return timerCheckpoint;
22542237
}
22552238

2239+
public TopicRouteInfoManager getTopicRouteInfoManager() {
2240+
return this.topicRouteInfoManager;
2241+
}
2242+
22562243
}

0 commit comments

Comments
 (0)