Skip to content

Commit 63130f5

Browse files
[ISSUE #7545] [RIP-65] Support efficient random index for massive messages (#7546)
Support efficient random index for massive messages Co-authored-by: bareheadtom <[email protected]>
1 parent 8e7e2b5 commit 63130f5

24 files changed

+2019
-697
lines changed

style/spotbugs-suppressions.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
<Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
3232
</Match>
3333
<Match>
34-
<Class name="org.apache.rocketmq.tieredstore.file.TieredIndexFile"/>
34+
<Class name="org.apache.rocketmq.tieredstore.index.TieredIndexFile"/>
3535
<Method name="indexKeyHashMethod" />
3636
<Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
3737
</Match>

tieredstore/pom.xml

+14
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,19 @@
5353
<artifactId>commons-io</artifactId>
5454
<scope>test</scope>
5555
</dependency>
56+
57+
<dependency>
58+
<groupId>org.openjdk.jmh</groupId>
59+
<artifactId>jmh-core</artifactId>
60+
<version>1.36</version>
61+
<scope>provided</scope>
62+
</dependency>
63+
64+
<dependency>
65+
<groupId>org.openjdk.jmh</groupId>
66+
<artifactId>jmh-generator-annprocess</artifactId>
67+
<version>1.36</version>
68+
<scope>provided</scope>
69+
</dependency>
5670
</dependencies>
5771
</project>

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java

+35-68
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.TimeUnit;
3232
import javax.annotation.Nullable;
3333
import org.apache.commons.lang3.tuple.Pair;
34+
import org.apache.rocketmq.common.BoundaryType;
3435
import org.apache.rocketmq.common.message.MessageQueue;
3536
import org.apache.rocketmq.logging.org.slf4j.Logger;
3637
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -50,15 +51,15 @@
5051
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
5152
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
5253
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
53-
import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
54+
import org.apache.rocketmq.tieredstore.index.IndexItem;
55+
import org.apache.rocketmq.tieredstore.index.IndexService;
5456
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
5557
import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
5658
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
5759
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
5860
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
5961
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
6062
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
61-
import org.apache.rocketmq.common.BoundaryType;
6263

6364
public class TieredMessageFetcher implements MessageStoreFetcher {
6465

@@ -555,85 +556,51 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo
555556
public CompletableFuture<QueryMessageResult> queryMessageAsync(
556557
String topic, String key, int maxCount, long begin, long end) {
557558

558-
TieredIndexFile indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
559+
IndexService indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig);
559560

560-
int hashCode = TieredIndexFile.indexKeyHashMethod(TieredIndexFile.buildKey(topic, key));
561561
long topicId;
562562
try {
563563
TopicMetadata topicMetadata = metadataStore.getTopic(topic);
564564
if (topicMetadata == null) {
565-
LOGGER.info("TieredMessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic);
565+
LOGGER.info("MessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic);
566566
return CompletableFuture.completedFuture(new QueryMessageResult());
567567
}
568568
topicId = topicMetadata.getTopicId();
569569
} catch (Exception e) {
570-
LOGGER.error("TieredMessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e);
570+
LOGGER.error("MessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e);
571571
return CompletableFuture.completedFuture(new QueryMessageResult());
572572
}
573573

574-
return indexFile.queryAsync(topic, key, begin, end)
575-
.thenCompose(indexBufferList -> {
576-
QueryMessageResult result = new QueryMessageResult();
577-
int resultCount = 0;
578-
List<CompletableFuture<Void>> futureList = new ArrayList<>(maxCount);
579-
for (Pair<Long, ByteBuffer> pair : indexBufferList) {
580-
Long fileBeginTimestamp = pair.getKey();
581-
ByteBuffer indexBuffer = pair.getValue();
582-
583-
if (indexBuffer.remaining() % TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE != 0) {
584-
LOGGER.error("[Bug] TieredMessageFetcher#queryMessageAsync: " +
585-
"index buffer size {} is not multiple of index item size {}",
586-
indexBuffer.remaining(), TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE);
587-
continue;
588-
}
589-
590-
for (int indexOffset = indexBuffer.position();
591-
indexOffset < indexBuffer.limit();
592-
indexOffset += TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE) {
593-
594-
int indexItemHashCode = indexBuffer.getInt(indexOffset);
595-
if (indexItemHashCode != hashCode) {
596-
continue;
597-
}
598-
599-
int indexItemTopicId = indexBuffer.getInt(indexOffset + 4);
600-
if (indexItemTopicId != topicId) {
601-
continue;
602-
}
603-
604-
int queueId = indexBuffer.getInt(indexOffset + 4 + 4);
605-
CompositeFlatFile flatFile =
606-
flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
607-
if (flatFile == null) {
608-
continue;
609-
}
610-
611-
// decode index item
612-
long offset = indexBuffer.getLong(indexOffset + 4 + 4 + 4);
613-
int size = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8);
614-
int timeDiff = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4);
615-
long indexTimestamp = fileBeginTimestamp + timeDiff;
616-
if (indexTimestamp < begin || indexTimestamp > end) {
617-
continue;
618-
}
574+
CompletableFuture<List<IndexItem>> future = indexStoreService.queryAsync(topic, key, maxCount, begin, end);
619575

620-
CompletableFuture<Void> getMessageFuture = flatFile.getCommitLogAsync(offset, size)
621-
.thenAccept(messageBuffer -> result.addMessage(
622-
new SelectMappedBufferResult(0, messageBuffer, size, null)));
623-
futureList.add(getMessageFuture);
624-
625-
resultCount++;
626-
if (resultCount >= maxCount) {
627-
break;
628-
}
629-
}
630-
631-
if (resultCount >= maxCount) {
632-
break;
633-
}
576+
return future.thenCompose(indexItemList -> {
577+
QueryMessageResult result = new QueryMessageResult();
578+
List<CompletableFuture<Void>> futureList = new ArrayList<>(maxCount);
579+
for (IndexItem indexItem : indexItemList) {
580+
if (topicId != indexItem.getTopicId()) {
581+
continue;
634582
}
635-
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
636-
.thenApply(v -> result);
637-
});
583+
CompositeFlatFile flatFile =
584+
flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, indexItem.getQueueId()));
585+
if (flatFile == null) {
586+
continue;
587+
}
588+
CompletableFuture<Void> getMessageFuture = flatFile
589+
.getCommitLogAsync(indexItem.getOffset(), indexItem.getSize())
590+
.thenAccept(messageBuffer -> result.addMessage(
591+
new SelectMappedBufferResult(
592+
indexItem.getOffset(), messageBuffer, indexItem.getSize(), null)));
593+
futureList.add(getMessageFuture);
594+
if (futureList.size() >= maxCount) {
595+
break;
596+
}
597+
}
598+
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result);
599+
}).whenComplete((result, throwable) -> {
600+
if (result != null) {
601+
LOGGER.info("MessageFetcher#queryMessageAsync, query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}",
602+
result.getMessageBufferList().size(), topic, topicId, key, maxCount, begin, end);
603+
}
604+
});
638605
}
639606
}

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java

