Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: introduce input_buffer_factory concept. #6

Open
wants to merge 2 commits into
base: ceph-octopus
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion include/seastar/net/api.hh
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ public:
void close();
};

class input_buffer_factory {
public:
using buffer_t = temporary_buffer<char>;

virtual ~input_buffer_factory() = default;
/// Provide a rx buffer. Implementation is responsible for determining its size
/// and memory. This is useful when a network stack implementation does not put
/// extra requirements on these factors. The POSIX stack is the example here.
/// \param allocator Memory allocator the \c connected_socket implementation prefers.
/// May be nullptr.
virtual buffer_t create(compat::polymorphic_allocator<char>* allocator) = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand this is not the final implementation, just for the record:

For the input_buffer_factory of the v2 protocol, simply returning seastar::make_temporary_buffer<char>(allocator, get_current_msg_size() + FRAME_PLAIN_EPILOGUE_SIZE + FRAME_PREAMBLE_SIZE + 41) would be too simple, because it is the 3rd segment (DATA) that needs to be page-aligned.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're entirely correct. We would need to allocate a bigger buffer and ::trim_front it accordingly.


// Give the unused part of a buffer obtained from the factory back to it.
virtual void return_unused(buffer_t&& buf) = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about how the unused tail of the rx buffer will be used: is it fed back to the data_source later?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, that's the idea. For huge chunks even the POSIX stack tends to fragment the output. This is likely because readable(fd) triggers as soon as anything becomes available to read:

future<size_t>
reactor::read_some(pollable_fd_state& fd, void* buffer, size_t len) {
    return readable(fd).then([this, &fd, buffer, len] () mutable {
        auto r = fd.fd.read(buffer, len);
        // ...
}

};

} /* namespace net */

/// \addtogroup networking-module
Expand Down Expand Up @@ -156,7 +172,10 @@ public:
/// Gets the input stream.
///
/// Gets an object returning data sent from the remote endpoint.
input_stream<char> input();
/// \param ibf_hint optional factory of rx buffers. The decision
/// whether to use the factory is left to the implementation of \c
/// connected_socket.
input_stream<char> input(net::input_buffer_factory* ibf_hint = nullptr);
/// Gets the output stream.
///
/// Gets an object that sends data to the remote endpoint.
Expand Down
7 changes: 3 additions & 4 deletions include/seastar/net/posix-stack.hh
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,10 @@ class posix_data_source_impl final : public data_source_impl {
compat::polymorphic_allocator<char>* _buffer_allocator;
lw_shared_ptr<pollable_fd> _fd;
temporary_buffer<char> _buf;
size_t _buf_size;
net::input_buffer_factory* _buffer_factory;
public:
explicit posix_data_source_impl(lw_shared_ptr<pollable_fd> fd, compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator,
size_t buf_size = 8192) : _buffer_allocator(allocator), _fd(std::move(fd)),
_buf(make_temporary_buffer<char>(_buffer_allocator, buf_size)), _buf_size(buf_size) {}
/// Creates a data source that reads from \p fd.
///
/// \param fd        descriptor the source reads from; moved into the object.
/// \param ibf       factory stored as-is and used by get() to obtain rx
///                  buffers. get() dereferences it without a null check, so
///                  it must not be nullptr.
/// \param allocator allocator stored and later passed to the factory's
///                  create(); defaults to memory::malloc_allocator.
explicit posix_data_source_impl(lw_shared_ptr<pollable_fd> fd, net::input_buffer_factory* ibf, compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator)
: _buffer_allocator(allocator), _fd(std::move(fd)), _buffer_factory(ibf) {}
future<temporary_buffer<char>> get() override;
future<> close() override;
};
Expand Down
2 changes: 1 addition & 1 deletion include/seastar/net/stack.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace net {
class connected_socket_impl {
public:
virtual ~connected_socket_impl() {}
virtual data_source source() = 0;
virtual data_source source(input_buffer_factory* ibf = nullptr) = 0;
virtual data_sink sink() = 0;
virtual void shutdown_input() = 0;
virtual void shutdown_output() = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/net/native-stack-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class native_connected_socket_impl : public connected_socket_impl {
public:
explicit native_connected_socket_impl(lw_shared_ptr<typename Protocol::connection> conn)
: _conn(std::move(conn)) {}
virtual data_source source() override;
virtual data_source source(net::input_buffer_factory*) override;
virtual data_sink sink() override;
virtual void shutdown_input() override;
virtual void shutdown_output() override;
Expand Down Expand Up @@ -180,7 +180,7 @@ public:
};

// Creates the data source for this connection. The input_buffer_factory
// argument is unnamed and unused here: the source is built from _conn alone.
template <typename Protocol>
data_source native_connected_socket_impl<Protocol>::source() {
data_source native_connected_socket_impl<Protocol>::source(net::input_buffer_factory*) {
return data_source(std::make_unique<native_data_source_impl>(_conn));
}

Expand Down
26 changes: 19 additions & 7 deletions src/net/posix-stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,18 @@ class posix_connected_socket_impl final : public connected_socket_impl, posix_co
/// Stores the descriptor and the connection-tracking handle (both moved in)
/// together with the allocator, which defaults to memory::malloc_allocator.
explicit posix_connected_socket_impl(lw_shared_ptr<pollable_fd> fd, conntrack::handle&& handle,
compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _fd(std::move(fd)), _handle(std::move(handle)), _allocator(allocator) {}
public:
virtual data_source source() override {
return data_source(std::make_unique< posix_data_source_impl>(_fd, _allocator));
/// Creates the data source that reads from this socket's descriptor.
///
/// \param ibf factory supplying the rx buffers. When it is nullptr, a
///            function-local fallback factory is substituted.
virtual data_source source(net::input_buffer_factory* ibf) override {
    if (!ibf) {
        // Fallback used when the caller gave no factory. The class is given
        // an explicit name on purpose: in `struct final : base {...}` the
        // identifier `final` is taken as the class name, not as the
        // class-virt-specifier, so the type would not actually be final.
        static struct default_posix_input_buffer_factory final : input_buffer_factory {
            buffer_t create(compat::polymorphic_allocator<char>* const allocator) override {
                // 8192 is the fixed size of every fallback rx buffer.
                return make_temporary_buffer<char>(allocator, 8192);
            }
            void return_unused(buffer_t&&) override {
                // Nothing to reclaim: the fallback factory ignores the returned tail.
            }
        } default_posix_inbuf_factory{};
        ibf = &default_posix_inbuf_factory;
    }
    return data_source(std::make_unique<posix_data_source_impl>(_fd, ibf, _allocator));
}
virtual data_sink sink() override {
return data_sink(std::make_unique< posix_data_sink_impl>(_fd));
Expand Down Expand Up @@ -317,11 +327,13 @@ posix_ap_server_socket_impl<Transport>::move_connected_socket(socket_address sa,

// Reads the next chunk of data from the descriptor and returns it as a buffer.
future<temporary_buffer<char>>
posix_data_source_impl::get() {
return _fd->read_some(_buf.get_write(), _buf_size).then([this] (size_t size) {
_buf.trim(size);
auto ret = std::move(_buf);
_buf = make_temporary_buffer<char>(_buffer_allocator, _buf_size);
return make_ready_future<temporary_buffer<char>>(std::move(ret));
// Ask the factory for a new rx buffer on every call; the factory decides its size.
_buf = _buffer_factory->create(_buffer_allocator);
// Read at most as many bytes as the buffer can hold.
return _fd->read_some(_buf.get_write(), _buf.size()).then([this] (size_t size) {
if (size < _buf.size()) {
// Short read: hand the unfilled tail back to the factory, then shrink
// the buffer to the bytes actually received.
_buffer_factory->return_unused(_buf.share(size, _buf.size() - size));
_buf.trim(size);
}
// Ownership of the filled buffer moves to the caller.
return make_ready_future<temporary_buffer<char>>(std::move(_buf));
});
}

Expand Down
4 changes: 2 additions & 2 deletions src/net/stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ connected_socket& connected_socket::operator=(connected_socket&& cs) noexcept =
connected_socket::~connected_socket()
{}

// Wraps the implementation's data source in an input_stream, passing the
// optional rx-buffer factory through to source() unchanged.
// NOTE(review): the declaration names this parameter `ibf_hint`, while the
// definition uses `ise` — consider aligning the two names.
input_stream<char> connected_socket::input() {
return input_stream<char>(_csi->source());
input_stream<char> connected_socket::input(net::input_buffer_factory* const ise) {
return input_stream<char>(_csi->source(ise));
}

output_stream<char> connected_socket::output(size_t buffer_size) {
Expand Down
4 changes: 2 additions & 2 deletions src/net/tls.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ class tls_connected_socket_impl : public net::connected_socket_impl, public sess
class source_impl;
class sink_impl;

data_source source() override;
data_source source(net::input_buffer_factory*) override;
data_sink sink() override;

void shutdown_input() override {
Expand Down Expand Up @@ -1160,7 +1160,7 @@ class tls_socket_impl : public net::socket_impl {

}

// Creates the data source for this TLS socket. The input_buffer_factory
// argument is unnamed and unused here: the source is built from _session alone.
data_source tls::tls_connected_socket_impl::source() {
data_source tls::tls_connected_socket_impl::source(net::input_buffer_factory*) {
return data_source(std::make_unique<source_impl>(_session));
}

Expand Down