Skip to content

Commit

Permalink
Synchronization: Add support for true relative timeouts using
Browse files Browse the repository at this point in the history
monotonic clocks on Linux when the implementation uses futexes

After this change, when synchronization methods that wait are passed
an absl::Duration to limit the wait time, these methods will wait for
that interval, even if the system clock is changed (subject to any
limitations with how CLOCK_MONOTONIC keeps track of time). In other
words, an observer measuring the time with a stop watch will now see
the correct interval, even if the system clock is changed. Previously,
the duration was added to the current time, and methods would wait
until that time was reached on the possibly changed realtime system
clock.

The behavior of the synchronization methods that take an absl::Time is
unchanged.  These methods always wait until the absolute point in time
is reached and respect changes to the system clock. In other words, an
observer will always see the timeout occur when a wall clock reaches
that time, even if the clock is manipulated externally.

Note: ABSL_PREDICT_FALSE was removed from the error case in Futex as
timeouts are handled by this case, and timeouts are part of normal
operation.
PiperOrigin-RevId: 510405347
Change-Id: I0b3ea390de97014cfa353079ae2e0c1c637aca69
  • Loading branch information
derekmauro authored and copybara-github committed Feb 17, 2023
1 parent 0372af1 commit ed37a45
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 47 deletions.
70 changes: 47 additions & 23 deletions absl/synchronization/internal/futex.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

#include "absl/base/config.h"

