|
26 | 26 | import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
|
27 | 27 | import org.apache.rocketmq.client.consumer.MessageSelector;
|
28 | 28 | import org.apache.rocketmq.common.Pair;
|
| 29 | +import org.apache.rocketmq.common.filter.ExpressionType; |
29 | 30 | import org.apache.rocketmq.common.message.Message;
|
30 | 31 | import org.apache.rocketmq.common.message.MessageQueue;
|
31 | 32 | import org.apache.rocketmq.logging.org.slf4j.Logger;
|
@@ -171,6 +172,13 @@ public void testEstimateLag() throws Exception {
|
171 | 172 | RMQSqlConsumer sqlConsumer = ConsumerFactory.getRMQSqlConsumer(NAMESRV_ADDR, initConsumerGroup(), topic, selector, sqlListener);
|
172 | 173 | RMQBlockListener tagListener = new RMQBlockListener(true);
|
173 | 174 | RMQNormalConsumer tagConsumer = getConsumer(NAMESRV_ADDR, topic, tag, tagListener);
|
| 175 | + |
| 176 | + //init subscriptionData & consumerFilterData for sql |
| 177 | + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, sql, ExpressionType.SQL92); |
| 178 | + for (BrokerController controller : brokerControllerList) { |
| 179 | + controller.getConsumerFilterManager().register(topic, sqlConsumer.getConsumerGroup(), sql, ExpressionType.SQL92, subscriptionData.getSubVersion()); |
| 180 | + } |
| 181 | + |
174 | 182 | // wait for building filter data
|
175 | 183 | await().atMost(5, TimeUnit.SECONDS).until(() -> sqlListener.isBlocked() && tagListener.isBlocked());
|
176 | 184 |
|
@@ -210,7 +218,6 @@ public void testEstimateLag() throws Exception {
|
210 | 218 | for (MessageQueue mq : mqs) {
|
211 | 219 | if (mq.getBrokerName().equals(controller.getBrokerConfig().getBrokerName())) {
|
212 | 220 | long brokerOffset = controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId());
|
213 |
| - SubscriptionData subscriptionData = controller.getConsumerManager().findSubscriptionData(sqlConsumer.getConsumerGroup(), topic); |
214 | 221 | ConsumerFilterData consumerFilterData = controller.getConsumerFilterManager().get(topic, sqlConsumer.getConsumerGroup());
|
215 | 222 | long estimateMessageCount = controller.getMessageStore()
|
216 | 223 | .estimateMessageCount(topic, mq.getQueueId(), 0, brokerOffset,
|
|
0 commit comments