Skip to content

Commit 9202de3

Browse files
authored
[ISSUE #8933] feat: DefaultPullConsumer add balance switch (#8934)
1 parent 23cc24c commit 9202de3

File tree

2 files changed

+17
-0
lines changed

2 files changed

+17
-0
lines changed

client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java

+10
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
8888

8989
private int maxReconsumeTimes = 16;
9090

91+
private boolean enableRebalance = true;
92+
9193
public DefaultMQPullConsumer() {
9294
this(MixAll.DEFAULT_CONSUMER_GROUP, null);
9395
}
@@ -468,4 +470,12 @@ public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
468470
public void persist(MessageQueue mq) {
469471
this.getOffsetStore().persist(queueWithNamespace(mq));
470472
}
473+
474+
public boolean isEnableRebalance() {
475+
return enableRebalance;
476+
}
477+
478+
public void setEnableRebalance(boolean enableRebalance) {
479+
this.enableRebalance = enableRebalance;
480+
}
471481
}

client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java

+7
Original file line numberDiff line numberDiff line change
@@ -381,13 +381,20 @@ public Set<SubscriptionData> subscriptions() {
381381

382382
@Override
383383
public void doRebalance() {
384+
if (!defaultMQPullConsumer.isEnableRebalance()) {
385+
return;
386+
}
384387
if (this.rebalanceImpl != null) {
385388
this.rebalanceImpl.doRebalance(false);
386389
}
387390
}
388391

389392
@Override
390393
public boolean tryRebalance() {
394+
if (!defaultMQPullConsumer.isEnableRebalance()) {
395+
return true;
396+
}
397+
391398
if (this.rebalanceImpl != null) {
392399
return this.rebalanceImpl.doRebalance(false);
393400
}

0 commit comments

Comments
 (0)