+12-17
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717

1818
package org.apache.rocketmq.tieredstore.file;
1919

20+
import java.util.Arrays;
21+
import java.util.HashSet;
22+
import java.util.Set;
2023
import org.apache.commons.lang3.StringUtils;
2124
import org.apache.rocketmq.common.message.MessageConst;
2225
import org.apache.rocketmq.common.message.MessageQueue;
2326
import org.apache.rocketmq.store.DispatchRequest;
2427
import org.apache.rocketmq.tieredstore.common.AppendResult;
28+
import org.apache.rocketmq.tieredstore.index.IndexService;
2529
import org.apache.rocketmq.tieredstore.metadata.QueueMetadata;
2630
import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
2731
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -31,13 +35,13 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
3135
private final MessageQueue messageQueue;
3236
private long topicSequenceNumber;
3337
private QueueMetadata queueMetadata;
34-
private final TieredIndexFile indexFile;
38+
private final IndexService indexStoreService;
3539

3640
public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, MessageQueue messageQueue) {
3741
super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue));
3842
this.messageQueue = messageQueue;
3943
this.recoverQueueMetadata();
40-
this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
44+
this.indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig);
4145
}
4246

4347
@Override
@@ -85,24 +89,15 @@ public AppendResult appendIndexFile(DispatchRequest request) {
8589
return AppendResult.FILE_CLOSED;
8690
}
8791

92+
Set<String> keySet = new HashSet<>(
93+
Arrays.asList(request.getKeys().split(MessageConst.KEY_SEPARATOR)));
8894
if (StringUtils.isNotBlank(request.getUniqKey())) {
89-
AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber,
90-
request.getUniqKey(), request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
91-
if (result != AppendResult.SUCCESS) {
92-
return result;
93-
}
95+
keySet.add(request.getUniqKey());
9496
}
9597

96-
for (String key : request.getKeys().split(MessageConst.KEY_SEPARATOR)) {
97-
if (StringUtils.isNotBlank(key)) {
98-
AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber,
99-
key, request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
100-
if (result != AppendResult.SUCCESS) {
101-
return result;
102-
}
103-
}
104-
}
105-
return AppendResult.SUCCESS;
98+
return indexStoreService.putKey(
99+
messageQueue.getTopic(), (int) topicSequenceNumber, messageQueue.getQueueId(), keySet,
100+
request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
106101
}
107102

