Skip to content

Commit 714aae6

Browse files
committed
Add max inbound message size to ConnectionFactory
To avoid OOM with a very large message. The default value is 64 MiB. Fixes #1062 (cherry picked from commit 9ed45fd)
1 parent 83cf551 commit 714aae6

17 files changed

+282
-48
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

+30-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -205,6 +205,13 @@ public class ConnectionFactory implements Cloneable {
205205

206206
private CredentialsRefreshService credentialsRefreshService;
207207

208+
/**
209+
* Maximum body size of inbound (received) messages in bytes.
210+
*
211+
* <p>Default value is 67,108,864 (64 MiB).
212+
*/
213+
private int maxInboundMessageBodySize = 1_048_576 * 64;
214+
208215
/** @return the default host to use for connections */
209216
public String getHost() {
210217
return host;
@@ -970,11 +977,15 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
970977
if(this.nioParams.getNioExecutor() == null && this.nioParams.getThreadFactory() == null) {
971978
this.nioParams.setThreadFactory(getThreadFactory());
972979
}
973-
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, nioParams, isSSL(), sslContextFactory);
980+
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(
981+
connectionTimeout, nioParams, isSSL(), sslContextFactory,
982+
this.maxInboundMessageBodySize);
974983
}
975984
return this.frameHandlerFactory;
976985
} else {
977-
return new SocketFrameHandlerFactory(connectionTimeout, socketFactory, socketConf, isSSL(), this.shutdownExecutor, sslContextFactory);
986+
return new SocketFrameHandlerFactory(connectionTimeout, socketFactory,
987+
socketConf, isSSL(), this.shutdownExecutor, sslContextFactory,
988+
this.maxInboundMessageBodySize);
978989
}
979990

980991
}
@@ -1273,6 +1284,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
12731284
result.setRecoveredQueueNameSupplier(recoveredQueueNameSupplier);
12741285
result.setTrafficListener(trafficListener);
12751286
result.setCredentialsRefreshService(credentialsRefreshService);
1287+
result.setMaxInboundMessageBodySize(maxInboundMessageBodySize);
12761288
return result;
12771289
}
12781290

@@ -1556,6 +1568,21 @@ public int getChannelRpcTimeout() {
15561568
return channelRpcTimeout;
15571569
}
15581570

1571+
/**
1572+
* Maximum body size of inbound (received) messages in bytes.
1573+
*
1574+
* <p>Default value is 67,108,864 (64 MiB).
1575+
*
1576+
* @param maxInboundMessageBodySize the maximum size of inbound messages
1577+
*/
1578+
public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize) {
1579+
if (maxInboundMessageBodySize <= 0) {
1580+
throw new IllegalArgumentException("Max inbound message body size must be greater than 0: "
1581+
+ maxInboundMessageBodySize);
1582+
}
1583+
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
1584+
}
1585+
15591586
/**
15601587
* The factory to create SSL contexts.
15611588
* This provides more flexibility to create {@link SSLContext}s

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -62,7 +62,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
6262
private final int _channelNumber;
6363

6464
/** Command being assembled */
65-
private AMQCommand _command = new AMQCommand();
65+
private AMQCommand _command;
6666

6767
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
6868
private RpcWrapper _activeRpc = null;
@@ -76,6 +76,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
7676
private final boolean _checkRpcResponseType;
7777

7878
private final TrafficListener _trafficListener;
79+
private final int maxInboundMessageBodySize;
7980

8081
/**
8182
* Construct a channel on the given connection, with the given channel number.
@@ -91,6 +92,8 @@ public AMQChannel(AMQConnection connection, int channelNumber) {
9192
this._rpcTimeout = connection.getChannelRpcTimeout();
9293
this._checkRpcResponseType = connection.willCheckRpcResponseType();
9394
this._trafficListener = connection.getTrafficListener();
95+
this.maxInboundMessageBodySize = connection.getMaxInboundMessageBodySize();
96+
this._command = new AMQCommand(this.maxInboundMessageBodySize);
9497
}
9598

9699
/**
@@ -110,7 +113,7 @@ public int getChannelNumber() {
110113
public void handleFrame(Frame frame) throws IOException {
111114
AMQCommand command = _command;
112115
if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
113-
_command = new AMQCommand(); // prepare for the next one
116+
_command = new AMQCommand(this.maxInboundMessageBodySize); // prepare for the next one
114117
handleCompleteInboundCommand(command);
115118
}
116119
}

src/main/java/com/rabbitmq/client/impl/AMQCommand.java

+20-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -44,17 +44,21 @@ public class AMQCommand implements Command {
4444
/** The assembler for this command - synchronised on - contains all the state */
4545
private final CommandAssembler assembler;
4646

