47
47
import org .apache .rocketmq .broker .filtersrv .FilterServerManager ;
48
48
import org .apache .rocketmq .broker .latency .BrokerFastFailure ;
49
49
import org .apache .rocketmq .broker .latency .BrokerFixedThreadPoolExecutor ;
50
+ import org .apache .rocketmq .broker .longpolling .LmqPullRequestHoldService ;
50
51
import org .apache .rocketmq .broker .longpolling .NotifyMessageArrivingListener ;
51
52
import org .apache .rocketmq .broker .longpolling .PullRequestHoldService ;
52
53
import org .apache .rocketmq .broker .mqtrace .ConsumeMessageHook ;
53
54
import org .apache .rocketmq .broker .mqtrace .SendMessageHook ;
54
55
import org .apache .rocketmq .broker .offset .ConsumerOffsetManager ;
56
+ import org .apache .rocketmq .broker .offset .LmqConsumerOffsetManager ;
55
57
import org .apache .rocketmq .broker .out .BrokerOuterAPI ;
56
58
import org .apache .rocketmq .broker .plugin .MessageStoreFactory ;
57
59
import org .apache .rocketmq .broker .plugin .MessageStorePluginContext ;
64
66
import org .apache .rocketmq .broker .processor .ReplyMessageProcessor ;
65
67
import org .apache .rocketmq .broker .processor .SendMessageProcessor ;
66
68
import org .apache .rocketmq .broker .slave .SlaveSynchronize ;
69
+ import org .apache .rocketmq .broker .subscription .LmqSubscriptionGroupManager ;
67
70
import org .apache .rocketmq .broker .subscription .SubscriptionGroupManager ;
71
+ import org .apache .rocketmq .broker .topic .LmqTopicConfigManager ;
68
72
import org .apache .rocketmq .broker .topic .TopicConfigManager ;
69
73
import org .apache .rocketmq .broker .transaction .AbstractTransactionalMessageCheckListener ;
70
74
import org .apache .rocketmq .broker .transaction .TransactionalMessageCheckService ;
106
110
import org .apache .rocketmq .store .dledger .DLedgerCommitLog ;
107
111
import org .apache .rocketmq .store .stats .BrokerStats ;
108
112
import org .apache .rocketmq .store .stats .BrokerStatsManager ;
113
+ import org .apache .rocketmq .store .stats .LmqBrokerStatsManager ;
109
114
110
115
public class BrokerController {
111
116
private static final InternalLogger log = InternalLoggerFactory .getLogger (LoggerName .BROKER_LOGGER_NAME );
@@ -180,18 +185,18 @@ public BrokerController(
180
185
this .nettyServerConfig = nettyServerConfig ;
181
186
this .nettyClientConfig = nettyClientConfig ;
182
187
this .messageStoreConfig = messageStoreConfig ;
183
- this .consumerOffsetManager = new ConsumerOffsetManager (this );
184
- this .topicConfigManager = new TopicConfigManager (this );
188
+ this .consumerOffsetManager = messageStoreConfig . isEnableLmq () ? new LmqConsumerOffsetManager ( this ) : new ConsumerOffsetManager (this );
189
+ this .topicConfigManager = messageStoreConfig . isEnableLmq () ? new LmqTopicConfigManager ( this ) : new TopicConfigManager (this );
185
190
this .pullMessageProcessor = new PullMessageProcessor (this );
186
- this .pullRequestHoldService = new PullRequestHoldService (this );
191
+ this .pullRequestHoldService = messageStoreConfig . isEnableLmq () ? new LmqPullRequestHoldService ( this ) : new PullRequestHoldService (this );
187
192
this .messageArrivingListener = new NotifyMessageArrivingListener (this .pullRequestHoldService );
188
193
this .consumerIdsChangeListener = new DefaultConsumerIdsChangeListener (this );
189
194
this .consumerManager = new ConsumerManager (this .consumerIdsChangeListener );
190
195
this .consumerFilterManager = new ConsumerFilterManager (this );
191
196
this .producerManager = new ProducerManager ();
192
197
this .clientHousekeepingService = new ClientHousekeepingService (this );
193
198
this .broker2Client = new Broker2Client (this );
194
- this .subscriptionGroupManager = new SubscriptionGroupManager (this );
199
+ this .subscriptionGroupManager = messageStoreConfig . isEnableLmq () ? new LmqSubscriptionGroupManager ( this ) : new SubscriptionGroupManager (this );
195
200
this .brokerOuterAPI = new BrokerOuterAPI (nettyClientConfig );
196
201
this .filterServerManager = new FilterServerManager (this );
197
202
@@ -207,7 +212,8 @@ public BrokerController(
207
212
this .heartbeatThreadPoolQueue = new LinkedBlockingQueue <Runnable >(this .brokerConfig .getHeartbeatThreadPoolQueueCapacity ());
208
213
this .endTransactionThreadPoolQueue = new LinkedBlockingQueue <Runnable >(this .brokerConfig .getEndTransactionPoolQueueCapacity ());
209
214
210
- this .brokerStatsManager = new BrokerStatsManager (this .brokerConfig .getBrokerClusterName (), this .brokerConfig .isEnableDetailStat ());
215
+ this .brokerStatsManager = messageStoreConfig .isEnableLmq () ? new LmqBrokerStatsManager (this .brokerConfig .getBrokerClusterName (), this .brokerConfig .isEnableDetailStat ()) : new BrokerStatsManager (this .brokerConfig .getBrokerClusterName (), this .brokerConfig .isEnableDetailStat ());
216
+
211
217
this .setStoreHost (new InetSocketAddress (this .getBrokerConfig ().getBrokerIP1 (), this .getNettyServerConfig ().getListenPort ()));
212
218
213
219
this .brokerFastFailure = new BrokerFastFailure (this );
0 commit comments