Skip to content

Commit d2fd068

Browse files
authored
fix: registerProducer should not be affected by concurrent scanNotAct… (#8847)
* fix: registerProducer should not be affected by concurrent scanNotActiveChannel Signed-off-by: Li Zhanhui <[email protected]> * chore: fix code format and make CI pass Signed-off-by: Li Zhanhui <[email protected]> --------- Signed-off-by: Li Zhanhui <[email protected]>
1 parent c033c3e commit d2fd068

File tree

1 file changed

+20
-16
lines changed

1 file changed

+20
-16
lines changed

broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java

+20-16
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,15 @@ public ConcurrentMap<String, ConcurrentMap<Channel, ClientChannelInfo>> getGroup
7171
public ProducerTableInfo getProducerTable() {
7272
Map<String, List<ProducerInfo>> map = new HashMap<>();
7373
for (String group : this.groupChannelTable.keySet()) {
74-
for (Entry<Channel, ClientChannelInfo> entry: this.groupChannelTable.get(group).entrySet()) {
74+
for (Entry<Channel, ClientChannelInfo> entry : this.groupChannelTable.get(group).entrySet()) {
7575
ClientChannelInfo clientChannelInfo = entry.getValue();
7676
if (map.containsKey(group)) {
7777
map.get(group).add(new ProducerInfo(
78-
clientChannelInfo.getClientId(),
79-
clientChannelInfo.getChannel().remoteAddress().toString(),
80-
clientChannelInfo.getLanguage(),
81-
clientChannelInfo.getVersion(),
82-
clientChannelInfo.getLastUpdateTimestamp()
78+
clientChannelInfo.getClientId(),
79+
clientChannelInfo.getChannel().remoteAddress().toString(),
80+
clientChannelInfo.getLanguage(),
81+
clientChannelInfo.getVersion(),
82+
clientChannelInfo.getLastUpdateTimestamp()
8383
));
8484
} else {
8585
map.put(group, new ArrayList<>(Collections.singleton(new ProducerInfo(
@@ -118,8 +118,8 @@ public void scanNotActiveChannel() {
118118
clientChannelTable.remove(info.getClientId());
119119
}
120120
log.warn(
121-
"ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
122-
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
121+
"ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
122+
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
123123
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, info);
124124
RemotingHelper.closeChannel(info.getChannel());
125125
}
@@ -144,8 +144,8 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
144144
clientChannelTable.remove(clientChannelInfo.getClientId());
145145
removed = true;
146146
log.info(
147-
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
148-
clientChannelInfo.toString(), remoteAddr, group);
147+
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
148+
clientChannelInfo.toString(), remoteAddr, group);
149149
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo);
150150
if (clientChannelInfoTable.isEmpty()) {
151151
ConcurrentMap<Channel, ClientChannelInfo> oldGroupTable = this.groupChannelTable.remove(group);
@@ -167,21 +167,26 @@ public void registerProducer(final String group, final ClientChannelInfo clientC
167167
ConcurrentMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
168168
if (null == channelTable) {
169169
channelTable = new ConcurrentHashMap<>();
170+
// Make sure channelTable will NOT be cleaned by #scanNotActiveChannel
171+
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
170172
ConcurrentMap<Channel, ClientChannelInfo> prev = this.groupChannelTable.putIfAbsent(group, channelTable);
171-
if (null != prev) {
173+
if (null == prev) {
174+
// Add client-id to channel mapping for new producer group
175+
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
176+
} else {
172177
channelTable = prev;
173178
}
174179
}
175180

176181
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
182+
// Add client-channel info to existing producer group
177183
if (null == clientChannelInfoFound) {
178184
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
179185
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
180-
log.info("new producer connected, group: {} channel: {}", group,
181-
clientChannelInfo.toString());
186+
log.info("new producer connected, group: {} channel: {}", group, clientChannelInfo.toString());
182187
}
183188

184-
189+
// Refresh existing client-channel-info update-timestamp
185190
if (clientChannelInfoFound != null) {
186191
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
187192
}
@@ -193,8 +198,7 @@ public void unregisterProducer(final String group, final ClientChannelInfo clien
193198
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
194199
clientChannelTable.remove(clientChannelInfo.getClientId());
195200
if (old != null) {
196-
log.info("unregister a producer[{}] from groupChannelTable {}", group,
197-
clientChannelInfo.toString());
201+
log.info("unregister a producer[{}] from groupChannelTable {}", group, clientChannelInfo.toString());
198202
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo);
199203
}
200204

0 commit comments

Comments
 (0)