Skip to content

Commit 0176ef4

Browse files
[ISSUE #9149]Assign offset in offsetTable even if the subscription key not exist. (#9150)
* Assign offset in offsetTable even if the subscription key not exist.
1 parent 8086fc5 commit 0176ef4

File tree

1 file changed

+4
-16
lines changed

1 file changed

+4
-16
lines changed

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

+4-16
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.broker.offset;
1818

19+
import com.google.common.collect.Maps;
1920
import java.util.HashMap;
2021
import java.util.HashSet;
2122
import java.util.Iterator;
@@ -417,27 +418,14 @@ public void assignResetOffset(String topic, String group, int queueId, long offs
417418
}
418419

419420
String key = topic + TOPIC_GROUP_SEPARATOR + group;
420-
ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
421-
if (null == map) {
422-
map = new ConcurrentHashMap<Integer, Long>();
423-
ConcurrentMap<Integer, Long> previous = resetOffsetTable.putIfAbsent(key, map);
424-
if (null != previous) {
425-
map = previous;
426-
}
427-
}
428-
429-
map.put(queueId, offset);
430-
LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}",
431-
topic, group, queueId, offset);
421+
resetOffsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset);
422+
LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}", topic, group, queueId, offset);
432423

433424
// Two things are important here:
434425
// 1, currentOffsetMap might be null if there is no previous records;
435426
// 2, Our overriding here may get overridden by the client instantly in concurrent cases; But it still makes
436427
// sense in cases like clients are offline.
437-
ConcurrentMap<Integer, Long> currentOffsetMap = offsetTable.get(key);
438-
if (null != currentOffsetMap) {
439-
currentOffsetMap.put(queueId, offset);
440-
}
428+
offsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset);
441429
}
442430

443431
public boolean hasOffsetReset(String topic, String group, int queueId) {

0 commit comments

Comments
 (0)