Skip to content

Commit e38da55

Browse files
authored
[ISSUE #3799]main compaction process (#5351)
* add compaction delete policy * wrapper message ext encoder * fix * cleanup policy * cleanup policy * wrapper message ext encoder * Revert "cleanup policy" This reverts commit da76a48. * cleanup policy * topic compaction * compaction recovery and user document * use cleanup policy * fix
1 parent 4af193e commit e38da55

37 files changed

+3815
-341
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

+1
Original file line numberDiff line numberDiff line change
@@ -1788,6 +1788,7 @@ protected void handleRegisterBrokerResult(List<RegisterBrokerResult> registerBro
17881788
if (registerBrokerResult != null) {
17891789
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
17901790
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
1791+
this.messageStore.updateMasterAddress(registerBrokerResult.getMasterAddr());
17911792
}
17921793

17931794
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
5252
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
5353
import org.apache.rocketmq.common.topic.TopicValidator;
54-
import org.apache.rocketmq.common.utils.DeletePolicyUtils;
54+
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
5555
import org.apache.rocketmq.common.utils.QueueTypeUtils;
5656
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
5757
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
@@ -250,7 +250,7 @@ public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
250250

251251
MessageAccessor.setProperties(msgInner, oriProps);
252252

253-
CleanupPolicy cleanupPolicy = DeletePolicyUtils.getDeletePolicy(Optional.of(topicConfig));
253+
CleanupPolicy cleanupPolicy = CleanupPolicyUtils.getDeletePolicy(Optional.of(topicConfig));
254254
if (Objects.equals(cleanupPolicy, CleanupPolicy.COMPACTION)) {
255255
if (StringUtils.isBlank(msgInner.getKeys())) {
256256
response.setCode(ResponseCode.MESSAGE_ILLEGAL);

common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public class TopicAttributes {
3131
newHashSet("BatchCQ", "SimpleCQ"),
3232
"SimpleCQ"
3333
);
34-
public static final EnumAttribute DELETE_POLICY_ATTRIBUTE = new EnumAttribute(
35-
"delete.policy",
34+
public static final EnumAttribute CLEANUP_POLICY_ATTRIBUTE = new EnumAttribute(
35+
"cleanup.policy",
3636
false,
3737
newHashSet("DELETE", "COMPACTION"),
3838
"DELETE"
@@ -49,7 +49,7 @@ public class TopicAttributes {
4949
static {
5050
ALL = new HashMap<>();
5151
ALL.put(QUEUE_TYPE_ATTRIBUTE.getName(), QUEUE_TYPE_ATTRIBUTE);
52-
ALL.put(DELETE_POLICY_ATTRIBUTE.getName(), DELETE_POLICY_ATTRIBUTE);
52+
ALL.put(CLEANUP_POLICY_ATTRIBUTE.getName(), CLEANUP_POLICY_ATTRIBUTE);
5353
ALL.put(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName(), TOPIC_MESSAGE_TYPE_ATTRIBUTE);
5454
}
5555
}

common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public class MessageDecoder {
4343
public final static int MESSAGE_PHYSIC_OFFSET_POSITION = 28;
4444
public final static int MESSAGE_STORE_TIMESTAMP_POSITION = 56;
4545
public final static int MESSAGE_MAGIC_CODE = -626843481;
46+
// End of file empty MAGIC CODE cbd43194
47+
public final static int BLANK_MAGIC_CODE = -875286124;
4648
public static final char NAME_VALUE_SEPARATOR = 1;
4749
public static final char PROPERTY_SEPARATOR = 2;
4850
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;

common/src/main/java/org/apache/rocketmq/common/utils/DeletePolicyUtils.java common/src/main/java/org/apache/rocketmq/common/utils/CleanupPolicyUtils.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,27 @@
2424
import java.util.Objects;
2525
import java.util.Optional;
2626

27-
public class DeletePolicyUtils {
27+
public class CleanupPolicyUtils {
2828
public static boolean isCompaction(Optional<TopicConfig> topicConfig) {
2929
return Objects.equals(CleanupPolicy.COMPACTION, getDeletePolicy(topicConfig));
3030
}
3131

3232
public static CleanupPolicy getDeletePolicy(Optional<TopicConfig> topicConfig) {
3333
if (!topicConfig.isPresent()) {
34-
return CleanupPolicy.valueOf(TopicAttributes.DELETE_POLICY_ATTRIBUTE.getDefaultValue());
34+
return CleanupPolicy.valueOf(TopicAttributes.CLEANUP_POLICY_ATTRIBUTE.getDefaultValue());
3535
}
3636

37-
String attributeName = TopicAttributes.DELETE_POLICY_ATTRIBUTE.getName();
37+
String attributeName = TopicAttributes.CLEANUP_POLICY_ATTRIBUTE.getName();
3838

3939
Map<String, String> attributes = topicConfig.get().getAttributes();
4040
if (attributes == null || attributes.size() == 0) {
41-
return CleanupPolicy.valueOf(TopicAttributes.DELETE_POLICY_ATTRIBUTE.getDefaultValue());
41+
return CleanupPolicy.valueOf(TopicAttributes.CLEANUP_POLICY_ATTRIBUTE.getDefaultValue());
4242
}
4343

4444
if (attributes.containsKey(attributeName)) {
4545
return CleanupPolicy.valueOf(attributes.get(attributeName));
4646
} else {
47-
return CleanupPolicy.valueOf(TopicAttributes.DELETE_POLICY_ATTRIBUTE.getDefaultValue());
47+
return CleanupPolicy.valueOf(TopicAttributes.CLEANUP_POLICY_ATTRIBUTE.getDefaultValue());
4848
}
4949
}
5050
}
+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Compaction Topic
2+
3+
## 使用方式
4+
### 创建compaction topic
5+
```shell
6+
$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n localhost:9876 -t ctopic -c DefaultCluster
7+
create topic to 127.0.0.1:10911 success.
8+
TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+delete.policy=COMPACTION}]
9+
```
10+
### 生产数据
11+
与普通消息一样
12+
```java
13+
DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
14+
producer.setNamesrvAddr("localhost:9876");
15+
producer.start();
16+
17+
String topic = "ctopic";
18+
String tag = "tag1";
19+
String key = "key1";
20+
Message msg = new Message(topic, tag, key, "bodys"getBytes(StandardCharsets.UTF_8));
21+
SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {
22+
int select = Math.abs(shardingKey.hashCode());
23+
if (select < 0) {
24+
select = 0;
25+
}
26+
return mqs.get(select % mqs.size());
27+
}, key);
28+
29+
System.out.printf("%s%n", sendResult);
30+
```
31+
### 消费数据
32+
消费offset与compaction之前保持不变,如果指定offset消费,当指定的offset不存在时,返回后面最近的一条数据
33+
在compaction场景下,大部分消费都是从0开始消费完整的数据
34+
```java
35+
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
36+
consumer.setNamesrvAddr("localhost:9876");
37+
consumer.setPullThreadNums(4);
38+
consumer.start();
39+
40+
Collection<MessageQueue> messageQueueList = consumer.fetchMessageQueues("ctopic");
41+
consumer.assign(messageQueueList);
42+
messageQueueList.forEach(mq -> {
43+
try {
44+
consumer.seekToBegin(mq);
45+
} catch (MQClientException e) {
46+
e.printStackTrace();
47+
}
48+
});
49+
50+
Map<String, byte[]> kvStore = Maps.newHashMap();
51+
while (true) {
52+
List<MessageExt> msgList = consumer.poll(1000);
53+
if (msgList != null) {
54+
msgList.forEach(msg -> kvStore.put(msg.getKeys(), msg.getBody()));
55+
}
56+
}
57+
58+
//use the kvStore
59+
```

docs/en/Example_Compaction_Topic.md

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Compaction Topic
2+
3+
## use example
4+
### create compaction topic
5+
```shell
6+
$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n localhost:9876 -t ctopic -c DefaultCluster
7+
create topic to 127.0.0.1:10911 success.
8+
TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+delete.policy=COMPACTION}]
9+
```
10+
11+
### produce message
12+
the same with ordinary message
13+
```java
14+
DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
15+
producer.setNamesrvAddr("localhost:9876");
16+
producer.start();
17+
18+
Message msg = new Message(topic, "tags", "keys", "bodys"getBytes(StandardCharsets.UTF_8));
19+
SendResult sendResult = producer.send(msg);
20+
21+
System.out.printf("%s%n", sendResult);
22+
```
23+
### consume message
24+
the message offset remains unchanged after compaction. If the consumer specified offset does not exist, return the most recent message after the offset.
25+
26+
In the compaction scenario, most consumption was started from the beginning of the queue.
27+
```java
28+
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
29+
consumer.setNamesrvAddr("localhost:9876");
30+
consumer.setPullThreadNums(4);
31+
consumer.start();
32+
33+
Collection<MessageQueue> messageQueueList = consumer.fetchMessageQueues("ctopic");
34+
consumer.assign(messageQueueList);
35+
messageQueueList.forEach(mq -> {
36+
try {
37+
consumer.seekToBegin(mq);
38+
} catch (MQClientException e) {
39+
e.printStackTrace();
40+
}
41+
});
42+
43+
Map<String, byte[]> kvStore = Maps.newHashMap();
44+
while (true) {
45+
List<MessageExt> msgList = consumer.poll(1000);
46+
if (msgList != null) {
47+
msgList.forEach(msg -> kvStore.put(msg.getKeys(), msg.getBody()));
48+
}
49+
}
50+
51+
//use the kvStore
52+
```

store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java

+7
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wro
5454
this.pagecacheRT = pagecacheRT;
5555
}
5656

57+
public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, long storeTimestamp) {
58+
this.status = status;
59+
this.wroteOffset = wroteOffset;
60+
this.wroteBytes = wroteBytes;
61+
this.storeTimestamp = storeTimestamp;
62+
}
63+
5764
public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, Supplier<String> msgIdSupplier,
5865
long storeTimestamp, long logicsOffset, long pagecacheRT) {
5966
this.status = status;

0 commit comments

Comments
 (0)