27
27
import java .util .Map ;
28
28
import org .apache .rocketmq .common .BrokerConfig ;
29
29
import org .apache .rocketmq .common .UtilAll ;
30
+ import org .apache .rocketmq .common .message .MessageConst ;
30
31
import org .apache .rocketmq .common .message .MessageDecoder ;
31
32
import org .apache .rocketmq .store .config .MessageStoreConfig ;
32
33
import org .apache .rocketmq .store .stats .BrokerStatsManager ;
@@ -146,6 +147,34 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
146
147
return master ;
147
148
}
148
149
150
+ protected DefaultMessageStore genForMultiQueue () throws Exception {
151
+ MessageStoreConfig messageStoreConfig = buildStoreConfig (
152
+ commitLogFileSize , cqFileSize , true , cqExtFileSize
153
+ );
154
+
155
+ messageStoreConfig .setEnableLmq (true );
156
+ messageStoreConfig .setEnableMultiDispatch (true );
157
+
158
+ BrokerConfig brokerConfig = new BrokerConfig ();
159
+
160
+ DefaultMessageStore master = new DefaultMessageStore (
161
+ messageStoreConfig ,
162
+ new BrokerStatsManager (brokerConfig .getBrokerClusterName (), brokerConfig .isEnableDetailStat ()),
163
+ new MessageArrivingListener () {
164
+ @ Override
165
+ public void arriving (String topic , int queueId , long logicOffset , long tagsCode ,
166
+ long msgStoreTime , byte [] filterBitMap , Map <String , String > properties ) {
167
+ }
168
+ }
169
+ , brokerConfig );
170
+
171
+ assertThat (master .load ()).isTrue ();
172
+
173
+ master .start ();
174
+
175
+ return master ;
176
+ }
177
+
149
178
protected void putMsg (DefaultMessageStore master ) throws Exception {
150
179
long totalMsgs = 200 ;
151
180
@@ -158,6 +187,33 @@ protected void putMsg(DefaultMessageStore master) throws Exception {
158
187
}
159
188
}
160
189
190
+ protected void putMsgMultiQueue (DefaultMessageStore master ) throws Exception {
191
+ for (long i = 0 ; i < 1 ; i ++) {
192
+ master .putMessage (buildMessageMultiQueue ());
193
+ }
194
+ }
195
+
196
+ private MessageExtBrokerInner buildMessageMultiQueue () {
197
+ MessageExtBrokerInner msg = new MessageExtBrokerInner ();
198
+ msg .setTopic (topic );
199
+ msg .setTags ("TAG1" );
200
+ msg .setKeys ("Hello" );
201
+ msg .setBody (msgBody );
202
+ msg .setKeys (String .valueOf (System .currentTimeMillis ()));
203
+ msg .setQueueId (queueId );
204
+ msg .setSysFlag (0 );
205
+ msg .setBornTimestamp (System .currentTimeMillis ());
206
+ msg .setStoreHost (StoreHost );
207
+ msg .setBornHost (BornHost );
208
+ for (int i = 0 ; i < 1 ; i ++) {
209
+ msg .putUserProperty (MessageConst .PROPERTY_INNER_MULTI_DISPATCH , "%LMQ%123,%LMQ%456" );
210
+ msg .putUserProperty (String .valueOf (i ), "imagoodperson" + i );
211
+ }
212
+ msg .setPropertiesString (MessageDecoder .messageProperties2String (msg .getProperties ()));
213
+
214
+ return msg ;
215
+ }
216
+
161
217
protected void deleteDirectory (String rootPath ) {
162
218
File file = new File (rootPath );
163
219
deleteFile (file );
@@ -217,6 +273,41 @@ public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception {
217
273
218
274
}
219
275
276
+ @ Test
277
+ public void testPutMessagePositionInfoMultiQueue () throws Exception {
278
+ DefaultMessageStore messageStore = null ;
279
+ try {
280
+
281
+ messageStore = genForMultiQueue ();
282
+
283
+ int totalMessages = 10 ;
284
+
285
+ for (int i = 0 ; i < totalMessages ; i ++) {
286
+ putMsgMultiQueue (messageStore );
287
+ }
288
+ Thread .sleep (5 );
289
+
290
+ ConsumeQueue cq = messageStore .getConsumeQueueTable ().get (topic ).get (queueId );
291
+
292
+ ConsumeQueue lmqCq1 = messageStore .getConsumeQueueTable ().get ("%LMQ%123" ).get (0 );
293
+
294
+ ConsumeQueue lmqCq2 = messageStore .getConsumeQueueTable ().get ("%LMQ%456" ).get (0 );
295
+
296
+ assertThat (cq ).isNotNull ();
297
+
298
+ assertThat (lmqCq1 ).isNotNull ();
299
+
300
+ assertThat (lmqCq2 ).isNotNull ();
301
+
302
+ } finally {
303
+ if (messageStore != null ) {
304
+ messageStore .shutdown ();
305
+ messageStore .destroy ();
306
+ }
307
+ deleteDirectory (storePath );
308
+ }
309
+ }
310
+
220
311
@ Test
221
312
public void testConsumeQueueWithExtendData () {
222
313
DefaultMessageStore master = null ;
0 commit comments