#ifdef _WIN32
#include <windows.h>
#else
#ifndef _WIN32
#include <sys/time.h>
#include <unistd.h>
#endif
Expand Down Expand Up @@ -85,34 +83,60 @@ namespace synchronization_internal {

class FutexImpl {
public:
static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
// Atomically check that `*v == val`, and if it is, then sleep until the
// timeout `t` has been reached, or until woken by `Wake()`.
static int WaitUntil(std::atomic<int32_t>* v, int32_t val,
KernelTimeout t) {
long err = 0; // NOLINT(runtime/int)
if (t.has_timeout()) {
// https://locklessinc.com/articles/futex_cheat_sheet/
// Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
struct timespec abs_timeout = t.MakeAbsTimespec();
// Atomically check that the futex value is still 0, and if it
// is, sleep until abs_timeout or until woken by FUTEX_WAKE.
err = syscall(
SYS_futex, reinterpret_cast<int32_t *>(v),
FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
&abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
if (!t.has_timeout()) {
return Wait(v, val);
} else if (t.is_absolute_timeout()) {
auto abs_timespec = t.MakeAbsTimespec();
return WaitAbsoluteTimeout(v, val, &abs_timespec);
} else {
// Atomically check that the futex value is still 0, and if it
// is, sleep until woken by FUTEX_WAKE.
err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
auto rel_timespec = t.MakeRelativeTimespec();
return WaitRelativeTimeout(v, val, &rel_timespec);
}
if (ABSL_PREDICT_FALSE(err != 0)) {
}

// Atomically check that `*v == val`, and if it is, then sleep until the until
// woken by `Wake()`.
static int Wait(std::atomic<int32_t>* v, int32_t val) {
return WaitAbsoluteTimeout(v, val, nullptr);
}

// Atomically check that `*v == val`, and if it is, then sleep until
// CLOCK_REALTIME reaches `*abs_timeout`, or until woken by `Wake()`.
static int WaitAbsoluteTimeout(std::atomic<int32_t>* v, int32_t val,
const struct timespec* abs_timeout) {
// https://locklessinc.com/articles/futex_cheat_sheet/
// Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
auto err =
syscall(SYS_futex, reinterpret_cast<int32_t*>(v),
FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME,
val, abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
if (err != 0) {
return -errno;
}
return 0;
}

// Atomically check that `*v == val`, and if it is, then sleep until
// `*rel_timeout` has elapsed, or until woken by `Wake()`.
static int WaitRelativeTimeout(std::atomic<int32_t>* v, int32_t val,
const struct timespec* rel_timeout) {
// Atomically check that the futex value is still 0, and if it
// is, sleep until abs_timeout or until woken by FUTEX_WAKE.
auto err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v),
FUTEX_PRIVATE_FLAG, val, rel_timeout);
if (err != 0) {
return -errno;
}
return 0;
}

static int Wake(std::atomic<int32_t> *v, int32_t count) {
// NOLINTNEXTLINE(runtime/int)
long err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v),
// Wakes at most `count` waiters that have entered the sleep state on `v`.
static int Wake(std::atomic<int32_t>* v, int32_t count) {
auto err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v),
FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
if (ABSL_PREDICT_FALSE(err < 0)) {
return -errno;
Expand Down
106 changes: 101 additions & 5 deletions absl/synchronization/internal/waiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,9 @@ static void MaybeBecomeIdle() {

#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX

Waiter::Waiter() {
futex_.store(0, std::memory_order_relaxed);
}
Waiter::Waiter() : futex_(0) {}

bool Waiter::Wait(KernelTimeout t) {
bool Waiter::WaitAbsoluteTimeout(KernelTimeout t) {
// Loop until we can atomically decrement futex from a positive
// value, waiting on a futex while we believe it is zero.
// Note that, since the thread ticker is just reset, we don't need to check
Expand All @@ -90,7 +88,88 @@ bool Waiter::Wait(KernelTimeout t) {
}

if (!first_pass) MaybeBecomeIdle();
const int err = Futex::WaitUntil(&futex_, 0, t);
auto abs_timeout = t.MakeAbsTimespec();
const int err = Futex::WaitAbsoluteTimeout(&futex_, 0, &abs_timeout);
if (err != 0) {
if (err == -EINTR || err == -EWOULDBLOCK) {
// Do nothing, the loop will retry.
} else if (err == -ETIMEDOUT) {
return false;
} else {
ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
}
}
first_pass = false;
}
}

#ifdef CLOCK_MONOTONIC

// Subtracts the timespec `sub` from `in` if the result would not be negative,
// and returns true. Returns false if the result would be negative, and leaves
// `in` unchanged.
static bool TimespecSubtract(struct timespec& in, const struct timespec& sub) {
if (in.tv_sec < sub.tv_sec) {
return false;
}
if (in.tv_nsec < sub.tv_nsec) {
if (in.tv_sec == sub.tv_sec) {
return false;
}
// Borrow from tv_sec.
in.tv_sec -= 1;
in.tv_nsec += 1'000'000'000;
}
in.tv_sec -= sub.tv_sec;
in.tv_nsec -= sub.tv_nsec;
return true;
}

// On some platforms a background thread periodically calls `Poke()` to briefly
// wake waiter threads so that they may call `MaybeBecomeIdle()`. This means
// that `WaitRelativeTimeout()` differs slightly from `WaitAbsoluteTimeout()`
// because it must adjust the timeout by the amount of time that it has already
// slept.
bool Waiter::WaitRelativeTimeout(KernelTimeout t) {
struct timespec start;
ABSL_RAW_CHECK(clock_gettime(CLOCK_MONOTONIC, &start) == 0,
"clock_gettime() failed");

// Loop until we can atomically decrement futex from a positive
// value, waiting on a futex while we believe it is zero.
// Note that, since the thread ticker is just reset, we don't need to check
// whether the thread is idle on the very first pass of the loop.
bool first_pass = true;

while (true) {
int32_t x = futex_.load(std::memory_order_relaxed);
while (x != 0) {
if (!futex_.compare_exchange_weak(x, x - 1,
std::memory_order_acquire,
std::memory_order_relaxed)) {
continue; // Raced with someone, retry.
}
return true; // Consumed a wakeup, we are done.
}

auto relative_timeout = t.MakeRelativeTimespec();
if (!first_pass) {
MaybeBecomeIdle();

// Adjust relative_timeout for `Poke()`s.
struct timespec now;
ABSL_RAW_CHECK(clock_gettime(CLOCK_MONOTONIC, &now) == 0,
"clock_gettime() failed");
// If TimespecSubstract(now, start) returns false, then the clock isn't
// truly monotonic.
if (TimespecSubtract(now, start)) {
if (!TimespecSubtract(relative_timeout, now)) {
return false; // Timeout.
}
}
}

const int err = Futex::WaitRelativeTimeout(&futex_, 0, &relative_timeout);
if (err != 0) {
if (err == -EINTR || err == -EWOULDBLOCK) {
// Do nothing, the loop will retry.
Expand All @@ -104,6 +183,23 @@ bool Waiter::Wait(KernelTimeout t) {
}
}

#else // CLOCK_MONOTONIC

// No support for CLOCK_MONOTONIC.
// KernelTimeout will automatically convert to an absolute timeout.
bool Waiter::WaitRelativeTimeout(KernelTimeout t) {
return WaitAbsoluteTimeout(t);
}

#endif // CLOCK_MONOTONIC

bool Waiter::Wait(KernelTimeout t) {
if (t.is_absolute_timeout()) {
return WaitAbsoluteTimeout(t);
}
return WaitRelativeTimeout(t);
}

void Waiter::Post() {
if (futex_.fetch_add(1, std::memory_order_release) == 0) {
// We incremented from 0, need to wake a potential waiter.
Expand Down
3 changes: 3 additions & 0 deletions absl/synchronization/internal/waiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class Waiter {
~Waiter() = delete;

#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
bool WaitAbsoluteTimeout(KernelTimeout t);
bool WaitRelativeTimeout(KernelTimeout t);

// Futexes are defined by specification to be 32-bits.
// Thus std::atomic<int32_t> must be just an int32_t with lockfree methods.
std::atomic<int32_t> futex_;
Expand Down
45 changes: 26 additions & 19 deletions absl/synchronization/mutex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -635,21 +635,6 @@ void Mutex::InternalAttemptToUseMutexInFatalSignalHandler() {
std::memory_order_release);
}

// --------------------------time support

// Return the current time plus the timeout. Use the same clock as
// PerThreadSem::Wait() for consistency. Unfortunately, we don't have
// such a choice when a deadline is given directly.
static absl::Time DeadlineFromTimeout(absl::Duration timeout) {
#ifndef _WIN32
struct timeval tv;
gettimeofday(&tv, nullptr);
return absl::TimeFromTimeval(tv) + timeout;
#else
return absl::Now() + timeout;
#endif
}

// --------------------------Mutexes

// In the layout below, the msb of the bottom byte is currently unused. Also,
Expand Down Expand Up @@ -1546,7 +1531,13 @@ void Mutex::LockWhen(const Condition &cond) {
}

bool Mutex::LockWhenWithTimeout(const Condition &cond, absl::Duration timeout) {
return LockWhenWithDeadline(cond, DeadlineFromTimeout(timeout));
ABSL_TSAN_MUTEX_PRE_LOCK(this, 0);
GraphId id = DebugOnlyDeadlockCheck(this);
bool res = LockSlowWithDeadline(kExclusive, &cond,
KernelTimeout(timeout), 0);
DebugOnlyLockEnter(this, id);
ABSL_TSAN_MUTEX_POST_LOCK(this, 0, 0);
return res;
}

bool Mutex::LockWhenWithDeadline(const Condition &cond, absl::Time deadline) {
Expand All @@ -1569,7 +1560,12 @@ void Mutex::ReaderLockWhen(const Condition &cond) {

bool Mutex::ReaderLockWhenWithTimeout(const Condition &cond,
absl::Duration timeout) {
return ReaderLockWhenWithDeadline(cond, DeadlineFromTimeout(timeout));
ABSL_TSAN_MUTEX_PRE_LOCK(this, __tsan_mutex_read_lock);
GraphId id = DebugOnlyDeadlockCheck(this);
bool res = LockSlowWithDeadline(kShared, &cond, KernelTimeout(timeout), 0);
DebugOnlyLockEnter(this, id);
ABSL_TSAN_MUTEX_POST_LOCK(this, __tsan_mutex_read_lock, 0);
return res;
}

bool Mutex::ReaderLockWhenWithDeadline(const Condition &cond,
Expand All @@ -1594,7 +1590,18 @@ void Mutex::Await(const Condition &cond) {
}

bool Mutex::AwaitWithTimeout(const Condition &cond, absl::Duration timeout) {
return AwaitWithDeadline(cond, DeadlineFromTimeout(timeout));
if (cond.Eval()) { // condition already true; nothing to do
if (kDebugMode) {
this->AssertReaderHeld();
}
return true;
}

KernelTimeout t{timeout};
bool res = this->AwaitCommon(cond, t);
ABSL_RAW_CHECK(res || t.has_timeout(),
"condition untrue on return from Await");
return res;
}

bool Mutex::AwaitWithDeadline(const Condition &cond, absl::Time deadline) {
Expand Down Expand Up @@ -2660,7 +2667,7 @@ bool CondVar::WaitCommon(Mutex *mutex, KernelTimeout t) {
}

bool CondVar::WaitWithTimeout(Mutex *mu, absl::Duration timeout) {
return WaitWithDeadline(mu, DeadlineFromTimeout(timeout));
return WaitCommon(mu, KernelTimeout(timeout));
}

bool CondVar::WaitWithDeadline(Mutex *mu, absl::Time deadline) {
Expand Down

0 comments on commit ed37a45

Please sign in to comment.