Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM, future: use tagged pointer for tracking future_state #11

Open
wants to merge 4 commits into
base: ceph-octopus-19.06.0-206-g39d139e2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 137 additions & 44 deletions include/seastar/core/future.hh
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@ struct future_state : public future_state_base, private internal::uninitialized
static_assert(sizeof(future_state<>) <= 8, "future_state<> is too large");
static_assert(sizeof(future_state<long>) <= 16, "future_state<long> is too large");

namespace internal {
class stateptr_t;
class taskable_stateptr_t;
}

template <typename... T>
class continuation_base : public task {
protected:
Expand All @@ -413,6 +418,8 @@ public:
friend class internal::promise_base_with_type<T...>;
friend class promise<T...>;
friend class future<T...>;
friend class internal::stateptr_t;
friend class internal::taskable_stateptr_t;
};

template <typename Func, typename... T>
Expand All @@ -433,21 +440,107 @@ void set_callback(future<T...>& fut, std::unique_ptr<U> callback);

class future_base;

// Tagged pointer to track state and derive _future and maybe even
// _task.
struct stateptr_t {
protected:
struct nopctor_t{};
stateptr_t(nopctor_t) {};
public:
enum class location : uintptr_t {
promise = 0,
future = 1,
continuation = 2 // not for SEASTAR_COROUTINES_TS but it looks that,
// at least for crimson, this is disabled.
};
static constexpr uintptr_t location_mask = 0x3;
static constexpr uintptr_t state_mask = ~location_mask;

static_assert(alignof(future_state_base) >= 4);
uintptr_t _state_and_location;

public:
template <class... T>
stateptr_t(class promise<T...>& pr) {
_state_and_location = \
reinterpret_cast<uintptr_t>(&pr._local_state) | static_cast<uintptr_t>(location::promise);
}
template <class... T>
stateptr_t(class future<T...>& fut) {
static_assert(alignof(future<T...>) >= 4);
_state_and_location = \
reinterpret_cast<uintptr_t>(&fut._state) | static_cast<uintptr_t>(location::future);
}

future_state_base* get_state() {
return reinterpret_cast<future_state_base*>(_state_and_location & state_mask);
}

template <class... T>
future_base* get_dependent_future() {
future_base* ret = nullptr;
if (_state_and_location & static_cast<uintptr_t>(location::future)) {
ret = reinterpret_cast<future_base*>((_state_and_location & state_mask) - offsetof(future<>, _state));
}
return ret;
}

void clear() {
_state_and_location = 0;
}
};

struct taskable_stateptr_t : stateptr_t {
template <class... T>
taskable_stateptr_t(class std::unique_ptr<continuation_base<T...>> cont) : stateptr_t(nopctor_t{}) {
_state_and_location = \
reinterpret_cast<uintptr_t>(&cont.release()->_state) | static_cast<uintptr_t>(location::continuation);
}

taskable_stateptr_t(const taskable_stateptr_t&) = delete;
taskable_stateptr_t(taskable_stateptr_t&& other) : stateptr_t(nopctor_t{}) {
_state_and_location = other._state_and_location;
other.clear();
}
taskable_stateptr_t(stateptr_t&& other) : stateptr_t(nopctor_t{}) {
_state_and_location = other._state_and_location;
}
void operator=(stateptr_t&& other) {
_state_and_location = other._state_and_location;
}
void operator=(taskable_stateptr_t&& other) {
_state_and_location = other._state_and_location;
other.clear();
}

std::unique_ptr<task> release_dependent_task() {
// FIXME: offsetof() on continuation_base<>
return std::unique_ptr<task>{_state_and_location & static_cast<uintptr_t>(location::continuation) ?
reinterpret_cast<task*>((std::exchange(_state_and_location, 0) & state_mask) - offsetof(continuation_base<>, _state)) : nullptr
};
}

~taskable_stateptr_t() {
if (_state_and_location & static_cast<uintptr_t>(location::continuation)) {
delete reinterpret_cast<task*>(_state_and_location & state_mask);
}
}
};