47+
AMQCommand(int maxBodyLength) {
48+
this(null, null, null, maxBodyLength);
49+
}
50+
4751
/** Construct a command ready to fill in by reading frames */
4852
public AMQCommand() {
49-
this(null, null, null);
53+
this(null, null, null, Integer.MAX_VALUE);
5054
}
5155

5256
/**
5357
* Construct a command with just a method, and without header or body.
5458
* @param method the wrapped method
5559
*/
5660
public AMQCommand(com.rabbitmq.client.Method method) {
57-
this(method, null, null);
61+
this(method, null, null, Integer.MAX_VALUE);
5862
}
5963

6064
/**
@@ -64,7 +68,19 @@ public AMQCommand(com.rabbitmq.client.Method method) {
6468
* @param body the message body data
6569
*/
6670
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) {
67-
this.assembler = new CommandAssembler((Method) method, contentHeader, body);
71+
this.assembler = new CommandAssembler((Method) method, contentHeader, body, Integer.MAX_VALUE);
72+
}
73+
74+
/**
75+
* Construct a command with a specified method, header and body.
76+
* @param method the wrapped method
77+
* @param contentHeader the wrapped content header
78+
* @param body the message body data
79+
* @param maxBodyLength the maximum size for an inbound message body
80+
*/
81+
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body,
82+
int maxBodyLength) {
83+
this.assembler = new CommandAssembler((Method) method, contentHeader, body, maxBodyLength);
6884
}
6985

7086
/** Public API - {@inheritDoc} */

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

