Skip to content

Commit 5233f12

Browse files
authored
rollback spelling fix to avoid incompatile (#3749)
1 parent 0b612ff commit 5233f12

18 files changed

+35
-35
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ public boolean sendMessageBack(final MessageExt msg) {
378378
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
379379
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
380380

381-
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getMqClientFactory().getDefaultMQProducer().send(newMsg);
381+
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
382382
return true;
383383
} catch (Exception e) {
384384
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);

client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ private void initRebalanceImpl() {
329329
this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
330330
this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
331331
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
332-
this.rebalanceImpl.setMqClientFactory(this.mQClientFactory);
332+
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
333333
}
334334

335335
private void initPullAPIWrapper() {

client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ public synchronized void start() throws MQClientException {
637637
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
638638
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
639639
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
640-
this.rebalanceImpl.setMqClientFactory(this.mQClientFactory);
640+
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
641641

642642
this.pullAPIWrapper = new PullAPIWrapper(
643643
mQClientFactory,

client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ public synchronized void start() throws MQClientException {
591591
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
592592
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
593593
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
594-
this.rebalanceImpl.setMqClientFactory(this.mQClientFactory);
594+
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
595595

596596
this.pullAPIWrapper = new PullAPIWrapper(
597597
mQClientFactory,
@@ -1089,11 +1089,11 @@ public ConsumerRunningInfo consumerRunningInfo() {
10891089
return info;
10901090
}
10911091

1092-
public MQClientInstance getMqClientFactory() {
1092+
public MQClientInstance getmQClientFactory() {
10931093
return mQClientFactory;
10941094
}
10951095

1096-
public void setMqClientFactory(MQClientInstance mQClientFactory) {
1096+
public void setmQClientFactory(MQClientInstance mQClientFactory) {
10971097
this.mQClientFactory = mQClientFactory;
10981098
}
10991099

client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -472,11 +472,11 @@ public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocat
472472
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
473473
}
474474

475-
public MQClientInstance getMqClientFactory() {
475+
public MQClientInstance getmQClientFactory() {
476476
return mQClientFactory;
477477
}
478478

479-
public void setMqClientFactory(MQClientInstance mQClientFactory) {
479+
public void setmQClientFactory(MQClientInstance mQClientFactory) {
480480
this.mQClientFactory = mQClientFactory;
481481
}
482482

client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<Messa
7878
}
7979

8080
// notify broker
81-
this.getMqClientFactory().sendHeartbeatToAllBrokerWithLock();
81+
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
8282
}
8383

8484
@Override
@@ -114,7 +114,7 @@ private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
114114

115115
if (pq.hasTempMessage()) {
116116
log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
117-
this.defaultMQPushConsumerImpl.getMqClientFactory().getScheduledExecutorService().schedule(new Runnable() {
117+
this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
118118
@Override
119119
public void run() {
120120
log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);

client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ public void updateFaultItem(final String brokerName, final long currentLatency,
531531
}
532532

533533
private void validateNameServerSetting() throws MQClientException {
534-
List<String> nsList = this.getMqClientFactory().getMQClientAPIImpl().getNameServerAddressList();
534+
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
535535
if (null == nsList || nsList.isEmpty()) {
536536
throw new MQClientException(
537537
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
@@ -871,7 +871,7 @@ private SendResult sendKernelImpl(final Message msg,
871871
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
872872
}
873873

874-
public MQClientInstance getMqClientFactory() {
874+
public MQClientInstance getmQClientFactory() {
875875
return mQClientFactory;
876876
}
877877

@@ -1543,16 +1543,16 @@ private void requestFail(final String correlationId) {
15431543

15441544
private void prepareSendRequest(final Message msg, long timeout) {
15451545
String correlationId = CorrelationIdUtil.createCorrelationId();
1546-
String requestClientId = this.getMqClientFactory().getClientId();
1546+
String requestClientId = this.getmQClientFactory().getClientId();
15471547
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
15481548
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, requestClientId);
15491549
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));
15501550

1551-
boolean hasRouteData = this.getMqClientFactory().getTopicRouteTable().containsKey(msg.getTopic());
1551+
boolean hasRouteData = this.getmQClientFactory().getTopicRouteTable().containsKey(msg.getTopic());
15521552
if (!hasRouteData) {
15531553
long beginTimestamp = System.currentTimeMillis();
15541554
this.tryToFindTopicPublishInfo(msg.getTopic());
1555-
this.getMqClientFactory().sendHeartbeatToAllBrokerWithLock();
1555+
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
15561556
long cost = System.currentTimeMillis() - beginTimestamp;
15571557
if (cost > 500) {
15581558
log.warn("prepare send request for <{}> cost {} ms", msg.getTopic(), cost);

client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer,
413413
TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
414414
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
415415
producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
416-
producer.getMqClientFactory().updateTopicRouteInfoFromNameServer(topic);
416+
producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
417417
topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
418418
}
419419
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {

client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionTraceHookImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void endTransaction(EndTransactionContext context) {
6363
traceBean.setKeys(context.getMessage().getKeys());
6464
traceBean.setStoreHost(context.getBrokerAddr());
6565
traceBean.setMsgType(MessageType.Trans_msg_Commit);
66-
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getMqClientFactory().getClientId());
66+
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
6767
traceBean.setMsgId(context.getMsgId());
6868
traceBean.setTransactionState(context.getTransactionState());
6969
traceBean.setTransactionId(context.getTransactionId());

client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
102102
pushConsumer.subscribe(topic, "*");
103103
pushConsumer.start();
104104

105-
mQClientFactory = spy(pushConsumerImpl.getMqClientFactory());
105+
mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
106106
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
107107
field.setAccessible(true);
108108
field.set(pushConsumerImpl, mQClientFactory);
@@ -116,7 +116,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
116116
field.setAccessible(true);
117117
field.set(pushConsumerImpl, pullAPIWrapper);
118118

119-
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setMqClientFactory(mQClientFactory);
119+
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
120120
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
121121

122122
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),

client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public void init() throws Exception {
102102
field.setAccessible(true);
103103
field.set(mQClientFactory, mQClientAPIImpl);
104104

105-
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
105+
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
106106

107107
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
108108
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
@@ -358,7 +358,7 @@ public void testSetCallbackExecutor() throws MQClientException {
358358
producer.setCallbackExecutor(customized);
359359

360360
NettyRemotingClient remotingClient = (NettyRemotingClient) producer.getDefaultMQProducerImpl()
361-
.getMqClientFactory().getMQClientAPIImpl().getRemotingClient();
361+
.getmQClientFactory().getMQClientAPIImpl().getRemotingClient();
362362

363363
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
364364
}

client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
154154

155155
pushConsumer.start();
156156

157-
mQClientFactory = spy(pushConsumerImpl.getMqClientFactory());
158-
mQClientTraceFactory = spy(pushConsumerImpl.getMqClientFactory());
157+
mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
158+
mQClientTraceFactory = spy(pushConsumerImpl.getmQClientFactory());
159159

160160
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
161161
field.setAccessible(true);
@@ -178,7 +178,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
178178
field.setAccessible(true);
179179
field.set(pushConsumerImpl, pullAPIWrapper);
180180

181-
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setMqClientFactory(mQClientFactory);
181+
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
182182
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
183183

184184
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
@@ -214,7 +214,7 @@ public void terminate() {
214214

215215
@Test
216216
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
217-
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
217+
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
218218

219219
final CountDownLatch countDownLatch = new CountDownLatch(1);
220220
final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();

client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsume
218218
field.setAccessible(true);
219219
field.set(litePullConsumerImpl, offsetStore);
220220

221-
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
221+
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
222222

223223
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
224224
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))

client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void init() throws Exception {
100100
field.setAccessible(true);
101101
field.set(mQClientFactory, mQClientAPIImpl);
102102

103-
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
103+
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
104104

105105
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
106106
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
@@ -112,7 +112,7 @@ public void init() throws Exception {
112112

113113
@Test
114114
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
115-
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
115+
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
116116
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
117117
producer.send(message);
118118
assertThat(tracer.finishedSpans().size()).isEqualTo(1);

client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void init() throws Exception {
109109
field.setAccessible(true);
110110
field.set(mQClientFactory, mQClientAPIImpl);
111111

112-
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
112+
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
113113

114114
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
115115
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
@@ -121,7 +121,7 @@ public void init() throws Exception {
121121

122122
@Test
123123
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
124-
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
124+
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
125125
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
126126
final CountDownLatch countDownLatch = new CountDownLatch(1);
127127
try {

client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
118118
field.setAccessible(true);
119119
field.set(mQClientFactory, mQClientAPIImpl);
120120

121-
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
121+
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
122122

123123
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
124124
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
@@ -130,7 +130,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
130130

131131
@Test
132132
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
133-
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
133+
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
134134
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
135135
producer.sendMessageInTransaction(message, null);
136136

client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
123123
field.setAccessible(true);
124124
field.set(mQClientFactory, mQClientAPIImpl);
125125

126-
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
126+
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
127127

128128
Field fieldHooks = DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList");
129129
fieldHooks.setAccessible(true);
@@ -141,7 +141,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
141141

142142
@Test
143143
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
144-
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
144+
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
145145
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
146146
AtomicReference<EndTransactionContext> context = new AtomicReference<>();
147147
doAnswer(mock -> {

example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public long consumeFromOffset(MessageQueue messageQueue) throws MQClientExceptio
136136
}
137137

138138
public void incPullTPS(String topic, int pullSize) {
139-
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getMqClientFactory()
139+
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
140140
.getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
141141
}
142142
});

0 commit comments

Comments
 (0)