From ce10b5dd49975170074fed998c8ab280570e7793 Mon Sep 17 00:00:00 2001 From: Paul Menage Date: Tue, 8 Oct 2024 18:42:55 -0700 Subject: [PATCH] Abstract the state type for futexes - provide a `trait Futex` that supports the `wait`/`wake`/`wake_all` API. - implement `Futex` for `AtomicU32` in most platforms, and for atomic values generally on Windows. - update futex users to use the exported `crate::sys::futex::Atomic` type rather than a raw `AtomicU32`. --- library/std/src/sys/pal/hermit/futex.rs | 64 +-- library/std/src/sys/pal/unix/futex.rs | 473 +++++++++--------- library/std/src/sys/pal/wasm/atomics/futex.rs | 55 +- library/std/src/sys/pal/windows/futex.rs | 34 +- library/std/src/sys/sync/condvar/futex.rs | 14 +- library/std/src/sys/sync/mod.rs | 23 + library/std/src/sys/sync/mutex/futex.rs | 14 +- library/std/src/sys/sync/once/futex.rs | 22 +- library/std/src/sys/sync/rwlock/futex.rs | 25 +- .../std/src/sys/sync/thread_parking/futex.rs | 10 +- 10 files changed, 397 insertions(+), 337 deletions(-) diff --git a/library/std/src/sys/pal/hermit/futex.rs b/library/std/src/sys/pal/hermit/futex.rs index 21c5facd52fbd..73324849530f8 100644 --- a/library/std/src/sys/pal/hermit/futex.rs +++ b/library/std/src/sys/pal/hermit/futex.rs @@ -1,44 +1,52 @@ use super::hermit_abi; use crate::ptr::null; use crate::sync::atomic::AtomicU32; +use crate::sys::sync::Futex; use crate::time::Duration; +/// An atomic for use as a futex that is at least 32-bits but may be larger +pub type Atomic = AtomicU32; +/// Must be the underlying type of Atomic +pub type Primitive = u32; + /// An atomic for use as a futex that is at least 8-bits but may be larger. pub type SmallAtomic = AtomicU32; /// Must be the underlying type of SmallAtomic pub type SmallPrimitive = u32; -pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option) -> bool { - // Calculate the timeout as a relative timespec. - // - // Overflows are rounded up to an infinite timeout (None). - let timespec = timeout.and_then(|dur| { - Some(hermit_abi::timespec { - tv_sec: dur.as_secs().try_into().ok()?, - tv_nsec: dur.subsec_nanos().try_into().ok()?, - }) - }); +impl Futex for AtomicU32 { + fn wait(&self, expected: u32, timeout: Option) -> bool { + // Calculate the timeout as a relative timespec. + // + // Overflows are rounded up to an infinite timeout (None). + let timespec = timeout.and_then(|dur| { + Some(hermit_abi::timespec { + tv_sec: dur.as_secs().try_into().ok()?, + tv_nsec: dur.subsec_nanos().try_into().ok()?, + }) + }); - let r = unsafe { - hermit_abi::futex_wait( - futex.as_ptr(), - expected, - timespec.as_ref().map_or(null(), |t| t as *const hermit_abi::timespec), - hermit_abi::FUTEX_RELATIVE_TIMEOUT, - ) - }; + let r = unsafe { + hermit_abi::futex_wait( + self.as_ptr(), + expected, + timespec.as_ref().map_or(null(), |t| t as *const hermit_abi::timespec), + hermit_abi::FUTEX_RELATIVE_TIMEOUT, + ) + }; - r != -hermit_abi::errno::ETIMEDOUT -} + r != -hermit_abi::errno::ETIMEDOUT + } -#[inline] -pub fn futex_wake(futex: &AtomicU32) -> bool { - unsafe { hermit_abi::futex_wake(futex.as_ptr(), 1) > 0 } -} + #[inline] + fn wake(&self) -> bool { + unsafe { hermit_abi::futex_wake(futex.as_ptr(), 1) > 0 } + } -#[inline] -pub fn futex_wake_all(futex: &AtomicU32) { - unsafe { - hermit_abi::futex_wake(futex.as_ptr(), i32::MAX); + #[inline] + fn wake_all(&self) { + unsafe { + hermit_abi::futex_wake(self.as_ptr(), i32::MAX); + } } } diff --git a/library/std/src/sys/pal/unix/futex.rs b/library/std/src/sys/pal/unix/futex.rs index cc725045c4810..3b8000649f817 100644 --- a/library/std/src/sys/pal/unix/futex.rs +++ b/library/std/src/sys/pal/unix/futex.rs @@ -9,239 +9,273 @@ ))] use crate::sync::atomic::AtomicU32; +use crate::sys::sync::Futex; use crate::time::Duration; -/// An atomic for use as a futex that is at least 8-bits but may be larger. +/// An atomic for use as a futex that is at least 32-bits but may be larger +pub type Atomic = AtomicU32; + +/// A futex that is at least 8-bits but may be larger. pub type SmallAtomic = AtomicU32; -/// Must be the underlying type of SmallAtomic +/// Must be the underlying type of SmallFutex pub type SmallPrimitive = u32; -/// Waits for a `futex_wake` operation to wake us. -/// -/// Returns directly if the futex doesn't hold the expected value. -/// -/// Returns false on timeout, and true in all other cases. -#[cfg(any(target_os = "linux", target_os = "android", target_os = "freebsd"))] -pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option) -> bool { - use super::time::Timespec; - use crate::ptr::null; - use crate::sync::atomic::Ordering::Relaxed; - - // Calculate the timeout as an absolute timespec. - // - // Overflows are rounded up to an infinite timeout (None). - let timespec = timeout - .and_then(|d| Timespec::now(libc::CLOCK_MONOTONIC).checked_add_duration(&d)) - .and_then(|t| t.to_timespec()); - - loop { - // No need to wait if the value already changed. - if futex.load(Relaxed) != expected { - return true; - } +impl Futex for AtomicU32 { + /// Waits for a `futex_wake` operation to wake us. + /// + /// Returns directly if the futex doesn't hold the expected value. + /// + /// Returns false on timeout, and true in all other cases. + #[cfg(any(target_os = "linux", target_os = "android", target_os = "freebsd"))] + fn wait(&self, expected: u32, timeout: Option) -> bool { + use super::time::Timespec; + use crate::ptr::null; + use crate::sync::atomic::Ordering::Relaxed; + + // Calculate the timeout as an absolute timespec. + // + // Overflows are rounded up to an infinite timeout (None). + let timespec = timeout + .and_then(|d| Timespec::now(libc::CLOCK_MONOTONIC).checked_add_duration(&d)) + .and_then(|t| t.to_timespec()); + + loop { + // No need to wait if the value already changed. + if self.load(Relaxed) != expected { + return true; + } - let r = unsafe { - cfg_if::cfg_if! { - if #[cfg(target_os = "freebsd")] { - // FreeBSD doesn't have futex(), but it has - // _umtx_op(UMTX_OP_WAIT_UINT_PRIVATE), which is nearly - // identical. It supports absolute timeouts through a flag - // in the _umtx_time struct. - let umtx_timeout = timespec.map(|t| libc::_umtx_time { - _timeout: t, - _flags: libc::UMTX_ABSTIME, - _clockid: libc::CLOCK_MONOTONIC as u32, - }); - let umtx_timeout_ptr = umtx_timeout.as_ref().map_or(null(), |t| t as *const _); - let umtx_timeout_size = umtx_timeout.as_ref().map_or(0, |t| crate::mem::size_of_val(t)); - libc::_umtx_op( - futex as *const AtomicU32 as *mut _, - libc::UMTX_OP_WAIT_UINT_PRIVATE, - expected as libc::c_ulong, - crate::ptr::without_provenance_mut(umtx_timeout_size), - umtx_timeout_ptr as *mut _, - ) - } else if #[cfg(any(target_os = "linux", target_os = "android"))] { - // Use FUTEX_WAIT_BITSET rather than FUTEX_WAIT to be able to give an - // absolute time rather than a relative time. - libc::syscall( - libc::SYS_futex, - futex as *const AtomicU32, - libc::FUTEX_WAIT_BITSET | libc::FUTEX_PRIVATE_FLAG, - expected, - timespec.as_ref().map_or(null(), |t| t as *const libc::timespec), - null::(), // This argument is unused for FUTEX_WAIT_BITSET. - !0u32, // A full bitmask, to make it behave like a regular FUTEX_WAIT. - ) - } else { - compile_error!("unknown target_os"); + let r = unsafe { + cfg_if::cfg_if! { + if #[cfg(target_os = "freebsd")] { + // FreeBSD doesn't have futex(), but it has + // _umtx_op(UMTX_OP_WAIT_UINT_PRIVATE), which is nearly + // identical. It supports absolute timeouts through a flag + // in the _umtx_time struct. + let umtx_timeout = timespec.map(|t| libc::_umtx_time { + _timeout: t, + _flags: libc::UMTX_ABSTIME, + _clockid: libc::CLOCK_MONOTONIC as u32, + }); + let umtx_timeout_ptr = umtx_timeout.as_ref().map_or(null(), |t| t as *const _); + let umtx_timeout_size = umtx_timeout.as_ref().map_or(0, |t| crate::mem::size_of_val(t)); + libc::_umtx_op( + self as *const AtomicU32 as *mut _, + libc::UMTX_OP_WAIT_UINT_PRIVATE, + expected as libc::c_ulong, + crate::ptr::without_provenance_mut(umtx_timeout_size), + umtx_timeout_ptr as *mut _, + ) + } else if #[cfg(any(target_os = "linux", target_os = "android"))] { + // Use FUTEX_WAIT_BITSET rather than FUTEX_WAIT to be able to give an + // absolute time rather than a relative time. + libc::syscall( + libc::SYS_futex, + self as *const AtomicU32, + libc::FUTEX_WAIT_BITSET | libc::FUTEX_PRIVATE_FLAG, + expected, + timespec.as_ref().map_or(null(), |t| t as *const libc::timespec), + null::(), // This argument is unused for FUTEX_WAIT_BITSET. + !0u32, // A full bitmask, to make it behave like a regular FUTEX_WAIT. + ) + } else { + compile_error!("unknown target_os"); + } } + }; + + match (r < 0).then(super::os::errno) { + Some(libc::ETIMEDOUT) => return false, + Some(libc::EINTR) => continue, + _ => return true, } - }; + } + } + + /// Wakes up one thread that's blocked on `futex_wait` on this futex. + /// + /// Returns true if this actually woke up such a thread, + /// or false if no thread was waiting on this futex. + /// + /// On some platforms, this always returns false. + #[cfg(any(target_os = "linux", target_os = "android"))] + fn wake(&self) -> bool { + let ptr = self as *const AtomicU32; + let op = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG; + unsafe { libc::syscall(libc::SYS_futex, ptr, op, 1) > 0 } + } - match (r < 0).then(super::os::errno) { - Some(libc::ETIMEDOUT) => return false, - Some(libc::EINTR) => continue, - _ => return true, + /// Wakes up all threads that are waiting on `futex_wait` on this futex. + #[cfg(any(target_os = "linux", target_os = "android"))] + fn wake_all(&self) { + let ptr = self as *const AtomicU32; + let op = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG; + unsafe { + libc::syscall(libc::SYS_futex, ptr, op, i32::MAX); } } -} -/// Wakes up one thread that's blocked on `futex_wait` on this futex. -/// -/// Returns true if this actually woke up such a thread, -/// or false if no thread was waiting on this futex. -/// -/// On some platforms, this always returns false. -#[cfg(any(target_os = "linux", target_os = "android"))] -pub fn futex_wake(futex: &AtomicU32) -> bool { - let ptr = futex as *const AtomicU32; - let op = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG; - unsafe { libc::syscall(libc::SYS_futex, ptr, op, 1) > 0 } -} + // FreeBSD doesn't tell us how many threads are woken up, so this always returns false. + #[cfg(target_os = "freebsd")] + fn wake(&self) -> bool { + use crate::ptr::null_mut; + unsafe { + libc::_umtx_op( + self as *const AtomicU32 as *mut _, + libc::UMTX_OP_WAKE_PRIVATE, + 1, + null_mut(), + null_mut(), + ) + }; + false + } -/// Wakes up all threads that are waiting on `futex_wait` on this futex. -#[cfg(any(target_os = "linux", target_os = "android"))] -pub fn futex_wake_all(futex: &AtomicU32) { - let ptr = futex as *const AtomicU32; - let op = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG; - unsafe { - libc::syscall(libc::SYS_futex, ptr, op, i32::MAX); + #[cfg(target_os = "freebsd")] + fn wake_all(&self) { + use crate::ptr::null_mut; + unsafe { + libc::_umtx_op( + self as *const AtomicU32 as *mut _, + libc::UMTX_OP_WAKE_PRIVATE, + i32::MAX as libc::c_ulong, + null_mut(), + null_mut(), + ) + }; } -} -// FreeBSD doesn't tell us how many threads are woken up, so this always returns false. -#[cfg(target_os = "freebsd")] -pub fn futex_wake(futex: &AtomicU32) -> bool { - use crate::ptr::null_mut; - unsafe { - libc::_umtx_op( - futex as *const AtomicU32 as *mut _, - libc::UMTX_OP_WAKE_PRIVATE, - 1, - null_mut(), - null_mut(), - ) - }; - false -} + #[cfg(target_os = "openbsd")] + fn wait(&self, expected: u32, timeout: Option) -> bool { + use super::time::Timespec; + use crate::ptr::{null, null_mut}; -#[cfg(target_os = "freebsd")] -pub fn futex_wake_all(futex: &AtomicU32) { - use crate::ptr::null_mut; - unsafe { - libc::_umtx_op( - futex as *const AtomicU32 as *mut _, - libc::UMTX_OP_WAKE_PRIVATE, - i32::MAX as libc::c_ulong, - null_mut(), - null_mut(), - ) - }; -} + // Overflows are rounded up to an infinite timeout (None). + let timespec = timeout + .and_then(|d| Timespec::zero().checked_add_duration(&d)) + .and_then(|t| t.to_timespec()); -#[cfg(target_os = "openbsd")] -pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option) -> bool { - use super::time::Timespec; - use crate::ptr::{null, null_mut}; - - // Overflows are rounded up to an infinite timeout (None). - let timespec = timeout - .and_then(|d| Timespec::zero().checked_add_duration(&d)) - .and_then(|t| t.to_timespec()); - - let r = unsafe { - libc::futex( - futex as *const AtomicU32 as *mut u32, - libc::FUTEX_WAIT, - expected as i32, - timespec.as_ref().map_or(null(), |t| t as *const libc::timespec), - null_mut(), - ) - }; - - r == 0 || super::os::errno() != libc::ETIMEDOUT -} + let r = unsafe { + libc::futex( + self as *const AtomicU32 as *mut u32, + libc::FUTEX_WAIT, + expected as i32, + timespec.as_ref().map_or(null(), |t| t as *const libc::timespec), + null_mut(), + ) + }; -#[cfg(target_os = "openbsd")] -pub fn futex_wake(futex: &AtomicU32) -> bool { - use crate::ptr::{null, null_mut}; - unsafe { - libc::futex(futex as *const AtomicU32 as *mut u32, libc::FUTEX_WAKE, 1, null(), null_mut()) - > 0 + r == 0 || super::os::errno() != libc::ETIMEDOUT } -} -#[cfg(target_os = "openbsd")] -pub fn futex_wake_all(futex: &AtomicU32) { - use crate::ptr::{null, null_mut}; - unsafe { - libc::futex( - futex as *const AtomicU32 as *mut u32, - libc::FUTEX_WAKE, - i32::MAX, - null(), - null_mut(), - ); + #[cfg(target_os = "openbsd")] + fn wake(&self) -> bool { + use crate::ptr::{null, null_mut}; + unsafe { + libc::futex( + self as *const AtomicU32 as *mut u32, + libc::FUTEX_WAKE, + 1, + null(), + null_mut(), + ) > 0 + } } -} -#[cfg(target_os = "dragonfly")] -pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option) -> bool { - // A timeout of 0 means infinite. - // We round smaller timeouts up to 1 millisecond. - // Overflows are rounded up to an infinite timeout. - let timeout_ms = - timeout.and_then(|d| Some(i32::try_from(d.as_millis()).ok()?.max(1))).unwrap_or(0); + #[cfg(target_os = "openbsd")] + fn wake_all(&self) { + use crate::ptr::{null, null_mut}; + unsafe { + libc::futex( + self as *const AtomicU32 as *mut u32, + libc::FUTEX_WAKE, + i32::MAX, + null(), + null_mut(), + ); + } + } - let r = unsafe { - libc::umtx_sleep(futex as *const AtomicU32 as *const i32, expected as i32, timeout_ms) - }; + #[cfg(target_os = "dragonfly")] + fn wait(&self, expected: u32, timeout: Option) -> bool { + // A timeout of 0 means infinite. + // We round smaller timeouts up to 1 millisecond. + // Overflows are rounded up to an infinite timeout. + let timeout_ms = + timeout.and_then(|d| Some(i32::try_from(d.as_millis()).ok()?.max(1))).unwrap_or(0); - r == 0 || super::os::errno() != libc::ETIMEDOUT -} + let r = unsafe { + libc::umtx_sleep(self as *const AtomicU32 as *const i32, expected as i32, timeout_ms) + }; -// DragonflyBSD doesn't tell us how many threads are woken up, so this always returns false. -#[cfg(target_os = "dragonfly")] -pub fn futex_wake(futex: &AtomicU32) -> bool { - unsafe { libc::umtx_wakeup(futex as *const AtomicU32 as *const i32, 1) }; - false -} + r == 0 || super::os::errno() != libc::ETIMEDOUT + } -#[cfg(target_os = "dragonfly")] -pub fn futex_wake_all(futex: &AtomicU32) { - unsafe { libc::umtx_wakeup(futex as *const AtomicU32 as *const i32, i32::MAX) }; -} + // DragonflyBSD doesn't tell us how many threads are woken up, so this always returns false. + #[cfg(target_os = "dragonfly")] + fn wake(&self) -> bool { + unsafe { libc::umtx_wakeup(self as *const AtomicU32 as *const i32, 1) }; + false + } -#[cfg(target_os = "emscripten")] -extern "C" { - fn emscripten_futex_wake(addr: *const AtomicU32, count: libc::c_int) -> libc::c_int; - fn emscripten_futex_wait( - addr: *const AtomicU32, - val: libc::c_uint, - max_wait_ms: libc::c_double, - ) -> libc::c_int; -} + #[cfg(target_os = "dragonfly")] + fn wake_all(&self) { + unsafe { libc::umtx_wakeup(self as *const AtomicU32 as *const i32, i32::MAX) }; + } -#[cfg(target_os = "emscripten")] -pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option) -> bool { - unsafe { - emscripten_futex_wait( - futex, - expected, - timeout.map_or(f64::INFINITY, |d| d.as_secs_f64() * 1000.0), - ) != -libc::ETIMEDOUT + #[cfg(target_os = "emscripten")] + fn wait(&self, expected: u32, timeout: Option) -> bool { + unsafe { + emscripten_futex_wait( + self, + expected, + timeout.map_or(f64::INFINITY, |d| d.as_secs_f64() * 1000.0), + ) != -libc::ETIMEDOUT + } } -} -#[cfg(target_os = "emscripten")] -pub fn futex_wake(futex: &AtomicU32) -> bool { - unsafe { emscripten_futex_wake(futex, 1) > 0 } -} + #[cfg(target_os = "emscripten")] + fn wake(&self) -> bool { + unsafe { emscripten_futex_wake(self, 1) > 0 } + } -#[cfg(target_os = "emscripten")] -pub fn futex_wake_all(futex: &AtomicU32) { - unsafe { emscripten_futex_wake(futex, i32::MAX) }; + #[cfg(target_os = "emscripten")] + fn wake_all(&self) { + unsafe { emscripten_futex_wake(self, i32::MAX) }; + } + + #[cfg(target_os = "fuchsia")] + fn wait(&self, expected: u32, timeout: Option) -> bool { + // Sleep forever if the timeout is longer than fits in a i64. + let deadline = timeout + .and_then(|d| { + i64::try_from(d.as_nanos()) + .ok()? + .checked_add(unsafe { zircon::zx_clock_get_monotonic() }) + }) + .unwrap_or(zircon::ZX_TIME_INFINITE); + + unsafe { + zircon::zx_futex_wait( + self, + AtomicU32::new(expected), + zircon::ZX_HANDLE_INVALID, + deadline, + ) != zircon::ZX_ERR_TIMED_OUT + } + } + + // Fuchsia doesn't tell us how many threads are woken up, so this always returns false. + #[cfg(target_os = "fuchsia")] + fn wake(&self) -> bool { + unsafe { zircon::zx_futex_wake(self, 1) }; + false + } + + #[cfg(target_os = "fuchsia")] + fn wake_all(&self) { + unsafe { zircon::zx_futex_wake(self, u32::MAX) }; + } } #[cfg(target_os = "fuchsia")] @@ -276,31 +310,12 @@ pub mod zircon { } } -#[cfg(target_os = "fuchsia")] -pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option) -> bool { - // Sleep forever if the timeout is longer than fits in a i64. - let deadline = timeout - .and_then(|d| { - i64::try_from(d.as_nanos()) - .ok()? - .checked_add(unsafe { zircon::zx_clock_get_monotonic() }) - }) - .unwrap_or(zircon::ZX_TIME_INFINITE); - - unsafe { - zircon::zx_futex_wait(futex, AtomicU32::new(expected), zircon::ZX_HANDLE_INVALID, deadline) - != zircon::ZX_ERR_TIMED_OUT - } -} - -// Fuchsia doesn't tell us how many threads are woken up, so this always returns false. -#[cfg(target_os = "fuchsia")] -pub fn futex_wake(futex: &AtomicU32) -> bool { - unsafe { zircon::zx_futex_wake(futex, 1) }; - false -} - -#[cfg(target_os = "fuchsia")] -pub fn futex_wake_all(futex: &AtomicU32) { - unsafe { zircon::zx_futex_wake(futex, u32::MAX) }; +#[cfg(target_os = "emscripten")] +extern "C" { + fn emscripten_futex_wake(addr: *const AtomicU32, count: libc::c_int) -> libc::c_int; + fn emscripten_futex_wait( + addr: *const AtomicU32, + val: libc::c_uint, + max_wait_ms: libc::c_double, + ) -> libc::c_int; } diff --git a/library/std/src/sys/pal/wasm/atomics/futex.rs b/library/std/src/sys/pal/wasm/atomics/futex.rs index 42913a99ee9d6..7a840bb486a0c 100644 --- a/library/std/src/sys/pal/wasm/atomics/futex.rs +++ b/library/std/src/sys/pal/wasm/atomics/futex.rs @@ -4,37 +4,48 @@ use core::arch::wasm32 as wasm; use core::arch::wasm64 as wasm; use crate::sync::atomic::AtomicU32; +use crate::sys::sync::Futex; use crate::time::Duration; +/// An atomic for use as a futex that is at least 32-bits but may be larger +pub type Atomic = AtomicU32; +/// Must be the underlying type of Atomic +pub type Primitive = u32; + /// An atomic for use as a futex that is at least 8-bits but may be larger. pub type SmallAtomic = AtomicU32; /// Must be the underlying type of SmallAtomic pub type SmallPrimitive = u32; -/// Wait for a futex_wake operation to wake us. -/// -/// Returns directly if the futex doesn't hold the expected value. -/// -/// Returns false on timeout, and true in all other cases. -pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option) -> bool { - let timeout = timeout.and_then(|t| t.as_nanos().try_into().ok()).unwrap_or(-1); - unsafe { - wasm::memory_atomic_wait32(futex as *const AtomicU32 as *mut i32, expected as i32, timeout) - < 2 +impl Futex for AtomicU32 { + /// Wait for a futex_wake operation to wake us. + /// + /// Returns directly if the futex doesn't hold the expected value. + /// + /// Returns false on timeout, and true in all other cases. + fn wait(&self, expected: u32, timeout: Option) -> bool { + let timeout = timeout.and_then(|t| t.as_nanos().try_into().ok()).unwrap_or(-1); + unsafe { + wasm::memory_atomic_wait32( + self as *const AtomicU32 as *mut i32, + expected as i32, + timeout, + ) < 2 + } } -} -/// Wakes up one thread that's blocked on `futex_wait` on this futex. -/// -/// Returns true if this actually woke up such a thread, -/// or false if no thread was waiting on this futex. -pub fn futex_wake(futex: &AtomicU32) -> bool { - unsafe { wasm::memory_atomic_notify(futex as *const AtomicU32 as *mut i32, 1) > 0 } -} + /// Wakes up one thread that's blocked on `futex_wait` on this futex. + /// + /// Returns true if this actually woke up such a thread, + /// or false if no thread was waiting on this futex. + fn wake(&self) -> bool { + unsafe { wasm::memory_atomic_notify(self as *const AtomicU32 as *mut i32, 1) > 0 } + } -/// Wakes up all threads that are waiting on `futex_wait` on this futex. -pub fn futex_wake_all(futex: &AtomicU32) { - unsafe { - wasm::memory_atomic_notify(futex as *const AtomicU32 as *mut i32, i32::MAX as u32); + /// Wakes up all threads that are waiting on `futex_wait` on this futex. + fn wake_all(&self) { + unsafe { + wasm::memory_atomic_notify(self as *const AtomicU32 as *mut i32, i32::MAX as u32); + } } } diff --git a/library/std/src/sys/pal/windows/futex.rs b/library/std/src/sys/pal/windows/futex.rs index 4d6c4df9a5a90..98034e3a242e4 100644 --- a/library/std/src/sys/pal/windows/futex.rs +++ b/library/std/src/sys/pal/windows/futex.rs @@ -1,20 +1,20 @@ use core::ffi::c_void; use core::sync::atomic::{ - AtomicBool, AtomicI8, AtomicI16, AtomicI32, AtomicI64, AtomicIsize, AtomicPtr, AtomicU8, - AtomicU16, AtomicU32, AtomicU64, AtomicUsize, + AtomicBool, AtomicI16, AtomicI32, AtomicI64, AtomicI8, AtomicIsize, AtomicPtr, AtomicU16, + AtomicU32, AtomicU64, AtomicU8, AtomicUsize, }; use core::time::Duration; use core::{mem, ptr}; use super::api::{self, WinError}; use crate::sys::{c, dur2timeout}; +use crate::sys::sync::Futex; /// An atomic for use as a futex that is at least 8-bits but may be larger. pub type SmallAtomic = AtomicU8; /// Must be the underlying type of SmallAtomic pub type SmallPrimitive = u8; -pub unsafe trait Futex {} pub unsafe trait Waitable { type Atomic; } @@ -24,7 +24,6 @@ macro_rules! unsafe_waitable_int { unsafe impl Waitable for $int { type Atomic = $atomic; } - unsafe impl Futex for $atomic {} )* }; } @@ -47,7 +46,6 @@ unsafe impl Waitable for *const T { unsafe impl Waitable for *mut T { type Atomic = AtomicPtr; } -unsafe impl Futex for AtomicPtr {} pub fn wait_on_address( address: &W::Atomic, @@ -63,30 +61,32 @@ pub fn wait_on_address( } } -pub fn wake_by_address_single(address: &T) { +pub fn wake_by_address_single(address: &W::Atomic) { unsafe { let addr = ptr::from_ref(address).cast::(); c::WakeByAddressSingle(addr); } } -pub fn wake_by_address_all(address: &T) { +pub fn wake_by_address_all(address: &W::Atomic) { unsafe { let addr = ptr::from_ref(address).cast::(); c::WakeByAddressAll(addr); } } -pub fn futex_wait(futex: &W::Atomic, expected: W, timeout: Option) -> bool { - // return false only on timeout - wait_on_address(futex, expected, timeout) || api::get_last_error() != WinError::TIMEOUT -} +impl Futex for W::Atomic { + pub fn wait(&self, expected: W, timeout: Option) -> bool { + // return false only on timeout + wait_on_address(self, expected, timeout) || api::get_last_error() != WinError::TIMEOUT + } -pub fn futex_wake(futex: &T) -> bool { - wake_by_address_single(futex); - false -} + pub fn wake(&self) -> bool { + wake_by_address_single(self); + false + } -pub fn futex_wake_all(futex: &T) { - wake_by_address_all(futex) + pub fn wake_all(&self) { + wake_by_address_all(self) + } } diff --git a/library/std/src/sys/sync/condvar/futex.rs b/library/std/src/sys/sync/condvar/futex.rs index 39cd97c01ea32..a1a7454aad408 100644 --- a/library/std/src/sys/sync/condvar/futex.rs +++ b/library/std/src/sys/sync/condvar/futex.rs @@ -1,6 +1,6 @@ -use crate::sync::atomic::AtomicU32; use crate::sync::atomic::Ordering::Relaxed; -use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all}; +use crate::sys::futex::Atomic; +use crate::sys::sync::Futex; use crate::sys::sync::Mutex; use crate::time::Duration; @@ -8,13 +8,13 @@ pub struct Condvar { // The value of this atomic is simply incremented on every notification. // This is used by `.wait()` to not miss any notifications after // unlocking the mutex and before waiting for notifications. - futex: AtomicU32, + futex: Atomic, } impl Condvar { #[inline] pub const fn new() -> Self { - Self { futex: AtomicU32::new(0) } + Self{futex: Atomic::new(0)} } // All the memory orderings here are `Relaxed`, @@ -22,12 +22,12 @@ impl Condvar { pub fn notify_one(&self) { self.futex.fetch_add(1, Relaxed); - futex_wake(&self.futex); + self.futex.wake(); } pub fn notify_all(&self) { self.futex.fetch_add(1, Relaxed); - futex_wake_all(&self.futex); + self.futex.wake_all(); } pub unsafe fn wait(&self, mutex: &Mutex) { @@ -47,7 +47,7 @@ impl Condvar { // Wait, but only if there hasn't been any // notification since we unlocked the mutex. - let r = futex_wait(&self.futex, futex_value, timeout); + let r = self.futex.wait(futex_value, timeout); // Lock the mutex again. mutex.lock(); diff --git a/library/std/src/sys/sync/mod.rs b/library/std/src/sys/sync/mod.rs index 0691e96785198..ecaa4dcf5dfa7 100644 --- a/library/std/src/sys/sync/mod.rs +++ b/library/std/src/sys/sync/mod.rs @@ -12,3 +12,26 @@ pub use once::{Once, OnceState}; use once_box::OnceBox; pub use rwlock::RwLock; pub use thread_parking::Parker; + +use core::time::Duration; + +/// A trait that provides futex semantics for `std::sys::futex::{Atomic, +/// SmallAtomic}`. Generally implemented on an `AtomicU` type, but +/// this allows the futex implementation to abstract the type more if +/// necessary. +pub(crate) trait Futex { + /// If the value of the futex object does not equal `expected`, + /// sleep until woken by `wake()` or `wake_all()`. Returns false on + /// timeout, and true in all other cases. + fn wait(&self, expected: u32, timeout: Option) -> bool; + + /// Wakes up one thread that's waiting on this futex. + /// + /// May return true if this actually woke up such a thread, but some + /// platforms always return false; must return false if no thread + /// was waiting on this futex. + fn wake(&self) -> bool; + + /// Wakes up all threads that are waiting on this futex. + fn wake_all(&self); +} diff --git a/library/std/src/sys/sync/mutex/futex.rs b/library/std/src/sys/sync/mutex/futex.rs index 81afa94b14787..85ce436677bd6 100644 --- a/library/std/src/sys/sync/mutex/futex.rs +++ b/library/std/src/sys/sync/mutex/futex.rs @@ -1,11 +1,11 @@ use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; -use crate::sys::futex::{self, futex_wait, futex_wake}; +use crate::sys::futex::{SmallAtomic, SmallPrimitive}; +use crate::sys::sync::Futex; -type Atomic = futex::SmallAtomic; -type State = futex::SmallPrimitive; +type State = SmallPrimitive; pub struct Mutex { - futex: Atomic, + futex: SmallAtomic, } const UNLOCKED: State = 0; @@ -15,7 +15,7 @@ const CONTENDED: State = 2; // locked, and other threads waiting (contended) impl Mutex { #[inline] pub const fn new() -> Self { - Self { futex: Atomic::new(UNLOCKED) } + Self { futex: SmallAtomic::new(UNLOCKED) } } #[inline] @@ -54,7 +54,7 @@ impl Mutex { } // Wait for the futex to change state, assuming it is still CONTENDED. - futex_wait(&self.futex, CONTENDED, None); + self.futex.wait(CONTENDED, None); // Spin again after waking up. state = self.spin(); @@ -92,6 +92,6 @@ impl Mutex { #[cold] fn wake(&self) { - futex_wake(&self.futex); + self.futex.wake(); } } diff --git a/library/std/src/sys/sync/once/futex.rs b/library/std/src/sys/sync/once/futex.rs index 25588a4217b62..9198512d2a939 100644 --- a/library/std/src/sys/sync/once/futex.rs +++ b/library/std/src/sys/sync/once/futex.rs @@ -1,9 +1,9 @@ use crate::cell::Cell; use crate::sync as public; -use crate::sync::atomic::AtomicU32; use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; use crate::sync::once::ExclusiveState; -use crate::sys::futex::{futex_wait, futex_wake_all}; +use crate::sys::sync::Futex; +use crate::sys::futex::Atomic; // On some platforms, the OS is very nice and handles the waiter queue for us. // This means we only need one atomic value with 4 states: @@ -25,9 +25,9 @@ const COMPLETE: u32 = 3; /// May only be set if the state is not COMPLETE. const QUEUED: u32 = 4; -// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state +// Threads wait by setting the QUEUED bit and calling `wait()` on the state // variable. When the running thread finishes, it will wake all waiting threads using -// `futex_wake_all`. +// `wake_all()`. const STATE_MASK: u32 = 0b11; @@ -49,29 +49,29 @@ impl OnceState { } struct CompletionGuard<'a> { - state_and_queued: &'a AtomicU32, + state_and_queued: &'a Atomic, set_state_on_drop_to: u32, } impl<'a> Drop for CompletionGuard<'a> { fn drop(&mut self) { // Use release ordering to propagate changes to all threads checking - // up on the Once. `futex_wake_all` does its own synchronization, hence + // up on the Once. `wake_all()` does its own synchronization, hence // we do not need `AcqRel`. if self.state_and_queued.swap(self.set_state_on_drop_to, Release) & QUEUED != 0 { - futex_wake_all(self.state_and_queued); + self.state_and_queued.wake_all(); } } } pub struct Once { - state_and_queued: AtomicU32, + state_and_queued: Atomic, } impl Once { #[inline] pub const fn new() -> Once { - Once { state_and_queued: AtomicU32::new(INCOMPLETE) } + Once { state_and_queued: Atomic::new(INCOMPLETE) } } #[inline] @@ -128,7 +128,7 @@ impl Once { } } - futex_wait(&self.state_and_queued, state_and_queued, None); + self.state_and_queued.wait(state_and_queued, None); state_and_queued = self.state_and_queued.load(Acquire); } } @@ -196,7 +196,7 @@ impl Once { } } - futex_wait(&self.state_and_queued, state_and_queued, None); + self.state_and_queued.wait(state_and_queued, None); state_and_queued = self.state_and_queued.load(Acquire); } } diff --git a/library/std/src/sys/sync/rwlock/futex.rs b/library/std/src/sys/sync/rwlock/futex.rs index 75ecc2ab5c52f..81fe974b96526 100644 --- a/library/std/src/sys/sync/rwlock/futex.rs +++ b/library/std/src/sys/sync/rwlock/futex.rs @@ -1,6 +1,6 @@ -use crate::sync::atomic::AtomicU32; use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; -use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all}; +use crate::sys::sync::Futex; +use crate::sys::futex::Atomic; pub struct RwLock { // The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag. @@ -10,10 +10,10 @@ pub struct RwLock { // 0x3FFF_FFFF: Write locked // Bit 30: Readers are waiting on this futex. // Bit 31: Writers are waiting on the writer_notify futex. - state: AtomicU32, + state: Atomic, // The 'condition variable' to notify writers through. // Incremented on every signal. - writer_notify: AtomicU32, + writer_notify: Atomic, } const READ_LOCKED: u32 = 1; @@ -62,7 +62,7 @@ fn has_reached_max_readers(state: u32) -> bool { impl RwLock { #[inline] pub const fn new() -> Self { - Self { state: AtomicU32::new(0), writer_notify: AtomicU32::new(0) } + Self { state: Atomic::new(0), writer_notify: Atomic::new(0) } } #[inline] @@ -132,7 +132,7 @@ impl RwLock { } // Wait for the state to change. - futex_wait(&self.state, state | READERS_WAITING, None); + self.state.wait(state | READERS_WAITING, None); // Spin again after waking up. state = self.spin_read(); @@ -213,7 +213,7 @@ impl RwLock { } // Wait for the state to change. - futex_wait(&self.writer_notify, seq, None); + self.writer_notify.wait(seq, None); // Spin again after waking up. state = self.spin_write(); @@ -261,27 +261,28 @@ impl RwLock { if self.wake_writer() { return; } - // No writers were actually blocked on futex_wait, so we continue - // to wake up readers instead, since we can't be sure if we notified a writer. + // No writers were actually blocked on wait(), so we + // continue to wake up readers instead, since we can't be + // sure if we notified a writer. state = READERS_WAITING; } // If readers are waiting, wake them all up. if state == READERS_WAITING { if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() { - futex_wake_all(&self.state); + self.state.wake_all(); } } } /// This wakes one writer and returns true if we woke up a writer that was - /// blocked on futex_wait. + /// blocked on wait(). /// /// If this returns false, it might still be the case that we notified a /// writer that was about to go to sleep. fn wake_writer(&self) -> bool { self.writer_notify.fetch_add(1, Release); - futex_wake(&self.writer_notify) + self.writer_notify.wake() // Note that FreeBSD and DragonFlyBSD don't tell us whether they woke // up any threads or not, and always return `false` here. That still // results in correct behaviour: it just means readers get woken up as diff --git a/library/std/src/sys/sync/thread_parking/futex.rs b/library/std/src/sys/sync/thread_parking/futex.rs index ce852eaadc4d9..551d5bef00be5 100644 --- a/library/std/src/sys/sync/thread_parking/futex.rs +++ b/library/std/src/sys/sync/thread_parking/futex.rs @@ -1,8 +1,10 @@ #![forbid(unsafe_op_in_unsafe_fn)] use crate::pin::Pin; use crate::sync::atomic::Ordering::{Acquire, Release}; -use crate::sys::futex::{self, futex_wait, futex_wake}; +use crate::sys::futex; use crate::time::Duration; +use crate::sys::sync::Futex; + type Atomic = futex::SmallAtomic; type State = futex::SmallPrimitive; @@ -52,7 +54,7 @@ impl Parker { } loop { // Wait for something to happen, assuming it's still set to PARKED. - futex_wait(&self.state, PARKED, None); + self.state.wait(PARKED, None); // Change NOTIFIED=>EMPTY and return in that case. if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Acquire).is_ok() { return; @@ -72,7 +74,7 @@ impl Parker { return; } // Wait for something to happen, assuming it's still set to PARKED. - futex_wait(&self.state, PARKED, Some(timeout)); + self.state.wait(PARKED, Some(timeout)); // This is not just a store, because we need to establish a // release-acquire ordering with unpark(). if self.state.swap(EMPTY, Acquire) == NOTIFIED { @@ -94,7 +96,7 @@ impl Parker { // purpose, to make sure every unpark() has a release-acquire ordering // with park(). if self.state.swap(NOTIFIED, Release) == PARKED { - futex_wake(&self.state); + self.state.wake(); } } }