+7
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public static Map<String, Object> defaultClientProperties() {
157157
private volatile ChannelManager _channelManager;
158158
/** Saved server properties field from connection.start */
159159
private volatile Map<String, Object> _serverProperties;
160+
private final int maxInboundMessageBodySize;
160161

161162
/**
162163
* Protected API - respond, in the driver thread, to a ShutdownSignal.
@@ -244,6 +245,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
244245

245246
this.credentialsRefreshService = params.getCredentialsRefreshService();
246247

248+
247249
this._channel0 = createChannel0();
248250

249251
this._channelManager = null;
@@ -257,6 +259,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
257259
this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() :
258260
(connection, exception) -> { throw exception; }; // we just propagate the exception for non-recoverable connections
259261
this.workPoolTimeout = params.getWorkPoolTimeout();
262+
this.maxInboundMessageBodySize = params.getMaxInboundMessageBodySize();
260263
}
261264

262265
AMQChannel createChannel0() {
@@ -1202,4 +1205,8 @@ public boolean willCheckRpcResponseType() {
12021205
public TrafficListener getTrafficListener() {
12031206
return trafficListener;
12041207
}
1208+
1209+
int getMaxInboundMessageBodySize() {
1210+
return maxInboundMessageBodySize;
1211+
}
12051212
}

src/main/java/com/rabbitmq/client/impl/AbstractFrameHandlerFactory.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
// Copyright (c) 2016-2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
116
package com.rabbitmq.client.impl;
217

318
import com.rabbitmq.client.SocketConfigurator;
@@ -10,10 +25,13 @@ public abstract class AbstractFrameHandlerFactory implements FrameHandlerFactory
1025
protected final int connectionTimeout;
1126
protected final SocketConfigurator configurator;
1227
protected final boolean ssl;
28+
protected final int maxInboundMessageBodySize;
1329

14-
protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator, boolean ssl) {
30+
protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator,
31+
boolean ssl, int maxInboundMessageBodySize) {
1532
this.connectionTimeout = connectionTimeout;
1633
this.configurator = configurator;
1734
this.ssl = ssl;
35+
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
1836
}
1937
}

src/main/java/com/rabbitmq/client/impl/CommandAssembler.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -21,6 +21,7 @@
2121

2222
import com.rabbitmq.client.AMQP;
2323
import com.rabbitmq.client.UnexpectedFrameError;
24+
import static java.lang.String.format;
2425

2526
/**
2627
* Class responsible for piecing together a command from a series of {@link Frame}s.
@@ -52,12 +53,16 @@ private enum CAState {
5253
/** No bytes of content body not yet accumulated */
5354
private long remainingBodyBytes;
5455

55-
public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) {
56+
private final int maxBodyLength;
57+
58+
public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body,
59+
int maxBodyLength) {
5660
this.method = method;
5761
this.contentHeader = contentHeader;
58-
this.bodyN = new ArrayList<byte[]>(2);
62+
this.bodyN = new ArrayList<>(2);
5963
this.bodyLength = 0;
6064
this.remainingBodyBytes = 0;
65+
this.maxBodyLength = maxBodyLength;
6166
appendBodyFragment(body);
6267
if (method == null) {
6368
this.state = CAState.EXPECTING_METHOD;
@@ -99,7 +104,14 @@ private void consumeMethodFrame(Frame f) throws IOException {
99104
private void consumeHeaderFrame(Frame f) throws IOException {
100105
if (f.type == AMQP.FRAME_HEADER) {
101106
this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
102-
this.remainingBodyBytes = this.contentHeader.getBodySize();
107+
long bodySize = this.contentHeader.getBodySize();
108+
if (bodySize >= this.maxBodyLength) {
109+
throw new IllegalStateException(format(
110+
"Message body is too large (%d), maximum size is %d",
111+
bodySize, this.maxBodyLength
112+
));
113+
}
114+
this.remainingBodyBytes = bodySize;
103115
updateContentBodyState();
104116
} else {
105117
throw new UnexpectedFrameError(f, AMQP.FRAME_HEADER);

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -64,6 +64,8 @@ public class ConnectionParams {
6464

6565
private CredentialsRefreshService credentialsRefreshService;
6666

67+
private int maxInboundMessageBodySize;
68+
6769
public ConnectionParams() {}
6870

6971
public CredentialsProvider getCredentialsProvider() {
@@ -297,4 +299,12 @@ public void setCredentialsRefreshService(CredentialsRefreshService credentialsRe
297299
public CredentialsRefreshService getCredentialsRefreshService() {
298300
return credentialsRefreshService;
299301
}
302+
303+
public int getMaxInboundMessageBodySize() {
304+
return maxInboundMessageBodySize;
305+
}
306+
307+
public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize) {
308+
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
309+
}
300310
}

src/main/java/com/rabbitmq/client/impl/Frame.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -25,6 +25,7 @@
2525
import java.util.Date;
2626
import java.util.List;
2727
import java.util.Map;
28+
import static java.lang.String.format;
2829

2930
/**
3031
* Represents an AMQP wire-protocol frame, with frame type, channel number, and payload bytes.
@@ -82,7 +83,7 @@ public static Frame fromBodyFragment(int channelNumber, byte[] body, int offset,
8283
*
8384
* @return a new Frame if we read a frame successfully, otherwise null
8485
*/
85-
public static Frame readFrom(DataInputStream is) throws IOException {
86+
public static Frame readFrom(DataInputStream is, int maxPayloadSize) throws IOException {
8687
int type;
8788
int channel;
8889

@@ -108,6 +109,12 @@ public static Frame readFrom(DataInputStream is) throws IOException {
108109

109110
channel = is.readUnsignedShort();
110111
int payloadSize = is.readInt();
112+
if (payloadSize >= maxPayloadSize) {
113+
throw new IllegalStateException(format(
114+
"Frame body is too large (%d), maximum size is %d",
115+
payloadSize, maxPayloadSize
116+
));
117+
}
111118
byte[] payload = new byte[payloadSize];
112119
is.readFully(payload);
113120

src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -52,22 +52,26 @@ public class SocketFrameHandler implements FrameHandler {
5252
/** Socket's outputstream - data to the broker - synchronized on */
5353
private final DataOutputStream _outputStream;
5454

55+
private final int maxInboundMessageBodySize;
56+
5557
/** Time to linger before closing the socket forcefully. */
5658
public static final int SOCKET_CLOSING_TIMEOUT = 1;
5759

5860
/**
5961
* @param socket the socket to use
6062
*/
6163
public SocketFrameHandler(Socket socket) throws IOException {
62-
this(socket, null);
64+
this(socket, null, Integer.MAX_VALUE);
6365
}
6466

6567
/**
6668
* @param socket the socket to use
6769
*/
68-
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor) throws IOException {
70+
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor,
71+
int maxInboundMessageBodySize) throws IOException {
6972
_socket = socket;
7073
_shutdownExecutor = shutdownExecutor;
74+
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
7175

7276
_inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
7377
_outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
@@ -181,7 +185,7 @@ public void initialize(AMQConnection connection) {
181185
@Override
182186
public Frame readFrame() throws IOException {
183187
synchronized (_inputStream) {
184-
return Frame.readFrom(_inputStream);
188+
return Frame.readFrom(_inputStream, this.maxInboundMessageBodySize);
185189
}
186190
}
187191

0 commit comments

Comments
 (0)