class promise_base {
protected:
enum class urgent { no, yes };
future_base* _future = nullptr;

// This points to the future_state that is currently being
// used. See comment above the future_state struct definition for
// details.
future_state_base* _state;

std::unique_ptr<task> _task;
// details. It also holds the information where the state is
// actually stored allowing to derive corresponding future without
// extra pointer.
taskable_stateptr_t _state_tracker;

promise_base(const promise_base&) = delete;
promise_base(future_state_base* state) noexcept : _state(state) {}
promise_base(future_base* future, future_state_base* state) noexcept;
promise_base(stateptr_t&& state_tracker) noexcept;
promise_base(future_base&, stateptr_t&& state_tracker) noexcept;
promise_base(promise_base&& x) noexcept;

// We never need to destruct this polymorphicly, so we can make it
Expand All @@ -462,8 +555,8 @@ protected:
void make_ready() noexcept;

void set_exception(std::exception_ptr&& ex) noexcept {
if (_state) {
_state->set_exception(std::move(ex));
if (auto* state = _state_tracker.get_state(); state) {
state->set_exception(std::move(ex));
make_ready<urgent::no>();
} else {
// We get here if promise::get_future is called and the
Expand Down Expand Up @@ -500,12 +593,12 @@ template <typename... T>
class promise_base_with_type : protected internal::promise_base {
protected:
future_state<T...>* get_state() {
return static_cast<future_state<T...>*>(_state);
return static_cast<future_state<T...>*>(_state_tracker.get_state());
}
static constexpr bool copy_noexcept = future_state<T...>::copy_noexcept;
public:
promise_base_with_type(future_state_base* state) noexcept : promise_base(state) { }
promise_base_with_type(future<T...>* future) noexcept : promise_base(future, &future->_state) { }
promise_base_with_type(future_base& fut, stateptr_t&& state_tracker) noexcept : promise_base(fut, std::move(state_tracker)) { }
promise_base_with_type(stateptr_t&& state_tracker) noexcept : promise_base(std::move(state_tracker)) { }
promise_base_with_type(promise_base_with_type&& x) noexcept : promise_base(std::move(x)) { }
promise_base_with_type(const promise_base_with_type&) = delete;
promise_base_with_type& operator=(promise_base_with_type&& x) noexcept {
Expand All @@ -516,16 +609,16 @@ public:
void operator=(const promise_base_with_type&) = delete;

void set_urgent_state(future_state<T...>&& state) noexcept {
if (_state) {
*get_state() = std::move(state);
if (auto* s = get_state(); s) {
*s = std::move(state);
make_ready<urgent::yes>();
}
}

template <typename... A>
void set_value(A&&... a) {
if (auto *s = get_state()) {
s->set(std::forward<A>(a)...);
if (auto* state = get_state(); state) {
state->set(std::forward<A>(a)...);
make_ready<urgent::no>();
}
}
Expand All @@ -539,13 +632,11 @@ public:
private:
template <typename Func>
void schedule(Func&& func) {
auto tws = std::make_unique<continuation<Func, T...>>(std::move(func));
_state = &tws->_state;
_task = std::move(tws);
auto tws = std::unique_ptr<continuation_base<T...>>(new continuation<Func, T...>(std::move(func)));
_state_tracker = taskable_stateptr_t{std::move(tws)};
}
void schedule(std::unique_ptr<continuation_base<T...>> callback) {
_state = &callback->_state;
_task = std::move(callback);
_state_tracker = taskable_stateptr_t{std::move(callback)};
}

template <typename... U>
Expand All @@ -569,7 +660,7 @@ public:
/// \brief Constructs an empty \c promise.
///
/// Creates promise with no associated future yet (see get_future()).
promise() noexcept : internal::promise_base_with_type<T...>(&_local_state) {}
promise() noexcept : internal::promise_base_with_type<T...>(internal::stateptr_t{*this}) {}

/// \brief Moves a \c promise object.
promise(promise&& x) noexcept;
Expand Down Expand Up @@ -630,6 +721,7 @@ public:

template <typename... U>
friend class future;
friend class internal::stateptr_t;
};

/// \brief Specialization of \c promise<void>
Expand Down Expand Up @@ -777,16 +869,14 @@ class future_base {
protected:
promise_base* _promise;
future_base() noexcept : _promise(nullptr) {}
future_base(promise_base* promise, future_state_base* state) noexcept : _promise(promise) {
_promise->_future = this;
_promise->_state = state;
future_base(promise_base* promise, stateptr_t&& state_tracker) noexcept : _promise(promise) {
_promise->_state_tracker = std::move(state_tracker);
}

future_base(future_base&& x, future_state_base* state) noexcept : _promise(x._promise) {
future_base(future_base&& x, stateptr_t&& state_tracker) noexcept : _promise(x._promise) {
if (auto* p = _promise) {
x.detach_promise();
p->_future = this;
p->_state = state;
p->_state_tracker = std::move(state_tracker);
}
}
~future_base() noexcept {
Expand All @@ -796,8 +886,7 @@ protected:
}

promise_base* detach_promise() noexcept {
_promise->_state = nullptr;
_promise->_future = nullptr;
_promise->_state_tracker.clear();
return std::exchange(_promise, nullptr);
}

Expand Down Expand Up @@ -855,7 +944,7 @@ private:
// future_state.
future(future_for_get_promise_marker m) { }

future(promise<T...>* pr) noexcept : future_base(pr, &_state), _state(std::move(pr->_local_state)) { }
future(promise<T...>* pr) noexcept : future_base(pr, internal::stateptr_t{*this}), _state(std::move(pr->_local_state)) { }
template <typename... A>
future(ready_future_marker m, A&&... a) : _state(m, std::forward<A>(a)...) { }
future(exception_future_marker m, std::exception_ptr ex) noexcept : _state(m, std::move(ex)) { }
Expand All @@ -866,7 +955,7 @@ private:
}
internal::promise_base_with_type<T...> get_promise() noexcept {
assert(!_promise);
return internal::promise_base_with_type<T...>(this);
return internal::promise_base_with_type<T...>(*this, internal::stateptr_t{*this});
}
internal::promise_base_with_type<T...>* detach_promise() {
return static_cast<internal::promise_base_with_type<T...>*>(future_base::detach_promise());
Expand Down Expand Up @@ -930,7 +1019,7 @@ public:
using promise_type = promise<T...>;
/// \brief Moves the future into a new object.
[[gnu::always_inline]]
future(future&& x) noexcept : future_base(std::move(x), &_state), _state(std::move(x._state)) { }
future(future&& x) noexcept : future_base(std::move(x), internal::stateptr_t{*this}), _state(std::move(x._state)) { }
future(const future&) = delete;
future& operator=(future&& x) noexcept {
this->~future();
Expand Down Expand Up @@ -1159,7 +1248,7 @@ public:
void forward_to(promise<T...>&& pr) noexcept {
if (_state.available()) {
pr.set_urgent_state(std::move(_state));
} else if (&pr._local_state != pr._state) {
} else if (&pr._local_state != pr._state_tracker.get_state()) {
// The only case when _state points to _local_state is
// when get_future was never called. Given that pr will
// soon be destroyed, we know get_future will never be
Expand Down Expand Up @@ -1350,6 +1439,7 @@ private:
friend class promise;
template <typename... U>
friend class internal::promise_base_with_type;
friend class internal::stateptr_t;
template <typename... U, typename... A>
friend future<U...> make_ready_future(A&&... value);
template <typename... U>
Expand All @@ -1361,37 +1451,40 @@ private:
/// \endcond
};

inline internal::promise_base::promise_base(future_base* future, future_state_base* state) noexcept
: _future(future), _state(state) {
_future->_promise = this;
inline internal::promise_base::promise_base(stateptr_t&& state_tracker) noexcept
: _state_tracker(std::move(state_tracker)) {
}

inline internal::promise_base::promise_base(future_base& fut, stateptr_t&& state_tracker) noexcept
: _state_tracker(std::move(state_tracker)) {
fut._promise = this;
}

template <typename... T>
inline
future<T...>
promise<T...>::get_future() noexcept {
assert(!this->_future && this->_state && !this->_task);
assert(!this->_state_tracker.get_dependent_future() && this->_state_tracker.get_state());
return future<T...>(this);
}

template<internal::promise_base::urgent Urgent>
inline
void internal::promise_base::make_ready() noexcept {
if (_task) {
_state = nullptr;
if (auto task = _state_tracker.release_dependent_task(); task) {
if (Urgent == urgent::yes && !need_preempt()) {
::seastar::schedule_urgent(std::move(_task));
::seastar::schedule_urgent(std::move(task));
} else {
::seastar::schedule(std::move(_task));
::seastar::schedule(std::move(task));
}
}
}

template <typename... T>
inline
promise<T...>::promise(promise&& x) noexcept : internal::promise_base_with_type<T...>(std::move(x)) {
if (this->_state == &x._local_state) {
this->_state = &_local_state;
if (this->_state_tracker.get_state() == &x._local_state) {
this->_state_tracker = internal::stateptr_t{*this};
_local_state = std::move(x._local_state);
}
}
Expand Down
22 changes: 11 additions & 11 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5854,23 +5854,23 @@ void report_failed_future(const std::exception_ptr& eptr) noexcept {
broken_promise::broken_promise() : logic_error("broken promise") { }

promise_base::promise_base(promise_base&& x) noexcept
: _future(x._future), _state(x._state), _task(std::move(x._task)) {
x._state = nullptr;
if (auto* fut = _future) {
: _state_tracker(std::move(x._state_tracker)) {
x._state_tracker.clear();
if (auto* fut = _state_tracker.get_dependent_future(); fut) {
fut->detach_promise();
fut->_promise = this;
}
}

promise_base::~promise_base() noexcept {
if (_future) {
assert(_state);
assert(_state->available() || !_task);
_future->detach_promise();
} else if (__builtin_expect(bool(_task), false)) {
assert(_state && !_state->available());
_state->set_to_broken_promise();
::seastar::schedule(std::move(_task));
if (auto* future = _state_tracker.get_dependent_future(); future) {
assert(_state_tracker.get_state()->available());
future->detach_promise();
} else if (auto task = _state_tracker.release_dependent_task(); __builtin_expect(bool(task), false)) {
auto* state = _state_tracker.get_state();
assert(state && !state->available());
state->set_to_broken_promise();
::seastar::schedule(std::move(task));
}
}

Expand Down