Skip to content

Commit 7ae1ec3

Browse files
belugabehrJens-G
authored andcommitted
THRIFT-5297: Improve TThreadPoolServer Handling of Incoming Connections
Client: Java Patch: David Mollitor This closes #2266
1 parent ebc2ab5 commit 7ae1ec3

File tree

2 files changed

+64
-130
lines changed

2 files changed

+64
-130
lines changed

lib/java/src/org/apache/thrift/server/TThreadPoolServer.java

+54-126
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919

2020
package org.apache.thrift.server;
2121

22-
import java.util.Random;
23-
import java.util.WeakHashMap;
22+
import java.util.Optional;
2423
import java.util.concurrent.ExecutorService;
2524
import java.util.concurrent.RejectedExecutionException;
2625
import java.util.concurrent.SynchronousQueue;
26+
import java.util.concurrent.ThreadFactory;
2727
import java.util.concurrent.ThreadPoolExecutor;
2828
import java.util.concurrent.TimeUnit;
2929

@@ -41,18 +41,14 @@
4141
* a worker pool that deals with client connections in blocking way.
4242
*/
4343
public class TThreadPoolServer extends TServer {
44-
private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName());
44+
private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class);
4545

4646
public static class Args extends AbstractServerArgs<Args> {
4747
public int minWorkerThreads = 5;
4848
public int maxWorkerThreads = Integer.MAX_VALUE;
4949
public ExecutorService executorService;
5050
public int stopTimeoutVal = 60;
5151
public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
52-
public int requestTimeout = 20;
53-
public TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
54-
public int beBackoffSlotLength = 100;
55-
public TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS;
5652

5753
public Args(TServerTransport transport) {
5854
super(transport);
@@ -78,27 +74,6 @@ public Args stopTimeoutUnit(TimeUnit tu) {
7874
return this;
7975
}
8076

81-
public Args requestTimeout(int n) {
82-
requestTimeout = n;
83-
return this;
84-
}
85-
86-
public Args requestTimeoutUnit(TimeUnit tu) {
87-
requestTimeoutUnit = tu;
88-
return this;
89-
}
90-
//Binary exponential backoff slot length
91-
public Args beBackoffSlotLength(int n) {
92-
beBackoffSlotLength = n;
93-
return this;
94-
}
95-
96-
//Binary exponential backoff slot time unit
97-
public Args beBackoffSlotLengthUnit(TimeUnit tu) {
98-
beBackoffSlotLengthUnit = tu;
99-
return this;
100-
}
101-
10277
public Args executorService(ExecutorService executorService) {
10378
this.executorService = executorService;
10479
return this;
@@ -107,49 +82,40 @@ public Args executorService(ExecutorService executorService) {
10782

10883
// Executor service for handling client connections
10984
private ExecutorService executorService_;
110-
private WeakHashMap<WorkerProcess, Boolean> activeWorkers = new WeakHashMap<>();
11185

11286
private final TimeUnit stopTimeoutUnit;
11387

11488
private final long stopTimeoutVal;
11589

116-
private final TimeUnit requestTimeoutUnit;
117-
118-
private final long requestTimeout;
119-
120-
private final long beBackoffSlotInMillis;
121-
122-
private Random random = new Random(System.currentTimeMillis());
123-
12490
public TThreadPoolServer(Args args) {
12591
super(args);
12692

12793
stopTimeoutUnit = args.stopTimeoutUnit;
12894
stopTimeoutVal = args.stopTimeoutVal;
129-
requestTimeoutUnit = args.requestTimeoutUnit;
130-
requestTimeout = args.requestTimeout;
131-
beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength);
13295

13396
executorService_ = args.executorService != null ?
13497
args.executorService : createDefaultExecutorService(args);
13598
}
13699

137100
private static ExecutorService createDefaultExecutorService(Args args) {
138-
SynchronousQueue<Runnable> executorQueue =
139-
new SynchronousQueue<Runnable>();
140-
return new ThreadPoolExecutor(args.minWorkerThreads,
141-
args.maxWorkerThreads,
142-
args.stopTimeoutVal,
143-
args.stopTimeoutUnit,
144-
executorQueue);
101+
return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
102+
new SynchronousQueue<>(), new ThreadFactory() {
103+
@Override
104+
public Thread newThread(Runnable r) {
105+
Thread thread = new Thread(r);
106+
thread.setDaemon(true);
107+
thread.setName("TThreadPoolServer WorkerProcess-%d");
108+
return thread;
109+
}
110+
});
145111
}
146112

147113
protected ExecutorService getExecutorService() {
148114
return executorService_;
149115
}
150116

151117
protected boolean preServe() {
152-
try {
118+
try {
153119
serverTransport_.listen();
154120
} catch (TTransportException ttx) {
155121
LOGGER.error("Error occurred during listening.", ttx);
@@ -166,13 +132,16 @@ protected boolean preServe() {
166132
}
167133

168134
public void serve() {
169-
if (!preServe()) {
170-
return;
171-
}
135+
if (!preServe()) {
136+
return;
137+
}
138+
139+
execute();
140+
141+
executorService_.shutdownNow();
172142

173-
execute();
174143
if (!waitForShutdown()) {
175-
LOGGER.error("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit);
144+
LOGGER.error("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit);
176145
}
177146

178147
setServing(false);
@@ -182,51 +151,17 @@ protected void execute() {
182151
while (!stopped_) {
183152
try {
184153
TTransport client = serverTransport_.accept();
185-
WorkerProcess wp = new WorkerProcess(client);
186-
187-
int retryCount = 0;
188-
long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
189-
while(true) {
190-
try {
191-
executorService_.execute(wp);
192-
activeWorkers.put(wp, Boolean.TRUE);
193-
break;
194-
} catch(Throwable t) {
195-
if (t instanceof RejectedExecutionException) {
196-
retryCount++;
197-
try {
198-
if (remainTimeInMillis > 0) {
199-
//do a truncated 20 binary exponential backoff sleep
200-
long sleepTimeInMillis = ((long) (random.nextDouble() *
201-
(1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
202-
sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
203-
TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
204-
remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
205-
} else {
206-
client.close();
207-
wp = null;
208-
LOGGER.warn("Task has been rejected by ExecutorService " + retryCount
209-
+ " times till timedout, reason: " + t);
210-
break;
211-
}
212-
} catch (InterruptedException e) {
213-
LOGGER.warn("Interrupted while waiting to place client on executor queue.");
214-
Thread.currentThread().interrupt();
215-
break;
216-
}
217-
} else if (t instanceof Error) {
218-
LOGGER.error("ExecutorService threw error: " + t, t);
219-
throw (Error)t;
220-
} else {
221-
//for other possible runtime errors from ExecutorService, should also not kill serve
222-
LOGGER.warn("ExecutorService threw error: " + t, t);
223-
break;
224-
}
154+
try {
155+
executorService_.execute(new WorkerProcess(client));
156+
} catch (RejectedExecutionException ree) {
157+
if (!stopped_) {
158+
LOGGER.warn("ThreadPool is saturated with incoming requests. Closing latest connection.");
225159
}
160+
client.close();
226161
}
227162
} catch (TTransportException ttx) {
228163
if (!stopped_) {
229-
LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
164+
LOGGER.warn("Transport error occurred during acceptance of message", ttx);
230165
}
231166
}
232167
}
@@ -241,8 +176,7 @@ protected boolean waitForShutdown() {
241176
long now = System.currentTimeMillis();
242177
while (timeoutMS >= 0) {
243178
try {
244-
executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
245-
return true;
179+
return executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
246180
} catch (InterruptedException ix) {
247181
long newnow = System.currentTimeMillis();
248182
timeoutMS -= (newnow - now);
@@ -255,10 +189,6 @@ protected boolean waitForShutdown() {
255189
public void stop() {
256190
stopped_ = true;
257191
serverTransport_.interrupt();
258-
executorService_.shutdown();
259-
for (WorkerProcess wp : activeWorkers.keySet()) {
260-
wp.stop();
261-
}
262192
}
263193

264194
private class WorkerProcess implements Runnable {
@@ -287,7 +217,7 @@ public void run() {
287217
TProtocol inputProtocol = null;
288218
TProtocol outputProtocol = null;
289219

290-
TServerEventHandler eventHandler = null;
220+
Optional<TServerEventHandler> eventHandler = Optional.empty();
291221
ServerContext connectionContext = null;
292222

293223
try {
@@ -297,22 +227,25 @@ public void run() {
297227
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
298228
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
299229

300-
eventHandler = getEventHandler();
301-
if (eventHandler != null) {
302-
connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
303-
}
304-
// we check stopped_ first to make sure we're not supposed to be shutting
305-
// down. this is necessary for graceful shutdown.
306-
while (true) {
230+
eventHandler = Optional.ofNullable(getEventHandler());
307231

308-
if (eventHandler != null) {
309-
eventHandler.processContext(connectionContext, inputTransport, outputTransport);
310-
}
232+
if (eventHandler.isPresent()) {
233+
connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol);
234+
}
311235

312-
if (stopped_) {
313-
break;
314-
}
315-
processor.process(inputProtocol, outputProtocol);
236+
while (true) {
237+
if (Thread.currentThread().isInterrupted()) {
238+
LOGGER.debug("WorkerProcess requested to shutdown");
239+
break;
240+
}
241+
if (eventHandler.isPresent()) {
242+
eventHandler.get().processContext(connectionContext, inputTransport, outputTransport);
243+
}
244+
// This process cannot be interrupted by Interrupting the Thread. This
245+
// will return once a message has been processed or the socket timeout
246+
// has elapsed, at which point it will return and check the interrupt
247+
// state of the thread.
248+
processor.process(inputProtocol, outputProtocol);
316249
}
317250
} catch (Exception x) {
318251
LOGGER.debug("Error processing request", x);
@@ -322,11 +255,11 @@ public void run() {
322255
// Ignore err-logging all transport-level/type exceptions
323256
if (!isIgnorableException(x)) {
324257
// Log the exception at error level and continue
325-
LOGGER.error((x instanceof TException? "Thrift " : "") + "Error occurred during processing of message.", x);
258+
LOGGER.error((x instanceof TException ? "Thrift " : "") + "Error occurred during processing of message.", x);
326259
}
327260
} finally {
328-
if (eventHandler != null) {
329-
eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
261+
if (eventHandler.isPresent()) {
262+
eventHandler.get().deleteContext(connectionContext, inputProtocol, outputProtocol);
330263
}
331264
if (inputTransport != null) {
332265
inputTransport.close();
@@ -344,10 +277,9 @@ private boolean isIgnorableException(Exception x) {
344277
TTransportException tTransportException = null;
345278

346279
if (x instanceof TTransportException) {
347-
tTransportException = (TTransportException)x;
348-
}
349-
else if (x.getCause() instanceof TTransportException) {
350-
tTransportException = (TTransportException)x.getCause();
280+
tTransportException = (TTransportException) x;
281+
} else if (x.getCause() instanceof TTransportException) {
282+
tTransportException = (TTransportException) x.getCause();
351283
}
352284

353285
if (tTransportException != null) {
@@ -359,9 +291,5 @@ else if (x.getCause() instanceof TTransportException) {
359291
}
360292
return false;
361293
}
362-
363-
private void stop() {
364-
client_.close();
365-
}
366294
}
367295
}

lib/java/test/org/apache/thrift/server/TestThreadPoolServer.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class TestThreadPoolServer {
3636
*/
3737
@Test
3838
public void testStopServerWithOpenClient() throws Exception {
39-
TServerSocket serverSocket = new TServerSocket(0);
39+
TServerSocket serverSocket = new TServerSocket(0, 3000);
4040
TThreadPoolServer server = buildServer(serverSocket);
4141
Thread serverThread = new Thread(() -> server.serve());
4242
serverThread.start();
@@ -45,11 +45,17 @@ public void testStopServerWithOpenClient() throws Exception {
4545
Thread.sleep(1000);
4646
// There is a thread listening to the client
4747
Assert.assertEquals(1, ((ThreadPoolExecutor) server.getExecutorService()).getActiveCount());
48+
49+
// Trigger the server to stop, but it does not wait
4850
server.stop();
49-
server.waitForShutdown();
51+
Assert.assertTrue(server.waitForShutdown());
52+
5053
// After server is stopped, the executor thread pool should be shut down
51-
Assert.assertTrue("Server thread pool should be terminated.", server.getExecutorService().isTerminated());
52-
Assert.assertTrue("Client is still open.", client.isOpen());
54+
Assert.assertTrue("Server thread pool should be terminated", server.getExecutorService().isTerminated());
55+
56+
// TODO: The socket is actually closed (timeout) but the client code
57+
// ignores the timeout Exception and maintains the socket open state
58+
Assert.assertTrue("Client should be closed after server shutdown", client.isOpen());
5359
}
5460
}
5561

0 commit comments

Comments
 (0)