Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: Use shared EventLoop for TM and RM clients to reduce thread overhead and improve performance #7179

Merged
merged 11 commits into from
Mar 4, 2025
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7142](https://github.com/apache/incubator-seata/pull/7142)] upgrade commons-compress to 1.27.1
- [[#7149](https://github.com/apache/incubator-seata/pull/7149)] Fix abnormal character display issues in ./distribution/NOTICE.md
- [[#7170](https://github.com/apache/incubator-seata/pull/7170)] Optimize seata client I/O processing by adjusting thread count
- [[#7179](https://github.com/apache/incubator-seata/pull/7179)] Use shared EventLoop for TM and RM clients to reduce thread overhead and improve performance


### security:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
- [[#7142](https://github.com/apache/incubator-seata/pull/7142)] 升级 commons-compress 至 1.27.1 版本
- [[#7149](https://github.com/apache/incubator-seata/pull/7149)] 修复./distribution/NOTICE.md文件中的异常字符串显示问题
- [[#7170](https://github.com/apache/incubator-seata/pull/7170)] 通过调整线程数优化 Seata 客户端 I/O 处理
- [[#7179](https://github.com/apache/incubator-seata/pull/7179)] 使用共享的 EventLoop 来减少 TM 和 RM 客户端的线程开销并提高性能


### security:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,11 @@ public interface ConfigurationKeys {
*/
String WORKER_THREAD_SIZE = THREAD_FACTORY_PREFIX + "workerThreadSize";

/**
* The constant ENABLE_SHARED_EVENTLOOP
*/
String ENABLE_CLIENT_SHARED_EVENTLOOP = TRANSPORT_PREFIX + "enableClientSharedEventLoopGroup";

/**
* The constant SHUTDOWN_PREFIX
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ public interface DefaultValues {
*/
@Deprecated
boolean DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST = true;
/**
* The constant DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP.
*/
boolean DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP = false;
/**
* The constant DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,21 @@

/**
* Rpc client.
*
*/
public class NettyClientBootstrap implements RemotingBootstrap {

private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
private static final String THREAD_PREFIX_SPLIT_CHAR = "_";

private static EventLoopGroup sharedEventLoopGroupWorker = null;

private final NettyClientConfig nettyClientConfig;
private final Bootstrap bootstrap = new Bootstrap();
private final EventLoopGroup eventLoopGroupWorker;
private EventExecutorGroup defaultEventExecutorGroup;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
private final NettyPoolKey.TransactionRole transactionRole;
private final EventLoopGroup eventLoopGroupWorker;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a static variable, the clientBootstrap held by the two instances of TmNettyRemotingClient and RmNettyRemotingClient are not the same, how does this variable manage to be shared use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be related to the comment?

In fact, I also believe that it should be a static variable to use the same value across multiple instances.
However, if it becomes a static variable, there’s an issue where the previous eventLoopGroupWorker gets modified when a new one is created. I wanted to ask about that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous eventLoopGroupWorker should not have been created yet, so I don't think there should be an action to modify this.

Copy link
Contributor Author

@YongGoose YongGoose Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in f308be6 !

PTAL ⚡️


private EventExecutorGroup defaultEventExecutorGroup;
private ChannelHandler[] channelHandlers;

public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
Expand All @@ -81,14 +84,15 @@ public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExec
this.nettyClientConfig = nettyClientConfig;
int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
this.transactionRole = transactionRole;
if (NettyServerConfig.enableEpoll()) {
this.eventLoopGroupWorker = new EpollEventLoopGroup(selectorThreadSizeThreadSize,
new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
selectorThreadSizeThreadSize));

boolean enableClientSharedEventLoop = this.nettyClientConfig.getEnableClientSharedEventLoop();
if (enableClientSharedEventLoop) {
if (sharedEventLoopGroupWorker == null) {
sharedEventLoopGroupWorker = getOrCreateEventLoopGroupWorker(selectorThreadSizeThreadSize);
}
eventLoopGroupWorker = sharedEventLoopGroupWorker;
} else {
this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,
new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
selectorThreadSizeThreadSize));
eventLoopGroupWorker = createEventLoopGroupWorker(selectorThreadSizeThreadSize);
}
this.defaultEventExecutorGroup = eventExecutorGroup;
}
Expand Down Expand Up @@ -123,7 +127,7 @@ public void start() {
new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
nettyClientConfig.getClientWorkerThreads()));
}
this.bootstrap.group(this.eventLoopGroupWorker).channel(
this.bootstrap.group(eventLoopGroupWorker).channel(
nettyClientConfig.getClientChannelClazz()).option(
ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
Expand Down Expand Up @@ -170,7 +174,7 @@ public void initChannel(SocketChannel ch) {
@Override
public void shutdown() {
try {
this.eventLoopGroupWorker.shutdownGracefully();
eventLoopGroupWorker.shutdownGracefully();
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
}
Expand Down Expand Up @@ -233,4 +237,23 @@ public void handlerAdded(ChannelHandlerContext ctx) {
private String getThreadPrefix(String threadPrefix) {
return threadPrefix + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
}

private EventLoopGroup getOrCreateEventLoopGroupWorker(int selectorThreadSizeThreadSize) {
if (eventLoopGroupWorker == null) {
return createEventLoopGroupWorker(selectorThreadSizeThreadSize);
}
return eventLoopGroupWorker;
}

private EventLoopGroup createEventLoopGroupWorker(int selectorThreadSizeThreadSize) {
if (NettyServerConfig.enableEpoll()) {
return new EpollEventLoopGroup(selectorThreadSizeThreadSize,
new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
selectorThreadSizeThreadSize));
}

return new NioEventLoopGroup(selectorThreadSizeThreadSize,
new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
selectorThreadSizeThreadSize));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
package org.apache.seata.core.rpc.netty;

import io.netty.channel.Channel;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.core.rpc.TransportServerType;

import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST;
import static org.apache.seata.common.DefaultValues.DEFAULT_PROTOCOL;
import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_RM_REQUEST_TIMEOUT;
import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT;
import static org.apache.seata.common.DefaultValues.DEFAULT_SELECTOR_THREAD_PREFIX;
import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP;
import static org.apache.seata.common.DefaultValues.DEFAULT_WORKER_THREAD_PREFIX;

/**
Expand Down Expand Up @@ -354,6 +355,10 @@ public int getClientSelectorThreadSize() {
return threadSize > 0 ? threadSize : WorkThreadMode.Default.getValue();
}

public boolean getEnableClientSharedEventLoop() {
return CONFIG.getBoolean(ConfigurationKeys.ENABLE_CLIENT_SHARED_EVENTLOOP, DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP);
}

/**
* Get max acquire conn mills long.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class NettyClientBootstrapTest {

@Mock
private NettyClientConfig nettyClientConfig;
private DefaultEventExecutorGroup eventExecutorGroup;

@BeforeEach
void init() {
eventExecutorGroup = new DefaultEventExecutorGroup(1);
}

@Test
void testSharedEventLoopGroupEnabled() {
when(nettyClientConfig.getEnableClientSharedEventLoop()).thenReturn(true);
NettyClientBootstrap tmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.TMROLE);
EventLoopGroup tmEventLoopGroupWorker = getEventLoopGroupWorker(tmNettyClientBootstrap);

NettyClientBootstrap rmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.RMROLE);
EventLoopGroup rmEventLoopGroupWorker = getEventLoopGroupWorker(rmNettyClientBootstrap);

Assertions.assertEquals(tmEventLoopGroupWorker, rmEventLoopGroupWorker);
}

@Test
void testSharedEventLoopGroupDisabled() {
when(nettyClientConfig.getEnableClientSharedEventLoop()).thenReturn(false);
NettyClientBootstrap tmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.TMROLE);
EventLoopGroup tmEventLoopGroupWorker = getEventLoopGroupWorker(tmNettyClientBootstrap);

NettyClientBootstrap rmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.RMROLE);
EventLoopGroup rmEventLoopGroupWorker = getEventLoopGroupWorker(rmNettyClientBootstrap);

Assertions.assertNotEquals(tmEventLoopGroupWorker, rmEventLoopGroupWorker);
}

private EventLoopGroup getEventLoopGroupWorker(NettyClientBootstrap bootstrap) {
try {
java.lang.reflect.Field field = NettyClientBootstrap.class.getDeclaredField("eventLoopGroupWorker");
field.setAccessible(true);
return (EventLoopGroup) field.get(bootstrap);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
2 changes: 2 additions & 0 deletions script/client/conf/file.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ transport {
rpcTmRequestTimeout = 30000
# the rm client rpc request timeout
rpcRmRequestTimeout = 15000
# the shared event loop group enable
enableClientSharedEventLoopGroup = false
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
Expand Down
1 change: 1 addition & 0 deletions script/client/spring/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ seata.transport.enable-tm-client-batch-send-request=false
seata.transport.enable-rm-client-batch-send-request=true
seata.transport.rpc-rm-request-timeout=15000
seata.transport.rpc-tm-request-timeout=30000
seata.transport.enable-client-shared-event-loop-group=false

seata.config.type=file

Expand Down
1 change: 1 addition & 0 deletions script/client/spring/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ seata:
enable-rm-client-batch-send-request: true
rpc-rm-request-timeout: 15000
rpc-tm-request-timeout: 30000
enable-client-shared-event-loop-group: false
config:
type: file
consul:
Expand Down
1 change: 1 addition & 0 deletions script/config-center/config.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.enableClientSharedEventLoopGroup=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TC_REQUEST_TIMEOUT;
import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT;
import static org.apache.seata.common.DefaultValues.DEFAULT_TRANSPORT_HEARTBEAT;
import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.TRANSPORT_PREFIX;


Expand Down Expand Up @@ -92,6 +93,11 @@ public class TransportProperties {
*/
private long rpcTcRequestTimeout = DEFAULT_RPC_TC_REQUEST_TIMEOUT;

/**
* use shared event loop group
*/
private boolean enableClientSharedEventLoop = DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP;


public String getType() {
return type;
Expand Down Expand Up @@ -193,10 +199,18 @@ public long getRpcTcRequestTimeout() {
return rpcTcRequestTimeout;
}

public boolean isEnableClientSharedEventLoop() {
return enableClientSharedEventLoop;
}

public void setRpcTcRequestTimeout(long rpcTcRequestTimeout) {
this.rpcTcRequestTimeout = rpcTcRequestTimeout;
}

public void setEnableClientSharedEventLoop(boolean useSharedEventLoop) {
this.enableClientSharedEventLoop = useSharedEventLoop;
}

public String getProtocol() {
return protocol;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void testTransportProperties() {
assertEquals("seata", context.getBean(TransportProperties.class).getSerialization());
assertEquals("none", context.getBean(TransportProperties.class).getCompressor());
assertTrue(context.getBean(TransportProperties.class).isEnableClientBatchSendRequest());
assertFalse(context.getBean(TransportProperties.class).isEnableClientSharedEventLoop());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void testTransportProperties() {
transportProperties.setRpcRmRequestTimeout(1);
transportProperties.setRpcTmRequestTimeout(1);
transportProperties.setRpcTcRequestTimeout(1);
transportProperties.setEnableClientSharedEventLoop(true);

Assertions.assertEquals("server", transportProperties.getServer());
Assertions.assertEquals("type", transportProperties.getType());
Expand All @@ -49,5 +50,6 @@ public void testTransportProperties() {
Assertions.assertEquals(1, transportProperties.getRpcRmRequestTimeout());
Assertions.assertEquals(1, transportProperties.getRpcTmRequestTimeout());
Assertions.assertEquals(1, transportProperties.getRpcTcRequestTimeout());
Assertions.assertTrue(transportProperties.isEnableClientSharedEventLoop());
}
}
Loading