108103
public MessageQueue getMessageQueue() {

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import java.nio.ByteBuffer;
2121
import java.util.concurrent.CompletableFuture;
2222
import org.apache.commons.lang3.tuple.Pair;
23+
import org.apache.rocketmq.common.BoundaryType;
2324
import org.apache.rocketmq.tieredstore.common.AppendResult;
2425
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
25-
import org.apache.rocketmq.common.BoundaryType;
2626

2727
public class TieredConsumeQueue {
2828

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ public FileSegmentType getFileType() {
141141
return fileType;
142142
}
143143

144-
@VisibleForTesting
145144
public List<TieredFileSegment> getFileSegmentList() {
146145
return fileSegmentList;
147146
}
@@ -274,7 +273,7 @@ public int getFileSegmentCount() {
274273
}
275274

276275
@Nullable
277-
protected TieredFileSegment getFileByIndex(int index) {
276+
public TieredFileSegment getFileByIndex(int index) {
278277
fileSegmentLock.readLock().lock();
279278
try {
280279
if (index < fileSegmentList.size()) {
@@ -354,7 +353,7 @@ protected TieredFileSegment getFileByTime(long timestamp, BoundaryType boundaryT
354353
}
355354
}
356355

357-
protected List<TieredFileSegment> getFileListByTime(long beginTime, long endTime) {
356+
public List<TieredFileSegment> getFileListByTime(long beginTime, long endTime) {
358357
fileSegmentLock.readLock().lock();
359358
try {
360359
return fileSegmentList.stream()

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java

+15-25
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3535
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
3636
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
37+
import org.apache.rocketmq.tieredstore.index.IndexService;
38+
import org.apache.rocketmq.tieredstore.index.IndexStoreService;
3739
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
3840
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
3941

@@ -43,7 +45,7 @@ public class TieredFlatFileManager {
4345
private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
4446

4547
private static volatile TieredFlatFileManager instance;
46-
private static volatile TieredIndexFile indexFile;
48+
private static volatile IndexStoreService indexStoreService;
4749

4850
private final TieredMetadataStore metadataStore;
4951
private final TieredMessageStoreConfig storeConfig;
@@ -76,25 +78,26 @@ public static TieredFlatFileManager getInstance(TieredMessageStoreConfig storeCo
7678
return instance;
7779
}
7880

79-
public static TieredIndexFile getIndexFile(TieredMessageStoreConfig storeConfig) {
81+
public static IndexService getTieredIndexService(TieredMessageStoreConfig storeConfig) {
8082
if (storeConfig == null) {
81-
return indexFile;
83+
return indexStoreService;
8284
}
8385

84-
if (indexFile == null) {
86+
if (indexStoreService == null) {
8587
synchronized (TieredFlatFileManager.class) {
86-
if (indexFile == null) {
88+
if (indexStoreService == null) {
8789
try {
8890
String filePath = TieredStoreUtil.toPath(new MessageQueue(
8991
TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0));
90-
indexFile = new TieredIndexFile(new TieredFileAllocator(storeConfig), filePath);
92+
indexStoreService = new IndexStoreService(new TieredFileAllocator(storeConfig), filePath);
93+
indexStoreService.start();
9194
} catch (Exception e) {
9295
logger.error("Construct FlatFileManager indexFile error", e);
9396
}
9497
}
9598
}
9699
}
97-
return indexFile;
100+
return indexStoreService;
98101
}
99102

100103
public void doCommit() {
@@ -120,15 +123,6 @@ public void doCommit() {
120123
}
121124
}, delay, TimeUnit.MILLISECONDS);
122125
}
123-
TieredStoreExecutor.commitExecutor.schedule(() -> {
124-
try {
125-
if (indexFile != null) {
126-
indexFile.commit(true);
127-
}
128-
} catch (Throwable e) {
129-
logger.error("Commit indexFile periodically failed", e);
130-
}
131-
}, 0, TimeUnit.MILLISECONDS);
132126
}
133127

134128
public void doCleanExpiredFile() {
@@ -148,10 +142,6 @@ public void doCleanExpiredFile() {
148142
}
149143
});
150144
}
151-
if (indexFile != null) {
152-
indexFile.cleanExpiredFile(expiredTimeStamp);
153-
indexFile.destroyExpiredFile();
154-
}
155145
}
156146

157147
private void doScheduleTask() {
@@ -244,7 +234,7 @@ public void cleanup() {
244234

245235
private static void cleanStaticReference() {
246236
instance = null;
247-
indexFile = null;
237+
indexStoreService = null;
248238
}
249239

250240
@Nullable
@@ -271,17 +261,17 @@ public ImmutableList<CompositeQueueFlatFile> deepCopyFlatFileToList() {
271261
}
272262

273263
public void shutdown() {
274-
if (indexFile != null) {
275-
indexFile.commit(true);
264+
if (indexStoreService != null) {
265+
indexStoreService.shutdown();
276266
}
277267
for (CompositeFlatFile flatFile : deepCopyFlatFileToList()) {
278268
flatFile.shutdown();
279269
}
280270
}
281271

282272
public void destroy() {
283-
if (indexFile != null) {
284-
indexFile.destroy();
273+
if (indexStoreService != null) {
274+
indexStoreService.destroy();
285275
}
286276
ImmutableList<CompositeQueueFlatFile> flatFileList = deepCopyFlatFileToList();
287277
cleanup();

0 commit comments

Comments
 (0)