From c7aef8c53a293073de604525df7e480a2144f5ab Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Tue, 28 May 2019 23:57:36 +0200 Subject: [PATCH 1/2] net: introduce input_buffer_factory concept. Signed-off-by: Radoslaw Zarzynski --- include/seastar/net/api.hh | 16 +++++++++++++++- include/seastar/net/posix-stack.hh | 7 +++---- include/seastar/net/stack.hh | 2 +- src/net/native-stack-impl.hh | 4 ++-- src/net/posix-stack.cc | 16 ++++++++++++---- src/net/stack.cc | 4 ++-- src/net/tls.cc | 4 ++-- 7 files changed, 37 insertions(+), 16 deletions(-) diff --git a/include/seastar/net/api.hh b/include/seastar/net/api.hh index 1fa06158c3..e08619ce1e 100644 --- a/include/seastar/net/api.hh +++ b/include/seastar/net/api.hh @@ -129,6 +129,17 @@ public: void close(); }; +class input_buffer_factory { +public: + virtual ~input_buffer_factory() = default; + /// Provide a rx buffer. Implementation is responsible for determining its size + /// and memory. This is useful when a network stack implementation does not put + /// extra requirements on these factors. The POSIX stack is the example here. + /// \param allocator Memory allocator \c connected_socket implementation prefers. + /// Maybe nullptr. + virtual temporary_buffer create(compat::polymorphic_allocator* allocator) = 0; +}; + } /* namespace net */ /// \addtogroup networking-module @@ -156,7 +167,10 @@ public: /// Gets the input stream. /// /// Gets an object returning data sent from the remote endpoint. - input_stream input(); + /// \param ibf_hint optional factory of rx buffers. The decision + /// whether to use the factory is opt to an implementation of \c + /// connected_socket. + input_stream input(net::input_buffer_factory* ibf_hint = nullptr); /// Gets the output stream. /// /// Gets an object that sends data to the remote endpoint. diff --git a/include/seastar/net/posix-stack.hh b/include/seastar/net/posix-stack.hh index 803f6db626..f47db4f0a0 100644 --- a/include/seastar/net/posix-stack.hh +++ b/include/seastar/net/posix-stack.hh @@ -106,11 +106,10 @@ class posix_data_source_impl final : public data_source_impl { compat::polymorphic_allocator* _buffer_allocator; lw_shared_ptr _fd; temporary_buffer _buf; - size_t _buf_size; + net::input_buffer_factory* _buffer_factory; public: - explicit posix_data_source_impl(lw_shared_ptr fd, compat::polymorphic_allocator* allocator=memory::malloc_allocator, - size_t buf_size = 8192) : _buffer_allocator(allocator), _fd(std::move(fd)), - _buf(make_temporary_buffer(_buffer_allocator, buf_size)), _buf_size(buf_size) {} + explicit posix_data_source_impl(lw_shared_ptr fd, net::input_buffer_factory* ibf, compat::polymorphic_allocator* allocator=memory::malloc_allocator) + : _buffer_allocator(allocator), _fd(std::move(fd)), _buffer_factory(ibf) {} future> get() override; future<> close() override; }; diff --git a/include/seastar/net/stack.hh b/include/seastar/net/stack.hh index d0d4a0a097..e980cf3c53 100644 --- a/include/seastar/net/stack.hh +++ b/include/seastar/net/stack.hh @@ -32,7 +32,7 @@ namespace net { class connected_socket_impl { public: virtual ~connected_socket_impl() {} - virtual data_source source() = 0; + virtual data_source source(input_buffer_factory* ibf = nullptr) = 0; virtual data_sink sink() = 0; virtual void shutdown_input() = 0; virtual void shutdown_output() = 0; diff --git a/src/net/native-stack-impl.hh b/src/net/native-stack-impl.hh index 4be2ca08f6..dd0c7d019c 100644 --- a/src/net/native-stack-impl.hh +++ b/src/net/native-stack-impl.hh @@ -86,7 +86,7 @@ class native_connected_socket_impl : public connected_socket_impl { public: explicit native_connected_socket_impl(lw_shared_ptr conn) : _conn(std::move(conn)) {} - virtual data_source source() override; + virtual data_source source(net::input_buffer_factory*) override; virtual data_sink sink() override; virtual void shutdown_input() override; virtual void shutdown_output() override; @@ -180,7 +180,7 @@ public: }; template -data_source native_connected_socket_impl::source() { +data_source native_connected_socket_impl::source(net::input_buffer_factory*) { return data_source(std::make_unique(_conn)); } diff --git a/src/net/posix-stack.cc b/src/net/posix-stack.cc index 758deb0b32..5c08011788 100644 --- a/src/net/posix-stack.cc +++ b/src/net/posix-stack.cc @@ -116,8 +116,16 @@ class posix_connected_socket_impl final : public connected_socket_impl, posix_co explicit posix_connected_socket_impl(lw_shared_ptr fd, conntrack::handle&& handle, compat::polymorphic_allocator* allocator=memory::malloc_allocator) : _fd(std::move(fd)), _handle(std::move(handle)), _allocator(allocator) {} public: - virtual data_source source() override { - return data_source(std::make_unique< posix_data_source_impl>(_fd, _allocator)); + virtual data_source source(net::input_buffer_factory* ibf) override { + if (!ibf) { + static struct final : input_buffer_factory { + temporary_buffer create(compat::polymorphic_allocator* const allocator) override { + return make_temporary_buffer(allocator, 8192); + } + } default_posix_inbuf_factory{}; + ibf = &default_posix_inbuf_factory; + } + return data_source(std::make_unique(_fd, ibf, _allocator)); } virtual data_sink sink() override { return data_sink(std::make_unique< posix_data_sink_impl>(_fd)); @@ -317,10 +325,10 @@ posix_ap_server_socket_impl::move_connected_socket(socket_address sa, future> posix_data_source_impl::get() { - return _fd->read_some(_buf.get_write(), _buf_size).then([this] (size_t size) { + _buf = _buffer_factory->create(_buffer_allocator); + return _fd->read_some(_buf.get_write(), _buf.size()).then([this] (size_t size) { _buf.trim(size); auto ret = std::move(_buf); - _buf = make_temporary_buffer(_buffer_allocator, _buf_size); return make_ready_future>(std::move(ret)); }); } diff --git a/src/net/stack.cc b/src/net/stack.cc index 4956b02eb9..8111de56c7 100644 --- a/src/net/stack.cc +++ b/src/net/stack.cc @@ -84,8 +84,8 @@ connected_socket& connected_socket::operator=(connected_socket&& cs) noexcept = connected_socket::~connected_socket() {} -input_stream connected_socket::input() { - return input_stream(_csi->source()); +input_stream connected_socket::input(net::input_buffer_factory* const ise) { + return input_stream(_csi->source(ise)); } output_stream connected_socket::output(size_t buffer_size) { diff --git a/src/net/tls.cc b/src/net/tls.cc index fd40900459..c2c5316292 100644 --- a/src/net/tls.cc +++ b/src/net/tls.cc @@ -1050,7 +1050,7 @@ class tls_connected_socket_impl : public net::connected_socket_impl, public sess class source_impl; class sink_impl; - data_source source() override; + data_source source(net::input_buffer_factory*) override; data_sink sink() override; void shutdown_input() override { @@ -1160,7 +1160,7 @@ class tls_socket_impl : public net::socket_impl { } -data_source tls::tls_connected_socket_impl::source() { +data_source tls::tls_connected_socket_impl::source(net::input_buffer_factory*) { return data_source(std::make_unique(_session)); } From 2e5354dd04710260b6f283220c1b07983ba334fe Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Fri, 7 Jun 2019 14:44:36 +0200 Subject: [PATCH 2/2] net: extend input_buffer_factory with passing back of unused buffers. Signed-off-by: Radoslaw Zarzynski --- include/seastar/net/api.hh | 7 ++++++- src/net/posix-stack.cc | 12 ++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/include/seastar/net/api.hh b/include/seastar/net/api.hh index e08619ce1e..49592f4df7 100644 --- a/include/seastar/net/api.hh +++ b/include/seastar/net/api.hh @@ -131,13 +131,18 @@ public: class input_buffer_factory { public: + using buffer_t = temporary_buffer; + virtual ~input_buffer_factory() = default; /// Provide a rx buffer. Implementation is responsible for determining its size /// and memory. This is useful when a network stack implementation does not put /// extra requirements on these factors. The POSIX stack is the example here. /// \param allocator Memory allocator \c connected_socket implementation prefers. /// Maybe nullptr. - virtual temporary_buffer create(compat::polymorphic_allocator* allocator) = 0; + virtual buffer_t create(compat::polymorphic_allocator* allocator) = 0; + + // Give back to the factory unused part of a buffer obtained from it + virtual void return_unused(buffer_t&& buf) = 0; }; } /* namespace net */ diff --git a/src/net/posix-stack.cc b/src/net/posix-stack.cc index 5c08011788..2c27ea9e06 100644 --- a/src/net/posix-stack.cc +++ b/src/net/posix-stack.cc @@ -119,9 +119,11 @@ class posix_connected_socket_impl final : public connected_socket_impl, posix_co virtual data_source source(net::input_buffer_factory* ibf) override { if (!ibf) { static struct final : input_buffer_factory { - temporary_buffer create(compat::polymorphic_allocator* const allocator) override { + buffer_t create(compat::polymorphic_allocator* const allocator) override { return make_temporary_buffer(allocator, 8192); } + void return_unused(buffer_t&&) override { + } } default_posix_inbuf_factory{}; ibf = &default_posix_inbuf_factory; } @@ -327,9 +329,11 @@ future> posix_data_source_impl::get() { _buf = _buffer_factory->create(_buffer_allocator); return _fd->read_some(_buf.get_write(), _buf.size()).then([this] (size_t size) { - _buf.trim(size); - auto ret = std::move(_buf); - return make_ready_future>(std::move(ret)); + if (size < _buf.size()) { + _buffer_factory->return_unused(_buf.share(size, _buf.size() - size)); + _buf.trim(size); + } + return make_ready_future>(std::move(_buf)); }); }