diff --git a/Cargo.lock b/Cargo.lock index b8b19256f47..97d8c3a9f1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8200,7 +8200,6 @@ name = "spacetimedb-durability" version = "2.2.0" dependencies = [ "anyhow", - "async-channel", "futures", "itertools 0.12.1", "log", @@ -8481,9 +8480,11 @@ dependencies = [ name = "spacetimedb-runtime" version = "2.2.0" dependencies = [ + "async-channel", "async-task", "futures", "libc", + "parking_lot 0.12.5", "spin", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index f4f74204ea3..783255afd22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,7 +158,7 @@ ahash = { version = "0.8", default-features = false, features = ["std"] } anyhow = "1.0.68" anymap = "0.12" arrayvec = "0.7.2" -async-channel = "2.5" +async-channel = { version = "2.5", default-features = false } async-stream = "0.3.6" async-trait = "0.1.68" axum = { version = "0.7", features = ["tracing"] } diff --git a/crates/durability/Cargo.toml b/crates/durability/Cargo.toml index 4eaa3870001..155cde76740 100644 --- a/crates/durability/Cargo.toml +++ b/crates/durability/Cargo.toml @@ -13,7 +13,7 @@ fallocate = ["spacetimedb-commitlog/fallocate"] [dependencies] anyhow.workspace = true -async-channel.workspace = true + futures.workspace = true itertools.workspace = true log.workspace = true diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index e3eca56e5d9..af44ce3ff77 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -100,7 +100,7 @@ where /// /// The queue is bounded to /// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`. - queue: async_channel::Sender>>, + queue: spacetimedb_runtime::channel::Sender>>, /// How many transactions are pending durability, including items buffered /// in the queue and items currently being written by the actor. 
/// @@ -167,7 +167,7 @@ where lock: Option, ) -> Result { let queue_capacity = opts.queue_capacity(); - let (queue, txdata_rx) = async_channel::bounded(queue_capacity); + let (queue, txdata_rx) = spacetimedb_runtime::channel::bounded(queue_capacity, rt.clone()); let queue_depth = Arc::new(AtomicU64::new(0)); let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); let actor = rt.spawn( @@ -253,7 +253,7 @@ where R: Repo + Send + Sync + 'static, { #[instrument(name = "durability::local::actor", skip_all)] - async fn run(self, transactions_rx: async_channel::Receiver>>) { + async fn run(self, transactions_rx: spacetimedb_runtime::channel::Receiver>>) { info!("starting durability actor"); let mut tx_buf = Vec::with_capacity(self.batch_capacity.get()); @@ -419,8 +419,8 @@ where } } -/// Implement tokio's `recv_many` for an `async_channel` receiver. -async fn recv_many(chan: &async_channel::Receiver, buf: &mut Vec, limit: usize) -> usize { +/// Implement tokio's `recv_many` for a `spacetimedb_runtime::channel::Receiver` receiver. 
+async fn recv_many(chan: &spacetimedb_runtime::channel::Receiver, buf: &mut Vec, limit: usize) -> usize { let mut n = 0; if !chan.is_empty() { buf.reserve(chan.len().min(limit)); diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 4cd0af60869..03290cffb2a 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -12,13 +12,15 @@ workspace = true [dependencies] tokio = { workspace = true, optional = true } async-task = { version = "4.4", default-features = false, optional = true } -spin = { version = "0.9", default-features = false, features = ["mutex", "spin_mutex"], optional = true } +spin = { version = "0.9", default-features = false, features = ["mutex", "spin_mutex", "once"], optional = true } libc = { version = "0.2", optional = true } +async-channel.workspace = true [dev-dependencies] futures.workspace = true +parking_lot = "0.12" [features] default = ["tokio"] -tokio = ["dep:tokio"] +tokio = ["dep:tokio", "async-channel/std"] simulation = ["dep:async-task", "dep:spin", "dep:libc"] diff --git a/crates/runtime/src/channel.rs b/crates/runtime/src/channel.rs new file mode 100644 index 00000000000..c6954f0b2fe --- /dev/null +++ b/crates/runtime/src/channel.rs @@ -0,0 +1,88 @@ +use async_channel::SendError; + +#[cfg(feature = "simulation")] +use async_channel::TrySendError; + +/// Sending end of a bounded channel. +/// +/// Production: uses `send_blocking` (futex, true backpressure). +/// Simulation: uses `try_send` + executor tick when full (no futex). +pub struct Sender { + inner: async_channel::Sender, + rt: crate::Handle, +} + +/// Receiving end of a bounded channel. +/// +/// Identical to `async_channel::Receiver` in both modes. 
+pub struct Receiver { + inner: async_channel::Receiver, +} + +impl Receiver { + pub fn recv(&self) -> async_channel::Recv<'_, T> { + self.inner.recv() + } + + pub fn try_recv(&self) -> Result { + self.inner.try_recv() + } + + pub fn close(&self) { + self.inner.close(); + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + pub fn len(&self) -> usize { + self.inner.len() + } +} + +/// Create a bounded channel with the given capacity and runtime handle. +/// +/// The returned `Sender` adapts its send strategy based on the runtime variant: +/// - `Handle::Tokio` → `send_blocking` (OS thread park) +/// - `Handle::Simulation` → `try_send` + executor tick on full (no futex) +pub fn bounded(cap: usize, rt: crate::Handle) -> (Sender, Receiver) { + let (tx, rx) = async_channel::bounded(cap); + (Sender { inner: tx, rt }, Receiver { inner: rx }) +} + +impl Sender { + /// Close the sender, signalling that no more messages will be sent. + pub fn close(&self) { + self.inner.close(); + } + + /// Send a message, applying backpressure. + /// + /// In production (`Tokio`) this parks the OS thread via futex. + /// In simulation it loops on `try_send`, calling `sim::Handle::run_all_ready()` + /// when the channel is full so the actor can make progress without + /// actually parking the (sole) thread. 
+ pub fn send_blocking(&self, msg: T) -> Result<(), SendError> { + match &self.rt { + #[cfg(feature = "tokio")] + crate::Handle::Tokio(_) => self.inner.send_blocking(msg), + #[cfg(feature = "simulation")] + crate::Handle::Simulation(sim) => { + let mut msg = msg; + loop { + match self.inner.try_send(msg) { + Ok(()) => return Ok(()), + Err(TrySendError::Full(m)) => { + msg = m; + sim.run_all_ready(); + } + Err(TrySendError::Closed(m)) => return Err(SendError(m)), + } + } + } + #[cfg(not(any(feature = "tokio", feature = "simulation")))] + _ => unreachable!("runtime::channel::send called with no backend enabled"), + } + } +} diff --git a/crates/runtime/src/hooks.rs b/crates/runtime/src/hooks.rs new file mode 100644 index 00000000000..38377b3db33 --- /dev/null +++ b/crates/runtime/src/hooks.rs @@ -0,0 +1,474 @@ +//! Experimental: detect thread parking inside the single-threaded simulator. +//! +//! This module installs a **seccomp BPF filter** that traps any +//! `futex(FUTEX_WAIT, …)` or `futex(FUTEX_WAIT_BITSET, …)` syscall while a +//! simulation is running. Because the simulator only has one OS thread, a +//! blocking call such as `send_blocking` or `thread::park` would park that +//! thread and deadlock the runtime. The trap delivers `SIGSYS`, the handler +//! prints a diagnostic, and the process aborts – giving a clear failure +//! instead of a silent hang. +//! +//! # Caveats (experimental) +//! +//! - **Linux + x86_64 only.** The BPF instructions and `ucontext` layout are +//! arch‑specific. Building on other targets compiles this module away. +//! - **Process‑wide side effect.** Once installed, the seccomp filter stays +//! for the lifetime of the process. Outside a simulation the handler +//! silently skips the blocking instruction (returning 0), so normal code is +//! not affected. +//! - **No false positive from mutex contention.** The filter specifically +//! targets `FUTEX_WAIT` / `FUTEX_WAIT_BITSET`. Mutex lock operations +//! 
only trap when they contend: an uncontended lock does not issue a blocking futex wait.
+//!   Since the simulation is single‑threaded, internal `pthread_mutex_lock`
+//!   calls never contend and never reach `FUTEX_WAIT`. A lock that *does*
+//!   contend inside a simulation is expected to trap; the
+//!   `parking_lot_contended_blocks_in_simulation` and
+//!   `uncontended_parking_lot_mutex_works_in_simulation` tests pin both cases.
+//! - **`libc::abort` in the signal handler** is intentional – it is one
+//!   of the few async‑signal‑safe operations available. The panic machinery
+//!   would re‑enter `futex` for lock acquisition and cause a recursive trap.
+
+#![allow(clippy::disallowed_macros)]
+
+#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
+mod imp {
+    use core::sync::atomic::AtomicBool;
+    use core::sync::atomic::Ordering;
+
+    // ── constants from kernel headers ──────────────────────────────────
+    // Most come from `libc` directly; a few are defined here because
+    // `libc` does not export them (e.g. `AUDIT_ARCH_X86_64`).
+    // NOTE(review): confirm against the pinned `libc` version whether this
+    // constant is really missing.
+    const AUDIT_ARCH_X86_64: u32 = 0xC000003E; // — EM_X86_64 (0x3E) | __AUDIT_ARCH_64BIT (0x8000_0000) | __AUDIT_ARCH_LE (0x4000_0000)
+
+    // ── BPF instruction builders ───────────────────────────────────────
+    // Classic BPF instruction format used by seccomp.
+    // Each instruction is a `sock_filter { code, jt, jf, k }`:
+    //   code — opcode (class | size | mode)
+    //   jt   — jump offset if true
+    //   jf   — jump offset if false
+    //   k    — generic operand / immediate / offset
+    //
+    // Available opcode components from the kernel's classic-BPF header
+    // (presumably `<linux/bpf_common.h>` — the header name was lost here):
+    //   class:  BPF_LD (0x00), BPF_LDX (0x01), BPF_ALU (0x04), BPF_JMP (0x05), BPF_RET (0x06)
+    //   size:   BPF_W (0x00), BPF_H (0x08), BPF_B (0x10)
+    //   mode:   BPF_ABS(0x20), BPF_IND(0x40), BPF_MEM(0x60), BPF_LEN(0x80)
+    //   jmp-op: BPF_JA (0x00), BPF_JEQ(0x10), BPF_JGT(0x20), BPF_JGE(0x30), BPF_JSET(0x40)
+    //   alu-op: BPF_ADD(0x00), BPF_SUB(0x10), BPF_MUL(0x20), BPF_AND(0x50)
+    //   src:    BPF_K (0x00 — use k field), BPF_X (0x08 — use X register)
+
+    /// One BPF statement (no jump): reads data or returns a value.
+    ///
+    /// `op` is narrowed to the 16-bit `code` field with `as u16`; both jump
+    /// offsets are fixed at 0 and `k` is stored unchanged.
+    fn bpf_stmt(op: u32, k: u32) -> libc::sock_filter {
+        libc::sock_filter { code: op as u16, jt: 0, jf: 0, k }
+    }
+
+    /// One BPF jump: compares A against `k` and branches.
+    fn bpf_jmp(op: u32, jt: u8, jf: u8, k: u32) -> libc::sock_filter {
+        libc::sock_filter { code: op as u16, jt, jf, k }
+    }
+
+    /// Install a seccomp BPF filter that traps `futex(FUTEX_WAIT)` on the
+    /// calling thread.
+    ///
+    /// Installation is tracked **per thread**. Both `PR_SET_NO_NEW_PRIVS` and
+    /// `seccomp(SECCOMP_SET_MODE_FILTER)` (called here without
+    /// `SECCOMP_FILTER_FLAG_TSYNC`) act on the calling thread only; threads
+    /// created afterwards by that thread inherit them. A process-wide
+    /// "installed" flag would therefore leave every other already-running
+    /// thread (for example sibling test-harness threads) without the filter.
+    ///
+    /// Re-registering the `SIGSYS` handler from each thread is harmless: the
+    /// disposition is process-wide and always set to the same handler. A
+    /// thread that inherited the filter from its parent and then calls
+    /// `install` stacks a second, identical filter; both return the same
+    /// action, so behavior is unchanged.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `prctl`, `sigaction` or `seccomp` reports failure.
+    pub fn install() {
+        std::thread_local! {
+            // Per-thread marker; only its owning thread touches it, so
+            // `Relaxed` is sufficient.
+            static INSTALLED: AtomicBool = const { AtomicBool::new(false) };
+        }
+        if INSTALLED.with(|installed| installed.swap(true, Ordering::Relaxed)) {
+            return;
+        }
+        // SAFETY: every pointer handed to libc below refers to a live local
+        // (`sa`, `bpf`, `prog`) that outlives the call using it, and
+        // `sigsys_handler` has the three-argument signature `SA_SIGINFO` requires.
+        unsafe {
+            // ── step 1: PR_SET_NO_NEW_PRIVS ─────────────────────────────
+            // Lets unprivileged threads install a seccomp filter.
+            let ret = libc::prctl(libc::PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0);
+            assert_eq!(ret, 0, "parking_detect: PR_SET_NO_NEW_PRIVS failed");
+
+            // ── step 2: register SIGSYS handler ─────────────────────────
+            // SA_NODEFER: allow re‑entering the handler if an abort‑time
+            // syscall also hits the filter.
+            let mut sa: libc::sigaction = core::mem::zeroed();
+            sa.sa_flags = libc::SA_SIGINFO | libc::SA_NODEFER;
+            let ptr = sigsys_handler as extern "C" fn(i32, *mut libc::siginfo_t, *mut libc::c_void);
+            // The sa_handler / sa_sigaction field is a union; write via raw
+            // bytes to avoid fighting the libc type definitions. This relies
+            // on the handler slot being the first field of `sigaction`.
+            let dst: *mut libc::c_void = (&mut sa) as *mut _ as *mut libc::c_void;
+            dst.cast::<usize>().write(ptr as usize);
+            let ret = libc::sigaction(libc::SIGSYS, &sa, core::ptr::null_mut());
+            assert_eq!(ret, 0, "parking_detect: sigaction(SIGSYS) failed");
+
+            // ── step 3: install the BPF filter ──────────────────────────
+            // Every syscall is checked by this 11-instruction seccomp
+            // program. The kernel provides `struct seccomp_data`:
+            //
+            //   offset  size  field
+            //   0       4     nr       (syscall number, 202 = futex)
+            //   4       4     arch     (AUDIT_ARCH_*)
+            //   24      8     args[1]  (futex op | flags)
+            //
+            // We verify the architecture, then the syscall number,
+            // then the futex operation (after masking the PRIVATE flag).
+            let bpf: [libc::sock_filter; 11] = [
+                // ── insn 0: ld [4] ─────────────────────────────────
+                // Load the `arch` field of `seccomp_data` into A.
+                bpf_stmt(
+                    libc::BPF_LD | libc::BPF_W | libc::BPF_ABS,
+                    4,
+                ),
+
+                // ── insn 1: jeq AUDIT_ARCH_X86_64, 0, 8 ──────────
+                // If arch == x86_64 → continue (jt:0 → insn 2).
+                // Otherwise → jump forward 8 (jf:8 → insn 10, KILL).
+                // x86 compat syscalls have a different data layout
+                // and must be rejected outright.
+                bpf_jmp(
+                    libc::BPF_JMP | libc::BPF_JEQ | libc::BPF_K,
+                    0, 8,
+                    AUDIT_ARCH_X86_64,
+                ),
+
+                // ── insn 2: ld [0] ─────────────────────────────────
+                // Load the `nr` (syscall number) into A.
+                bpf_stmt(
+                    libc::BPF_LD | libc::BPF_W | libc::BPF_ABS,
+                    0,
+                ),
+
+                // ── insn 3: jeq __NR_FUTEX, 0, 5 ─────────────────
+                // If nr == FUTEX (202) → continue (jt:0 → insn 4).
+                // Otherwise → jump forward 5 (jf:5 → insn 9, ALLOW).
+                bpf_jmp(
+                    libc::BPF_JMP | libc::BPF_JEQ | libc::BPF_K,
+                    0, 5,
+                    libc::SYS_futex as u32,
+                ),
+
+                // ── insn 4: ld [24] ────────────────────────────────
+                // Load `args[1]` — the futex operation word (op | flags).
+                // e.g. FUTEX_WAIT (0), FUTEX_WAIT_BITSET (9),
+                //      FUTEX_PRIVATE_FLAG (0x80)
+                bpf_stmt(
+                    libc::BPF_LD | libc::BPF_W | libc::BPF_ABS,
+                    24,
+                ),
+
+                // ── insn 5: and 0x7F ──────────────────────────────
+                // Strip the PRIVATE flag bit (0x80). The mask keeps only
+                // the low seven bits, so any higher flag bit is dropped too.
+                // After masking:
+                //   FUTEX_WAIT (0), FUTEX_WAIT|PRIVATE (0x80)               → 0
+                //   FUTEX_WAIT_BITSET (9), FUTEX_WAIT_BITSET|PRIVATE (0x89) → 9
+                bpf_stmt(
+                    libc::BPF_ALU | libc::BPF_AND | libc::BPF_K,
+                    0x7F,
+                ),
+
+                // ── insn 6: jeq 0, 1, 0 ──────────────────────────
+                // If masked op == FUTEX_WAIT (0) → jump forward 1
+                // (jt:1 → insn 8, TRAP).
+                // Otherwise → fall through (jf:0 → insn 7).
+                bpf_jmp(
+                    libc::BPF_JMP | libc::BPF_JEQ | libc::BPF_K,
+                    1, 0,
+                    0, // FUTEX_WAIT
+                ),
+
+                // ── insn 7: jeq 9, 0, 1 ──────────────────────────
+                // If masked op == FUTEX_WAIT_BITSET (9) → fall
+                // through (jt:0 → insn 8, TRAP).
+                // Otherwise → jump forward 1 (jf:1 → insn 9, ALLOW).
+                bpf_jmp(
+                    libc::BPF_JMP | libc::BPF_JEQ | libc::BPF_K,
+                    0, 1,
+                    9, // FUTEX_WAIT_BITSET
+                ),
+
+                // ── insn 8: ret SECCOMP_RET_TRAP ────────────────
+                // Deliver SIGSYS. Our handler checks
+                // `sim_std::in_simulation()` and aborts if inside a
+                // simulation, or makes the call return 0 otherwise.
+                bpf_stmt(
+                    libc::BPF_RET | libc::BPF_K,
+                    libc::SECCOMP_RET_TRAP,
+                ),
+
+                // ── insn 9: ret SECCOMP_RET_ALLOW ──────────────
+                // Not a futex wait — let the syscall through.
+                bpf_stmt(
+                    libc::BPF_RET | libc::BPF_K,
+                    libc::SECCOMP_RET_ALLOW,
+                ),
+
+                // ── insn 10: ret SECCOMP_RET_KILL ─────────────
+                // Architecture mismatch — kill the offending thread
+                // (`SECCOMP_RET_KILL` is the thread-level kill action).
+                bpf_stmt(
+                    libc::BPF_RET | libc::BPF_K,
+                    libc::SECCOMP_RET_KILL,
+                ),
+            ];
+
+            let prog = libc::sock_fprog {
+                len: bpf.len() as u16,
+                filter: &bpf as *const libc::sock_filter as *mut libc::sock_filter,
+            };
+            let ret = libc::syscall(
+                libc::SYS_seccomp,
+                libc::SECCOMP_SET_MODE_FILTER,
+                0,
+                &prog,
+            );
+            assert_eq!(
+                ret,
+                0,
+                "parking_detect: seccomp(SECCOMP_SET_MODE_FILTER) failed",
+            );
+        }
+    }
+
+    /// SIGSYS handler — traps a `futex_wait` inside simulation and aborts.
+    ///
+    /// The seccomp filter stays attached to a thread after its simulation
+    /// ends, so futex waits issued outside simulation also land here. In that
+    /// case the handler makes the trapped call return 0, which the caller
+    /// observes as a spurious wakeup, and execution resumes.
+    ///
+    /// With `SECCOMP_RET_TRAP` the kernel does not execute the system call
+    /// and delivers `SIGSYS` with the saved program counter already *past*
+    /// the `syscall` instruction. Only the return-value register therefore
+    /// needs fixing up; advancing `RIP` again would resume in the middle of
+    /// the following instructions.
+    ///
+    /// `in_simulation()` reads a per-thread flag at run time, so both
+    /// branches are live code.
+    extern "C" fn sigsys_handler(
+        _sig: i32,
+        _info: *mut libc::siginfo_t,
+        ctx: *mut libc::c_void,
+    ) {
+        if crate::sim_std::in_simulation() {
+            const MSG: &[u8] = b"\
+                blocking syscall (futex wait) detected inside deterministic simulation\n\
+                \x20 note: use non-blocking alternatives or run with the tokio runtime\n\
+            ";
+            // SAFETY: `MSG` is a valid buffer of `MSG.len()` bytes; `write`
+            // and `abort` are the only calls made on this path.
+            unsafe {
+                libc::write(libc::STDERR_FILENO, MSG.as_ptr() as *const _, MSG.len());
+                libc::abort();
+            }
+        }
+
+        // Outside simulation: make the trapped call return 0.
+        // RIP is left untouched — it already points at the instruction
+        // after `syscall`.
+        // SAFETY: the handler is registered with `SA_SIGINFO`, so `ctx`
+        // points to the `ucontext_t` of the interrupted thread.
+        unsafe {
+            let uc = &mut *(ctx as *mut libc::ucontext_t);
+            uc.uc_mcontext.gregs[libc::REG_RAX as usize] = 0;
+        }
+    }
+}
+
+// —— RTLD_NEXT interposition hooks -------------------------------------------
+
+/// Hook Unix thread creation by interposing `pthread_attr_init`.
+///
+/// `std::thread::Builder::spawn` initializes pthread attributes before creating
+/// the thread. Returning an error here while simulation is active makes hidden
+/// OS thread creation fail early, before host scheduling can affect replay.
+/// Outside simulation, this delegates to the real libc symbol through `RTLD_NEXT`.
+#[cfg(unix)]
+#[unsafe(no_mangle)]
+#[inline(never)]
+unsafe extern "C" fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int {
+    if crate::sim_std::in_simulation() {
+        let msg = b"attempt to spawn a system thread in simulation.\nnote: use simulator tasks instead.\n";
+        // SAFETY: `msg` is a valid buffer of `msg.len()` bytes.
+        unsafe {
+            libc::write(libc::STDERR_FILENO, msg.as_ptr() as *const _, msg.len());
+        }
+        // pthread functions report failure by returning a positive error
+        // number, not -1 with `errno`; callers only need a non-zero value.
+        return libc::EPERM;
+    }
+
+    type PthreadAttrInit = unsafe extern "C" fn(*mut libc::pthread_attr_t) -> libc::c_int;
+    static PTHREAD_ATTR_INIT: spin::once::Once<PthreadAttrInit> = spin::once::Once::new();
+    // Resolve the real libc symbol once; `RTLD_NEXT` skips this interposed
+    // definition.
+    let original = PTHREAD_ATTR_INIT.call_once(|| {
+        // SAFETY: the name is a NUL-terminated C string, and a non-null
+        // result is the address of libc's `pthread_attr_init`, whose ABI
+        // matches `PthreadAttrInit`.
+        unsafe {
+            let ptr = libc::dlsym(libc::RTLD_NEXT, c"pthread_attr_init".as_ptr().cast());
+            assert!(!ptr.is_null(), "failed to resolve original pthread_attr_init");
+            core::mem::transmute::<*mut libc::c_void, PthreadAttrInit>(ptr)
+        }
+    });
+    // SAFETY: `attr` is forwarded unchanged; the caller upholds libc's contract.
+    unsafe { original(attr) }
+}
+
+/// Hook OS randomness by interposing `getrandom`.
+///
+/// This crate no longer tries to make host randomness deterministic. Any such
+/// request is surfaced with a warning and then delegated to the host OS.
+#[unsafe(no_mangle)]
+#[inline(never)]
+unsafe extern "C" fn getrandom(buf: *mut u8, buflen: usize, flags: u32) -> isize {
+    // Only report requests made by a simulation-owned thread. The hook is
+    // process-wide, so without this check every host-side call would print
+    // a warning and capture a backtrace.
+    if crate::sim_std::in_simulation() {
+        let msg = b"warning: randomness requested; delegating to host OS\n";
+        // SAFETY: `msg` is a valid buffer of `msg.len()` bytes.
+        unsafe {
+            libc::write(libc::STDERR_FILENO, msg.as_ptr() as *const _, msg.len());
+        }
+        eprintln!("{}", std::backtrace::Backtrace::force_capture());
+    }
+    // SAFETY: the arguments are forwarded unchanged to the real `getrandom`;
+    // the caller upholds its contract.
+    unsafe { real_getrandom()(buf, buflen, flags) }
+}
+
+/// Resolve (once) and return the libc `getrandom` that this crate interposes.
+#[cfg(target_os = "linux")]
+fn real_getrandom() -> unsafe extern "C" fn(*mut u8, usize, u32) -> isize {
+    type GetrandomFn = unsafe extern "C" fn(*mut u8, usize, u32) -> isize;
+    static GETRANDOM: spin::once::Once<GetrandomFn> = spin::once::Once::new();
+    *GETRANDOM.call_once(|| {
+        // SAFETY: the name is a NUL-terminated C string, and a non-null
+        // result is the address of libc's `getrandom`, whose ABI matches
+        // `GetrandomFn`.
+        unsafe {
+            let ptr = libc::dlsym(libc::RTLD_NEXT, c"getrandom".as_ptr().cast());
+            assert!(!ptr.is_null(), "failed to resolve original getrandom");
+            core::mem::transmute::<*mut libc::c_void, GetrandomFn>(ptr)
+        }
+    })
+}
+
+#[cfg(not(target_os = "linux"))]
+fn real_getrandom() -> unsafe extern "C" fn(*mut u8, usize, u32) -> isize {
+    compile_error!("unsupported OS for DST getrandom override");
+}
+
+/// Hook `getentropy` and route it through the `getrandom` hook.
+///
+/// The 256-byte limit is part of the getentropy contract. Keeping this wrapper
+/// small means all entropy decisions stay centralized in `getrandom`.
+///
+/// Returns 0 on success and -1 on failure. An oversize request sets `errno`
+/// to `EIO`; otherwise `errno` is whatever the underlying `getrandom` left.
+#[unsafe(no_mangle)]
+#[inline(never)]
+unsafe extern "C" fn getentropy(buf: *mut u8, buflen: usize) -> i32 {
+    if buflen > 256 {
+        // SAFETY: `__errno_location` returns a valid pointer to the calling
+        // thread's `errno`.
+        unsafe { *libc::__errno_location() = libc::EIO };
+        return -1;
+    }
+    // SAFETY: `buf`/`buflen` are forwarded unchanged from our caller.
+    match unsafe { getrandom(buf, buflen, 0) } {
+        -1 => -1,
+        _ => 0,
+    }
+}
+
+// —— public API --------------------------------------------------------------
+
+/// Install the parking‑detection seccomp filter (if supported).
+///
+/// On non‑Linux or non‑x86_64 this is a no‑op.
+#[allow(dead_code)] +pub fn install() { + #[cfg(all(target_os = "linux", target_arch = "x86_64"))] + imp::install(); +} + +#[cfg(test)] +mod tests { + #[cfg(all(target_os = "linux", target_arch = "x86_64"))] + fn assert_subprocess_aborts(test_name: &str, env_var: &str) { + let exe = std::env::current_exe().expect("failed to get test binary path"); + let output = std::process::Command::new(&exe) + .env(env_var, "1") + .arg("--exact") + .arg(test_name) + .output() + .expect("failed to run subprocess"); + + assert!(!output.status.success(), "expected {test_name} to abort"); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + stderr.contains("blocking syscall (futex wait)"), + "expected blocking message in stderr, got:\n{stderr}", + ); + } + + #[test] + #[cfg(all(target_os = "linux", target_arch = "x86_64"))] + fn futex_block_trigger() { + if std::env::var("SPACETIMEDB_FUTEX_BLOCK").is_err() { + return; + } + let mut runtime = crate::sim::Runtime::new(42); + crate::sim_std::block_on(&mut runtime, async { + let (_tx, rx) = std::sync::mpsc::channel::<()>(); + let _ = rx.recv(); + }); + } + + #[test] + #[cfg(all(target_os = "linux", target_arch = "x86_64"))] + fn mpsc_recv_blocks_in_simulation() { + assert_subprocess_aborts("hooks::tests::futex_block_trigger", "SPACETIMEDB_FUTEX_BLOCK"); + } + + #[test] + #[cfg(all(target_os = "linux", target_arch = "x86_64"))] + fn contended_parking_lot_mutex_trigger() { + if std::env::var("SPACETIMEDB_PARKING_LOT_CONTEND").is_err() { + return; + } + let mut runtime = crate::sim::Runtime::new(42); + crate::sim_std::block_on(&mut runtime, async { + let lock = parking_lot::Mutex::new(42); + let _guard = lock.lock(); + let _guard2 = lock.lock(); + }); + } + + #[test] + #[cfg(all(target_os = "linux", target_arch = "x86_64"))] + fn parking_lot_contended_blocks_in_simulation() { + assert_subprocess_aborts( + "hooks::tests::contended_parking_lot_mutex_trigger", + "SPACETIMEDB_PARKING_LOT_CONTEND", + ); + } + + #[test] + 
#[cfg(all(target_os = "linux", target_arch = "x86_64"))] + fn send_blocking_at_bound_trigger() { + if std::env::var("SPACETIMEDB_SENDBLOCK_BOUND").is_err() { + return; + } + let mut runtime = crate::sim::Runtime::new(42); + crate::sim_std::block_on(&mut runtime, async { + let (tx, _rx) = async_channel::bounded::(1); + tx.send_blocking(1).expect("first send"); + tx.send_blocking(2).expect("full — never reaches"); + }); + } + + #[test] + #[cfg(all(target_os = "linux", target_arch = "x86_64"))] + fn send_blocking_at_bound_blocks_in_simulation() { + assert_subprocess_aborts( + "hooks::tests::send_blocking_at_bound_trigger", + "SPACETIMEDB_SENDBLOCK_BOUND", + ); + } + + #[test] + #[cfg(unix)] + fn runtime_forbids_system_thread_spawn() { + let mut runtime = crate::sim::Runtime::new(200); + crate::sim_std::block_on(&mut runtime, async { + let result = std::panic::catch_unwind(|| std::thread::Builder::new().spawn(|| {})); + assert!(result.is_err()); + }); + } + + #[test] + fn getentropy_delegates_to_host_randomness_outside_simulation() { + let mut actual = [0u8; 24]; + unsafe { + assert_eq!(super::getentropy(actual.as_mut_ptr(), actual.len()), 0); + } + } + + #[test] + #[cfg(unix)] + fn uncontended_parking_lot_mutex_works_in_simulation() { + let mut runtime = crate::sim::Runtime::new(42); + crate::sim_std::block_on(&mut runtime, async { + let lock = parking_lot::Mutex::new(42); + assert_eq!(*lock.lock(), 42); + }); + } + + #[test] + fn bounded_async_channel_send_blocking_not_full() { + let mut runtime = crate::sim::Runtime::new(42); + crate::sim_std::block_on(&mut runtime, async { + let (tx, rx) = async_channel::bounded::(2); + tx.send_blocking(1).expect("send within capacity"); + tx.send_blocking(2).expect("send within capacity"); + drop(rx); + }); + } +} diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index eaed2f35f46..00a1d102f83 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -1,4 +1,3 @@ -#[cfg(feature = "simulation")] 
extern crate alloc; use core::{ @@ -14,6 +13,10 @@ use core::{ pub mod sim; #[cfg(feature = "simulation")] pub mod sim_std; +#[cfg(feature = "simulation")] +pub mod hooks; + +pub mod channel; #[cfg(feature = "tokio")] pub type TokioHandle = tokio::runtime::Handle; @@ -347,4 +350,38 @@ mod tests { assert!(!flag.load(Ordering::Acquire)); }); } + + #[cfg(feature = "simulation")] + #[test] + fn simulation_bounded_queue_backpressure_ticks_executor() { + use crate::sim::Runtime; + let mut rt = Runtime::new(42); + let handle = Handle::simulation(rt.handle()); + + // Small capacity so we hit backpressure quickly. + let cap = 4; + let total = 100; + let (tx, rx) = crate::channel::bounded::(cap, handle.clone()); + + // Spawn consumer on the simulation executor. + let consumer = handle.spawn(async move { + let mut count = 0u64; + while let Ok(_) = rx.recv().await { + count += 1; + if count == total as u64 { + break; + } + } + count + }); + + rt.block_on(async { + for i in 0..total { + tx.send_blocking(i).unwrap(); + } + tx.close(); + let received = consumer.await.unwrap(); + assert_eq!(received, total as u64); + }); + } } diff --git a/crates/runtime/src/sim/executor/mod.rs b/crates/runtime/src/sim/executor/mod.rs index ff75cba0aef..727ce46fc74 100644 --- a/crates/runtime/src/sim/executor/mod.rs +++ b/crates/runtime/src/sim/executor/mod.rs @@ -342,6 +342,14 @@ impl Handle { pub fn buggify_with_prob(&self, probability: f64) -> bool { self.executor.buggify_with_prob(probability) } + + /// Drain the runnable queue, polling all ready tasks. + /// + /// This can be used by external code (e.g. the bounded queue's + /// simulation-aware sender) to make progress when a channel is full. + pub fn run_all_ready(&self) { + self.executor.run_all_ready(); + } } /// Core single-threaded scheduler backing a simulation [`Runtime`]. @@ -350,8 +358,8 @@ impl Handle { /// RNG, and virtual time. 
Tasks are selected from the queue using the runtime /// RNG so the schedule is reproducible for a given seed. struct Executor { - queue: Receiver, - sender: Sender, + queue: super::utils::Receiver, + sender: super::utils::Sender, nodes: spin::Mutex>>, next_node: AtomicU64, rng: Rng, @@ -361,7 +369,7 @@ struct Executor { impl Executor { /// Construct a fresh executor with one default `MAIN` node. fn new(config: RuntimeConfig) -> Self { - let queue = Queue::new(); + let queue = super::utils::Queue::new(); let mut nodes = BTreeMap::new(); nodes.insert(NodeId::MAIN, Arc::new(NodeRecord::default())); Self { @@ -507,7 +515,7 @@ impl Executor { /// /// Paused-node tasks are diverted into that node's paused buffer instead of /// being polled immediately. - fn run_all_ready(&self) { + pub(crate) fn run_all_ready(&self) { while let Some(runnable) = self.queue.try_recv_random(&self.rng) { let node = *runnable.metadata(); let record = self.node_record(node); @@ -575,70 +583,6 @@ impl Future for YieldNow { } } -/// Shared runnable queue used by the simulation executor. -/// TODO: Make it generic over T -struct Queue { - inner: Arc, -} - -/// Sending end of the runnable queue. -#[derive(Clone)] -struct Sender { - inner: Arc, -} - -/// Receiving end of the runnable queue. -#[derive(Clone)] -struct Receiver { - inner: Arc, -} - -/// Queue storage for runnables awaiting scheduling. -struct QueueInner { - queue: Mutex>, -} - -impl Queue { - fn new() -> Self { - Self { - inner: Arc::new(QueueInner { - queue: Mutex::new(Vec::new()), - }), - } - } - - fn sender(&self) -> Sender { - Sender { - inner: self.inner.clone(), - } - } - - fn receiver(&self) -> Receiver { - Receiver { - inner: self.inner.clone(), - } - } -} - -impl Sender { - /// Push a runnable onto the shared queue. - fn send(&self, runnable: Runnable) { - self.inner.queue.lock().push(runnable); - } -} - -impl Receiver { - /// Remove one runnable using the runtime RNG to choose among ready tasks. 
- fn try_recv_random(&self, rng: &Rng) -> Option { - let mut queue = self.inner.queue.lock(); - if queue.is_empty() { - return None; - } - let idx = rng.index(queue.len()); - Some(queue.swap_remove(idx)) - } -} - #[cfg(test)] mod tests { use std::sync::{ diff --git a/crates/runtime/src/sim/mod.rs b/crates/runtime/src/sim/mod.rs index ccdcc104991..f73d8cef074 100644 --- a/crates/runtime/src/sim/mod.rs +++ b/crates/runtime/src/sim/mod.rs @@ -1,6 +1,8 @@ pub mod buggify; mod executor; + mod rng; +pub mod utils; pub mod time; pub use executor::{ diff --git a/crates/runtime/src/sim/utils.rs b/crates/runtime/src/sim/utils.rs new file mode 100644 index 00000000000..c0a04bbdf46 --- /dev/null +++ b/crates/runtime/src/sim/utils.rs @@ -0,0 +1,80 @@ +use alloc::vec::Vec; +use alloc::sync::Arc; + +use spin::Mutex; + +use super::Rng; + +/// Shared queue used by the simulation executor. +pub struct Queue { + inner: Arc>, +} + +/// Sending end of a queue. +pub struct Sender { + inner: Arc>, +} + +/// Receiving end of a queue. +pub struct Receiver { + inner: Arc>, +} + +// Manual Clone impls avoid the `T: Clone` bound that `derive` adds. +impl Clone for Sender { + fn clone(&self) -> Self { + Self { inner: self.inner.clone() } + } +} + +impl Clone for Receiver { + fn clone(&self) -> Self { + Self { inner: self.inner.clone() } + } +} + +/// Queue storage. +struct QueueInner { + queue: Mutex>, +} + +impl Queue { + pub fn new() -> Self { + Self { + inner: Arc::new(QueueInner { + queue: Mutex::new(Vec::new()), + }), + } + } + + pub fn sender(&self) -> Sender { + Sender { + inner: self.inner.clone(), + } + } + + pub fn receiver(&self) -> Receiver { + Receiver { + inner: self.inner.clone(), + } + } +} + +impl Sender { + /// Push a value onto the shared queue. + pub fn send(&self, value: T) { + self.inner.queue.lock().push(value); + } +} + +impl Receiver { + /// Remove one value using the runtime RNG to choose among ready items. 
+ pub fn try_recv_random(&self, rng: &Rng) -> Option { + let mut queue = self.inner.queue.lock(); + if queue.is_empty() { + return None; + } + let idx = rng.index(queue.len()); + Some(queue.swap_remove(idx)) + } +} diff --git a/crates/runtime/src/sim_std.rs b/crates/runtime/src/sim_std.rs index 5305c6ea166..9bfc6604cf8 100644 --- a/crates/runtime/src/sim_std.rs +++ b/crates/runtime/src/sim_std.rs @@ -1,19 +1,33 @@ -//! Std-hosted entry points for running the deterministic simulator in tests. -//! -//! The portable simulator lives in [`crate::sim`]. This module is deliberately -//! host-specific: it installs thread-local context while a simulation is -//! running, checks determinism by replaying a seed in fresh OS threads, and -//! intercepts a few libc calls so std code cannot silently escape determinism. - #![allow(clippy::disallowed_macros)] -use alloc::boxed::Box; -use core::{cell::Cell, future::Future}; -use std::sync::OnceLock; +use core::cell::Cell; +use core::future::Future; use crate::sim; -// Public entry points. +std::thread_local! { + static IN_SIMULATION: Cell = const { Cell::new(false) }; +} + +struct SimulationThreadGuard { + previous: bool, +} + +fn enter_simulation_thread() -> SimulationThreadGuard { + crate::hooks::install(); + let previous = IN_SIMULATION.with(|state| state.replace(true)); + SimulationThreadGuard { previous } +} + +pub(crate) fn in_simulation() -> bool { + IN_SIMULATION.with(|state| state.get()) +} + +impl Drop for SimulationThreadGuard { + fn drop(&mut self) { + IN_SIMULATION.with(|state| state.set(self.previous)); + } +} /// Run a future to completion with std-hosted determinism guards installed. /// @@ -61,147 +75,11 @@ where .unwrap() } -fn panic_with_seed(seed: u64, payload: Box) -> ! { - eprintln!("note: run with --seed {seed} to reproduce this error"); - std::panic::resume_unwind(payload); -} - -// Simulation thread context. - -// Ambient state used only while `sim_std::block_on` is driving a simulation. 
-// -// The simulator itself stays explicit-handle based. This thread-local only -// marks whether the current OS thread is owned by a running simulation so -// host thread creation can be rejected. -thread_local! { - // Marks the current OS thread as simulation-owned so thread creation hooks - // can reject accidental escapes to the host scheduler. - static IN_SIMULATION: Cell = const { Cell::new(false) }; -} - -struct SimulationThreadGuard { - previous: bool, -} - -fn enter_simulation_thread() -> SimulationThreadGuard { - let previous = IN_SIMULATION.with(|state| state.replace(true)); - SimulationThreadGuard { previous } -} - -fn in_simulation() -> bool { - IN_SIMULATION.with(Cell::get) -} - -impl Drop for SimulationThreadGuard { - fn drop(&mut self) { - IN_SIMULATION.with(|state| { - state.set(self.previous); - }); - } -} - -// Thread hook. - -// Hook Unix thread creation by interposing `pthread_attr_init`. -// -// `std::thread::Builder::spawn` initializes pthread attributes before creating -// the thread. Returning an error here while simulation is active makes hidden -// OS thread creation fail early, before host scheduling can affect replay. -// Outside simulation, this delegates to the real libc symbol through `RTLD_NEXT`. -#[cfg(unix)] -#[unsafe(no_mangle)] -#[inline(never)] -unsafe extern "C" fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int { - // std::thread enters libc through pthread_attr_init on Unix. Refusing that - // call while in simulation keeps hidden OS scheduling out of DST. 
- if in_simulation() { - eprintln!("attempt to spawn a system thread in simulation."); - eprintln!("note: use simulator tasks instead."); - return -1; - } - - type PthreadAttrInit = unsafe extern "C" fn(*mut libc::pthread_attr_t) -> libc::c_int; - static PTHREAD_ATTR_INIT: OnceLock = OnceLock::new(); - let original = PTHREAD_ATTR_INIT.get_or_init(|| unsafe { - // `RTLD_NEXT` skips this interposed function and finds the libc - // implementation that would have been called without the simulator. - let ptr = libc::dlsym(libc::RTLD_NEXT, c"pthread_attr_init".as_ptr().cast()); - assert!(!ptr.is_null(), "failed to resolve original pthread_attr_init"); - std::mem::transmute(ptr) - }); - unsafe { original(attr) } -} - -// Randomness syscall hooks. - -// Hook OS randomness by interposing `getrandom`. -// -// This crate no longer tries to make host randomness deterministic. Any such -// request is surfaced with a warning and then delegated to the host OS. -#[unsafe(no_mangle)] -#[inline(never)] -unsafe extern "C" fn getrandom(buf: *mut u8, buflen: usize, flags: u32) -> isize { - if in_simulation() { - eprintln!("warning: randomness requested; delegating to host OS"); - eprintln!("{}", std::backtrace::Backtrace::force_capture()); - } - unsafe { real_getrandom()(buf, buflen, flags) } -} - -#[cfg(target_os = "linux")] -fn real_getrandom() -> unsafe extern "C" fn(*mut u8, usize, u32) -> isize { - type GetrandomFn = unsafe extern "C" fn(*mut u8, usize, u32) -> isize; - static GETRANDOM: OnceLock = OnceLock::new(); - *GETRANDOM.get_or_init(|| unsafe { - let ptr = libc::dlsym(libc::RTLD_NEXT, c"getrandom".as_ptr().cast()); - assert!(!ptr.is_null(), "failed to resolve original getrandom"); - std::mem::transmute(ptr) - }) -} - -#[cfg(not(target_os = "linux"))] -fn real_getrandom() -> unsafe extern "C" fn(*mut u8, usize, u32) -> isize { - compile_error!("unsupported OS for DST getrandom override"); -} - -// Hook `getentropy` and route it through the same deterministic path as -// 
`getrandom`. -// -// The 256-byte limit is part of the getentropy contract. Keeping this wrapper -// small means all entropy decisions stay centralized in `getrandom`. -#[unsafe(no_mangle)] -#[inline(never)] -unsafe extern "C" fn getentropy(buf: *mut u8, buflen: usize) -> i32 { - if buflen > 256 { - return -1; - } - match unsafe { getrandom(buf, buflen, 0) } { - -1 => -1, - _ => 0, - } -} - -#[cfg(test)] -mod tests { - use crate::sim; - - use super::getentropy; - - #[test] - #[cfg(unix)] - fn runtime_forbids_system_thread_spawn() { - let mut runtime = sim::Runtime::new(200); - super::block_on(&mut runtime, async { - let result = std::panic::catch_unwind(|| std::thread::Builder::new().spawn(|| {})); - assert!(result.is_err()); - }); - } - - #[test] - fn getentropy_delegates_to_host_randomness_outside_simulation() { - let mut actual = [0u8; 24]; - unsafe { - assert_eq!(getentropy(actual.as_mut_ptr(), actual.len()), 0); - } +fn panic_with_seed(seed: u64, payload: alloc::boxed::Box) -> ! { + // Write panic message directly — no `eprintln!` in no_std. + unsafe { + let msg = alloc::format!("note: run with --seed {} to reproduce this error\n", seed); + libc::write(libc::STDERR_FILENO, msg.as_ptr() as *const _, msg.len()); } + std::panic::resume_unwind(payload); }