From ec8e3657e58a16c6ac6de54e46f9a24bc2c35dbd Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Mon, 16 Mar 2026 00:32:36 +0100 Subject: [PATCH 1/6] Fix event loop not triggered after timer/FD events After dispatching timer or FD events, the worker now sends task_ready to itself to trigger _run_once processing. Also modified process_ready_tasks to call _run_once when there are pending events (not just new coroutines). Without this fix, asyncio.sleep and other async operations would never complete because the event loop wasn't being driven after timer events. Changes: - py_event_worker: Send task_ready after handling {timeout, ...} and {select, ...} messages to trigger event loop processing - py_event_loop.c: Call _run_once when pending_count > 0 in addition to when coros_scheduled > 0 --- c_src/py_event_loop.c | 8 ++++++-- src/py_event_worker.erl | 7 +++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index af8281c..29c6bf5 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -2912,13 +2912,17 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, /* NOTE: We don't DECREF asyncio and run_and_send here because they're cached * in the loop structure. They'll be freed when the loop is destroyed. */ - /* Run one iteration of the event loop only if coroutines were scheduled. + /* Run one iteration of the event loop if: + * 1. New coroutines were scheduled (need to start them), OR + * 2. There are pending events (timers, FD callbacks) to process + * * For sync functions (like math.sqrt), results are sent directly via enif_send * and we don't need to drive the Python event loop. * * Pass timeout_hint=0 so we don't block - we just added work that needs * processing immediately. This is a uvloop-style optimization. */ - if (coros_scheduled > 0) { + int pending_count = atomic_load(&loop->pending_count); + if (coros_scheduled > 0 || pending_count > 0) { PyObject *run_result = PyObject_CallMethod(loop->py_loop, "_run_once", "i", 0); if (run_result != NULL) { Py_DECREF(run_result); diff --git a/src/py_event_worker.erl b/src/py_event_worker.erl index b1aa877..634176e 100644 --- a/src/py_event_worker.erl +++ b/src/py_event_worker.erl @@ -45,10 +45,14 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({select, FdRes, _Ref, ready_input}, State) -> py_nif:handle_fd_event_and_reselect(FdRes, read), + %% Trigger event processing after FD event dispatch + self() ! task_ready, {noreply, State}; handle_info({select, FdRes, _Ref, ready_output}, State) -> py_nif:handle_fd_event_and_reselect(FdRes, write), + %% Trigger event processing after FD event dispatch + self() ! task_ready, {noreply, State}; handle_info({start_timer, _LoopRef, DelayMs, CallbackId, TimerRef}, State) -> @@ -80,6 +84,9 @@ handle_info({timeout, TimerRef}, State) -> {_ErlTimerRef, CallbackId} -> py_nif:dispatch_timer(LoopRef, CallbackId), NewTimers = maps:remove(TimerRef, Timers), + %% Trigger event processing after timer dispatch + %% This ensures _run_once is called to handle the timer callback + self() ! task_ready, {noreply, State#state{timers = NewTimers}} end; From 472e592e8eed630cf6b789fd22f511c12fca3649 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Mon, 16 Mar 2026 06:51:03 +0100 Subject: [PATCH 2/6] Fix async task completion and add erlang.atom() type Timer events weren't triggering task completion because: - dispatch_timer didn't notify worker to process ready tasks - Python strings became binaries, but await expected atoms - ErlangRef conversion happened before resource type checks Changes: - Add global worker PID storage for timer dispatch notification - Add nif_set_shared_worker NIF and py_nif:set_shared_worker/1 - Add ErlangAtomObject type and erlang.atom() function - Modify _run_and_send to use atoms for message keys - Move enif_is_ref check after resource type checks - Add _get_ready_count method for event loop processing - Fix lazy loop creation to use ErlangEventLoop directly All py_async_task_SUITE (26) and py_SUITE (50) tests pass. --- c_src/py_callback.c | 196 +++++++++++++++++++++++++++++++++- c_src/py_convert.c | 42 ++++++++ c_src/py_event_loop.c | 197 ++++++++++++++++++++++++++++++----- c_src/py_event_loop.h | 16 +++ c_src/py_nif.c | 1 + c_src/py_nif.h | 15 +++ priv/_erlang_impl/_loop.py | 42 ++++++-- src/py_event_loop.erl | 4 + src/py_nif.erl | 8 ++ test/py_async_task_SUITE.erl | 41 ++++++++ 10 files changed, 530 insertions(+), 32 deletions(-) diff --git a/c_src/py_callback.c b/c_src/py_callback.c index b7eaa74..2e78d6a 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -1276,6 +1276,168 @@ PyTypeObject ErlangPidType = { .tp_doc = "Opaque Erlang process identifier", }; +/* ============================================================================ + * ErlangRef - opaque Erlang reference type + * + * Stores the reference as a serialized binary for round-trip through Python. + * ============================================================================ */ + +static void ErlangRef_dealloc(ErlangRefObject *self) { + if (self->data != NULL) { + PyMem_Free(self->data); + } + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static PyObject *ErlangRef_repr(ErlangRefObject *self) { + return PyUnicode_FromFormat("", self->size); +} + +static PyObject *ErlangRef_richcompare(PyObject *a, PyObject *b, int op) { + if (!Py_IS_TYPE(b, &ErlangRefType)) { + Py_RETURN_NOTIMPLEMENTED; + } + ErlangRefObject *ra = (ErlangRefObject *)a; + ErlangRefObject *rb = (ErlangRefObject *)b; + + int cmp = 0; + if (ra->size != rb->size) { + cmp = (ra->size < rb->size) ? -1 : 1; + } else { + cmp = memcmp(ra->data, rb->data, ra->size); + } + + switch (op) { + case Py_EQ: return PyBool_FromLong(cmp == 0); + case Py_NE: return PyBool_FromLong(cmp != 0); + default: Py_RETURN_NOTIMPLEMENTED; + } +} + +static Py_hash_t ErlangRef_hash(ErlangRefObject *self) { + /* Simple hash of the serialized data */ + Py_hash_t h = 0; + for (size_t i = 0; i < self->size; i++) { + h = h * 31 + self->data[i]; + } + if (h == -1) h = -2; + return h; +} + +PyTypeObject ErlangRefType = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "erlang.Ref", + .tp_basicsize = sizeof(ErlangRefObject), + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_dealloc = (destructor)ErlangRef_dealloc, + .tp_repr = (reprfunc)ErlangRef_repr, + .tp_richcompare = ErlangRef_richcompare, + .tp_hash = (hashfunc)ErlangRef_hash, + .tp_doc = "Opaque Erlang reference", +}; + +/* ============================================================================ + * ErlangAtom - Python type for Erlang atoms + * + * Erlang atoms are symbols (like Ruby symbols or Lisp symbols). This type + * allows Python code to explicitly create atoms for message passing. + * ============================================================================ */ + +static void ErlangAtom_dealloc(ErlangAtomObject *self) { + if (self->name != NULL) { + PyMem_Free(self->name); + } + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static PyObject *ErlangAtom_repr(ErlangAtomObject *self) { + return PyUnicode_FromFormat("", self->name); +} + +static PyObject *ErlangAtom_str(ErlangAtomObject *self) { + return PyUnicode_FromString(self->name); +} + +static PyObject *ErlangAtom_richcompare(PyObject *a, PyObject *b, int op) { + if (!Py_IS_TYPE(b, &ErlangAtomType)) { + Py_RETURN_NOTIMPLEMENTED; + } + ErlangAtomObject *aa = (ErlangAtomObject *)a; + ErlangAtomObject *ab = (ErlangAtomObject *)b; + + int cmp = strcmp(aa->name, ab->name); + + switch (op) { + case Py_EQ: return PyBool_FromLong(cmp == 0); + case Py_NE: return PyBool_FromLong(cmp != 0); + case Py_LT: return PyBool_FromLong(cmp < 0); + case Py_LE: return PyBool_FromLong(cmp <= 0); + case Py_GT: return PyBool_FromLong(cmp > 0); + case Py_GE: return PyBool_FromLong(cmp >= 0); + default: Py_RETURN_NOTIMPLEMENTED; + } +} + +static Py_hash_t ErlangAtom_hash(ErlangAtomObject *self) { + /* Use Python's string hash for consistency */ + PyObject *str = PyUnicode_FromString(self->name); + if (str == NULL) return -1; + Py_hash_t h = PyObject_Hash(str); + Py_DECREF(str); + return h; +} + +PyTypeObject ErlangAtomType = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "erlang.Atom", + .tp_basicsize = sizeof(ErlangAtomObject), + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_dealloc = (destructor)ErlangAtom_dealloc, + .tp_repr = (reprfunc)ErlangAtom_repr, + .tp_str = (reprfunc)ErlangAtom_str, + .tp_richcompare = ErlangAtom_richcompare, + .tp_hash = (hashfunc)ErlangAtom_hash, + .tp_doc = "Erlang atom (symbol)", +}; + +/** + * erlang.atom(name) - Create an Erlang atom + * + * Args: name (string) + * Returns: ErlangAtomObject + */ +static PyObject *erlang_atom_impl(PyObject *self, PyObject *args) { + (void)self; + const char *name; + Py_ssize_t name_len; + + if (!PyArg_ParseTuple(args, "s#", &name, &name_len)) { + return NULL; + } + + /* Validate atom name length (Erlang limit is 255 bytes) */ + if (name_len > 255) { + PyErr_SetString(PyExc_ValueError, "Atom name too long (max 255 bytes)"); + return NULL; + } + + ErlangAtomObject *obj = PyObject_New(ErlangAtomObject, &ErlangAtomType); + if (obj == NULL) { + return NULL; + } + + obj->name = PyMem_Malloc(name_len + 1); + if (obj->name == NULL) { + Py_DECREF(obj); + return PyErr_NoMemory(); + } + + memcpy(obj->name, name, name_len); + obj->name[name_len] = '\0'; + + return (PyObject *)obj; +} + /* ============================================================================ * ScheduleMarker - marker type for explicit scheduler release * @@ -2002,7 +2164,9 @@ static PyObject *erlang_send_impl(PyObject *self, PyObject *args) { } /* Fire-and-forget send */ - if (!enif_send(NULL, &pid->pid, msg_env, msg)) { + int send_result = enif_send(NULL, &pid->pid, msg_env, msg); + + if (!send_result) { enif_free_env(msg_env); PyErr_SetString(get_current_process_error(), "Failed to send message: process may not exist"); @@ -2698,6 +2862,10 @@ static PyMethodDef ErlangModuleMethods[] = { "Call a registered Erlang function.\n\n" "Usage: erlang.call('func_name', arg1, arg2, ...)\n" "Returns: The result from the Erlang function."}, + {"atom", erlang_atom_impl, METH_VARARGS, + "Create an Erlang atom.\n\n" + "Usage: erlang.atom('name')\n" + "Returns: An ErlangAtom object that converts to an Erlang atom."}, {"send", erlang_send_impl, METH_VARARGS, "Send a message to an Erlang process (fire-and-forget).\n\n" "Usage: erlang.send(pid, term)\n" @@ -2840,6 +3008,16 @@ static int create_erlang_module(void) { return -1; } + /* Initialize ErlangRef type */ + if (PyType_Ready(&ErlangRefType) < 0) { + return -1; + } + + /* Initialize ErlangAtom type */ + if (PyType_Ready(&ErlangAtomType) < 0) { + return -1; + } + /* Initialize ScheduleMarker type */ if (PyType_Ready(&ScheduleMarkerType) < 0) { return -1; @@ -2911,6 +3089,22 @@ static int create_erlang_module(void) { return -1; } + /* Add ErlangRef type to module */ + Py_INCREF(&ErlangRefType); + if (PyModule_AddObject(module, "Ref", (PyObject *)&ErlangRefType) < 0) { + Py_DECREF(&ErlangRefType); + Py_DECREF(module); + return -1; + } + + /* Add ErlangAtom type to module */ + Py_INCREF(&ErlangAtomType); + if (PyModule_AddObject(module, "Atom", (PyObject *)&ErlangAtomType) < 0) { + Py_DECREF(&ErlangAtomType); + Py_DECREF(module); + return -1; + } + /* Add ScheduleMarker type to module */ Py_INCREF(&ScheduleMarkerType); if (PyModule_AddObject(module, "ScheduleMarker", (PyObject *)&ScheduleMarkerType) < 0) { diff --git a/c_src/py_convert.c b/c_src/py_convert.c index a0ccd48..5e77c7c 100644 --- a/c_src/py_convert.c +++ b/c_src/py_convert.c @@ -339,6 +339,23 @@ ERL_NIF_TERM py_to_term(ErlNifEnv *env, PyObject *obj) { return enif_make_pid(env, &pid_obj->pid); } + /* Handle ErlangRef → Erlang reference (deserialize from binary) */ + if (Py_IS_TYPE(obj, &ErlangRefType)) { + ErlangRefObject *ref_obj = (ErlangRefObject *)obj; + ERL_NIF_TERM result; + if (enif_binary_to_term(env, ref_obj->data, ref_obj->size, &result, 0) > 0) { + return result; + } + /* Failed to deserialize - return undefined */ + return enif_make_atom(env, "undefined"); + } + + /* Handle ErlangAtom → Erlang atom */ + if (Py_IS_TYPE(obj, &ErlangAtomType)) { + ErlangAtomObject *atom_obj = (ErlangAtomObject *)obj; + return enif_make_atom(env, atom_obj->name); + } + /* Handle NumPy arrays by converting to Python list first */ if (is_numpy_ndarray(obj)) { PyObject *tolist = PyObject_CallMethod(obj, "tolist", NULL); @@ -603,6 +620,31 @@ static PyObject *term_to_py(ErlNifEnv *env, ERL_NIF_TERM term) { return PyBuffer_from_resource(pybuf, pybuf); } + /* Check for reference - serialize to binary for round-trip. + * IMPORTANT: This must come AFTER all resource checks, because NIF + * resource terms also satisfy enif_is_ref() but should be handled + * as their specific resource type (PyObj, Channel, Buffer, etc). */ + if (enif_is_ref(env, term)) { + ErlNifBinary bin; + if (enif_term_to_binary(env, term, &bin)) { + ErlangRefObject *obj = PyObject_New(ErlangRefObject, &ErlangRefType); + if (obj == NULL) { + enif_release_binary(&bin); + return NULL; + } + obj->data = PyMem_Malloc(bin.size); + if (obj->data == NULL) { + Py_DECREF(obj); + enif_release_binary(&bin); + return NULL; + } + memcpy(obj->data, bin.data, bin.size); + obj->size = bin.size; + enif_release_binary(&bin); + return (PyObject *)obj; + } + } + /* Fallback: return None for unknown types */ Py_RETURN_NONE; } diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index 29c6bf5..837360f 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -144,6 +144,15 @@ static ErlNifPid g_global_shared_router; static bool g_global_shared_router_valid = false; static pthread_mutex_t g_global_router_mutex = PTHREAD_MUTEX_INITIALIZER; +/* Global shared worker for scalable I/O model. + * Used by dispatch_timer to send task_ready, ensuring process_ready_tasks + * is called after timer events. This centralizes the wakeup mechanism + * so both router-dispatched and worker-dispatched timers work correctly. + */ +static ErlNifPid g_global_shared_worker; +static bool g_global_shared_worker_valid = false; +static pthread_mutex_t g_global_worker_mutex = PTHREAD_MUTEX_INITIALIZER; + /* ============================================================================ * Per-Interpreter Reactor Cache * ============================================================================ @@ -1786,6 +1795,9 @@ ERL_NIF_TERM nif_dispatch_callback(ErlNifEnv *env, int argc, * dispatch_timer(LoopRef, CallbackId) -> ok * * Called when a timer expires. + * Adds timer event to pending queue and sends task_ready to worker + * to trigger process_ready_tasks. This ensures _run_once is called + * to handle the timer callback. */ ERL_NIF_TERM nif_dispatch_timer(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { @@ -1804,6 +1816,21 @@ ERL_NIF_TERM nif_dispatch_timer(ErlNifEnv *env, int argc, event_loop_add_pending(loop, EVENT_TYPE_TIMER, callback_id, -1); + /* Send task_ready to worker to trigger process_ready_tasks. + * This ensures _run_once is called to handle the timer callback. + * Without this, timers dispatched via the router would never be processed + * because the worker wouldn't know there are pending events. */ + pthread_mutex_lock(&g_global_worker_mutex); + if (g_global_shared_worker_valid) { + ErlNifEnv *msg_env = enif_alloc_env(); + if (msg_env != NULL) { + ERL_NIF_TERM task_ready_atom = enif_make_atom(msg_env, "task_ready"); + enif_send(NULL, &g_global_shared_worker, msg_env, task_ready_atom); + enif_free_env(msg_env); + } + } + pthread_mutex_unlock(&g_global_worker_mutex); + return ATOM_OK; } @@ -2535,10 +2562,17 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, */ atomic_store(&loop->task_wake_pending, false); - /* OPTIMIZATION: Check task count BEFORE acquiring GIL - * This avoids expensive GIL acquisition when there's nothing to do */ + /* OPTIMIZATION: Check if there's any work BEFORE acquiring GIL + * This avoids expensive GIL acquisition when there's nothing to do. + * + * We need to check BOTH: + * - task_count: new tasks submitted via submit_task + * - pending_count: timer/FD events dispatched via dispatch_timer/handle_fd_event + * + * If either has work, we need to proceed and call _run_once. */ uint_fast64_t task_count = atomic_load(&loop->task_count); - if (task_count == 0) { + int pending_count = atomic_load(&loop->pending_count); + if (task_count == 0 && pending_count == 0) { return ATOM_OK; /* Nothing to process, skip GIL entirely */ } @@ -2598,10 +2632,10 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, pthread_mutex_unlock(&loop->task_queue_mutex); - /* If no tasks were dequeued, return early (no GIL needed) */ - if (num_tasks == 0) { - return ATOM_OK; - } + /* NOTE: We do NOT return early here even if num_tasks == 0. + * We may have pending timer/FD events that need _run_once to process. + * The first check (task_count == 0 && pending_count == 0) at the start + * of this function already handles the case where there's truly no work. */ /* ======================================================================== * PHASE 2: Process all tasks WITH GIL (Python operations) @@ -2670,8 +2704,37 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, /* Lazy loop creation (uvloop-style): create Python loop on first use */ if (!loop->py_loop_valid || loop->py_loop == NULL) { - /* Create new event loop via asyncio policy (triggers ErlangEventLoop.__init__) */ - PyObject *new_loop = PyObject_CallMethod(asyncio, "new_event_loop", NULL); + /* Create ErlangEventLoop directly instead of via asyncio.new_event_loop(). + * This is necessary because dirty NIF scheduler threads don't have the + * event loop policy set. asyncio.new_event_loop() would create a + * SelectorEventLoop instead of our ErlangEventLoop. */ + PyObject *erlang_loop_mod = PyImport_ImportModule("_erlang_impl._loop"); + if (erlang_loop_mod == NULL) { + PyErr_Clear(); + erlang_loop_mod = PyImport_ImportModule("erlang_loop"); + } + if (erlang_loop_mod == NULL) { + PyErr_Clear(); + for (int i = 0; i < num_tasks; i++) { + enif_free_env(tasks[i].term_env); + } + PyGILState_Release(gstate); + return make_error(env, "loop_module_import_failed"); + } + + PyObject *loop_class = PyObject_GetAttrString(erlang_loop_mod, "ErlangEventLoop"); + Py_DECREF(erlang_loop_mod); + if (loop_class == NULL) { + PyErr_Clear(); + for (int i = 0; i < num_tasks; i++) { + enif_free_env(tasks[i].term_env); + } + PyGILState_Release(gstate); + return make_error(env, "loop_class_not_found"); + } + + PyObject *new_loop = PyObject_CallNoArgs(loop_class); + Py_DECREF(loop_class); if (new_loop == NULL) { PyErr_Clear(); for (int i = 0; i < num_tasks; i++) { @@ -2681,7 +2744,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, return make_error(env, "loop_creation_failed"); } - /* Set as current event loop */ + /* Set as current event loop for this thread */ PyObject *set_result = PyObject_CallMethod(asyncio, "set_event_loop", "O", new_loop); Py_XDECREF(set_result); @@ -2850,6 +2913,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, enif_free_env(term_env); continue; } + /* Copy PID */ pid_obj->pid = caller_pid; /* Convert ref to Python */ @@ -2912,23 +2976,48 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, /* NOTE: We don't DECREF asyncio and run_and_send here because they're cached * in the loop structure. They'll be freed when the loop is destroyed. */ - /* Run one iteration of the event loop if: - * 1. New coroutines were scheduled (need to start them), OR - * 2. There are pending events (timers, FD callbacks) to process + /* Run the event loop until there's no more immediate work. * - * For sync functions (like math.sqrt), results are sent directly via enif_send - * and we don't need to drive the Python event loop. + * We need to keep calling _run_once because: + * 1. First call may schedule timers (coroutine hits await asyncio.sleep) + * 2. Timer dispatch adds callback to _ready queue + * 3. Next _run_once processes the _ready queue (resumes coroutine) + * 4. Coroutine may complete and send result, or schedule more work * - * Pass timeout_hint=0 so we don't block - we just added work that needs - * processing immediately. This is a uvloop-style optimization. */ - int pending_count = atomic_load(&loop->pending_count); - if (coros_scheduled > 0 || pending_count > 0) { + * We loop until both pending_count AND Python's _ready queue are empty. + * Pass timeout_hint=0 so we don't block. */ + int current_pending = atomic_load(&loop->pending_count); + int py_ready = 0; + int iterations = 0; + const int max_iterations = 100; /* Safety limit */ + + /* Loop while there's work: new coroutines, pending events, OR ready callbacks */ + while ((coros_scheduled > 0 || current_pending > 0 || py_ready > 0) && iterations < max_iterations) { + iterations++; + PyObject *run_result = PyObject_CallMethod(loop->py_loop, "_run_once", "i", 0); if (run_result != NULL) { Py_DECREF(run_result); } else { + PyErr_Print(); PyErr_Clear(); + break; + } + + /* Check if Python loop has more ready callbacks */ + PyObject *ready_len = PyObject_CallMethod(loop->py_loop, "_get_ready_count", NULL); + py_ready = 0; + if (ready_len != NULL) { + py_ready = (int)PyLong_AsLong(ready_len); + Py_DECREF(ready_len); + if (PyErr_Occurred()) { + PyErr_Clear(); + py_ready = 0; + } } + + current_pending = atomic_load(&loop->pending_count); + coros_scheduled = 0; /* Already processed on first iteration */ } /* Restore original event loop context before releasing GIL */ @@ -5457,6 +5546,28 @@ ERL_NIF_TERM nif_set_shared_router(ErlNifEnv *env, int argc, return ATOM_OK; } +/** + * Set the shared worker PID for task_ready notifications. + * This worker receives task_ready messages from dispatch_timer and other + * event sources to trigger process_ready_tasks. + */ +ERL_NIF_TERM nif_set_shared_worker(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + ErlNifPid worker_pid; + if (!enif_get_local_pid(env, argv[0], &worker_pid)) { + return make_error(env, "invalid_pid"); + } + + pthread_mutex_lock(&g_global_worker_mutex); + g_global_shared_worker = worker_pid; + g_global_shared_worker_valid = true; + pthread_mutex_unlock(&g_global_worker_mutex); + + return ATOM_OK; +} + /* Python function: _poll_events(timeout_ms) -> num_events */ static PyObject *py_poll_events(PyObject *self, PyObject *args) { (void)self; @@ -6256,6 +6367,30 @@ static PyObject *py_set_global_loop_ref(PyObject *self, PyObject *args) { Py_RETURN_NONE; } +/* Python function: _get_global_loop_capsule() -> capsule + * + * Returns a capsule for the global interpreter event loop. + * This is the loop created by Erlang that has has_worker=true. + * Python's ErlangEventLoop should use this capsule instead of creating + * a new one, so that timers and FD events are properly dispatched to + * the worker which triggers process_ready_tasks. + */ +static PyObject *py_get_global_loop_capsule(PyObject *self, PyObject *args) { + (void)self; + (void)args; + + erlang_event_loop_t *loop = get_interpreter_event_loop(); + if (loop == NULL) { + PyErr_SetString(PyExc_RuntimeError, "Global event loop not initialized"); + return NULL; + } + + /* Keep the resource alive while capsule exists */ + enif_keep_resource(loop); + + return PyCapsule_New(loop, LOOP_CAPSULE_NAME, NULL); +} + /* Python function: _run_once_native_for(capsule, timeout_ms) -> [(callback_id, event_type), ...] */ static PyObject *py_run_once_for(PyObject *self, PyObject *args) { (void)self; @@ -6657,14 +6792,27 @@ static PyObject *py_schedule_timer_for(PyObject *self, PyObject *args) { return NULL; } - if (!event_loop_ensure_router(loop)) { + /* For timer scheduling, we need to use the global interpreter loop which + * has the worker process. The capsule's loop may be a Python-created loop + * that doesn't have has_worker set, which would cause timer dispatches + * to go to the router instead of the worker, breaking the event loop flow. + * + * The global loop (created by Erlang) has has_worker=true and its worker + * properly triggers process_ready_tasks after timer dispatch. */ + erlang_event_loop_t *target_loop = get_interpreter_event_loop(); + if (target_loop == NULL) { + /* Fall back to capsule's loop if global not available */ + target_loop = loop; + } + + if (!event_loop_ensure_router(target_loop)) { PyErr_SetString(PyExc_RuntimeError, "Event loop has no router or worker"); return NULL; } if (delay_ms < 0) delay_ms = 0; - uint64_t timer_ref_id = atomic_fetch_add(&loop->next_callback_id, 1); + uint64_t timer_ref_id = atomic_fetch_add(&target_loop->next_callback_id, 1); ErlNifEnv *msg_env = enif_alloc_env(); if (msg_env == NULL) { @@ -6672,8 +6820,8 @@ static PyObject *py_schedule_timer_for(PyObject *self, PyObject *args) { return NULL; } - /* Include loop resource in message so router dispatches to correct loop */ - ERL_NIF_TERM loop_term = enif_make_resource(msg_env, loop); + /* Include the target loop resource in message so dispatch goes to correct loop */ + ERL_NIF_TERM loop_term = enif_make_resource(msg_env, target_loop); ERL_NIF_TERM msg = enif_make_tuple5( msg_env, @@ -6685,7 +6833,7 @@ static PyObject *py_schedule_timer_for(PyObject *self, PyObject *args) { ); /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = target_loop->has_worker ? &target_loop->worker_pid : &target_loop->router_pid; int send_result = enif_send(NULL, target_pid, msg_env, msg); enif_free_env(msg_env); @@ -6865,6 +7013,7 @@ static PyMethodDef PyEventLoopMethods[] = { {"_cancel_timer", py_cancel_timer, METH_VARARGS, "Cancel an Erlang timer"}, /* Handle-based API (takes explicit loop capsule) */ {"_loop_new", py_loop_new, METH_NOARGS, "Create a new event loop, returns capsule"}, + {"_get_global_loop_capsule", py_get_global_loop_capsule, METH_NOARGS, "Get capsule for global event loop"}, {"_loop_destroy", py_loop_destroy, METH_VARARGS, "Destroy an event loop"}, {"_set_loop_ref", py_set_loop_ref, METH_VARARGS, "Store Python loop reference in C struct"}, {"_set_global_loop_ref", py_set_global_loop_ref, METH_VARARGS, "Store Python loop reference in global loop"}, diff --git a/c_src/py_event_loop.h b/c_src/py_event_loop.h index 52762ed..1d4c5a8 100644 --- a/c_src/py_event_loop.h +++ b/c_src/py_event_loop.h @@ -934,6 +934,22 @@ int py_event_loop_init_python(ErlNifEnv *env, erlang_event_loop_t *loop); ERL_NIF_TERM nif_set_python_event_loop(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +/** + * @brief Set the shared router PID for per-loop created loops + * + * NIF: set_shared_router(RouterPid) -> ok | {error, Reason} + */ +ERL_NIF_TERM nif_set_shared_router(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Set the shared worker PID for task_ready notifications + * + * NIF: set_shared_worker(WorkerPid) -> ok | {error, Reason} + */ +ERL_NIF_TERM nif_set_shared_worker(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + /** * @brief Create and register the py_event_loop Python module * diff --git a/c_src/py_nif.c b/c_src/py_nif.c index db0bcd4..422faca 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -6583,6 +6583,7 @@ static ErlNifFunc nif_funcs[] = { {"set_python_event_loop", 1, nif_set_python_event_loop, 0}, {"set_isolation_mode", 1, nif_set_isolation_mode, 0}, {"set_shared_router", 1, nif_set_shared_router, 0}, + {"set_shared_worker", 1, nif_set_shared_worker, 0}, /* ASGI optimizations */ {"asgi_build_scope", 1, nif_asgi_build_scope, ERL_NIF_DIRTY_JOB_IO_BOUND}, diff --git a/c_src/py_nif.h b/c_src/py_nif.h index 20e8418..3aa4d7f 100644 --- a/c_src/py_nif.h +++ b/c_src/py_nif.h @@ -1373,6 +1373,21 @@ extern PyObject *ProcessErrorException; typedef struct { PyObject_HEAD; ErlNifPid pid; } ErlangPidObject; extern PyTypeObject ErlangPidType; +/** @brief Python type for opaque Erlang references (stored as serialized binary) */ +typedef struct { + PyObject_HEAD; + unsigned char *data; /* Serialized reference data */ + size_t size; /* Size of serialized data */ +} ErlangRefObject; +extern PyTypeObject ErlangRefType; + +/** @brief Python type for Erlang atoms (stored as string) */ +typedef struct { + PyObject_HEAD; + char *name; /* Atom name (null-terminated) */ +} ErlangAtomObject; +extern PyTypeObject ErlangAtomType; + /** @brief Cached numpy.ndarray type for fast isinstance checks (NULL if numpy unavailable) */ extern PyObject *g_numpy_ndarray_type; diff --git a/priv/_erlang_impl/_loop.py b/priv/_erlang_impl/_loop.py index 1329bf7..d0d30de 100644 --- a/priv/_erlang_impl/_loop.py +++ b/priv/_erlang_impl/_loop.py @@ -114,8 +114,20 @@ def __init__(self): # Fallback for testing without actual NIF self._pel = _MockNifModule() - # Create isolated loop capsule - self._loop_capsule = self._pel._loop_new() + # Use the global loop capsule instead of creating a new one. + # The global loop (created by Erlang) has has_worker=true, which ensures + # that timers and FD events are dispatched to the worker process. + # The worker triggers process_ready_tasks which calls _run_once. + # Without this, Python-created loops would have their own pending queues + # that never get processed because the worker doesn't know about them. + if hasattr(self._pel, '_get_global_loop_capsule'): + try: + self._loop_capsule = self._pel._get_global_loop_capsule() + except RuntimeError: + # Fall back to creating a new loop if global not available + self._loop_capsule = self._pel._loop_new() + else: + self._loop_capsule = self._pel._loop_new() # Store reference to this Python loop in the C struct # This enables process_ready_tasks to access the loop directly @@ -1058,6 +1070,13 @@ def _dispatch(self, callback_id, event_type): if not handle._cancelled: self._ready_append(handle) + def _get_ready_count(self): + """Return the number of ready callbacks. + + Used by C code to check if there's more work to do. + """ + return len(self._ready) + def _check_closed(self): """Raise an error if the loop is closed.""" if self._closed: @@ -1281,16 +1300,25 @@ async def _run_and_send(coro, caller_pid, ref): ref: A reference to include in the result message The result message format is: - ('async_result', ref, ('ok', result)) - on success - ('async_result', ref, ('error', error_str)) - on failure + (async_result, ref, (ok, result)) - on success + (async_result, ref, (error, error_str)) - on failure + + Note: Uses erlang.atom() to create atoms for message keys, since Python + strings become Erlang binaries but the await function expects atoms. """ import erlang + + # Create atoms for message keys (strings become binaries, await expects atoms) + async_result = erlang.atom('async_result') + ok = erlang.atom('ok') + error = erlang.atom('error') + try: result = await coro - erlang.send(caller_pid, ('async_result', ref, ('ok', result))) + erlang.send(caller_pid, (async_result, ref, (ok, result))) except asyncio.CancelledError: - erlang.send(caller_pid, ('async_result', ref, ('error', 'cancelled'))) + erlang.send(caller_pid, (async_result, ref, (error, 'cancelled'))) except Exception as e: import traceback tb = traceback.format_exc() - erlang.send(caller_pid, ('async_result', ref, ('error', f'{type(e).__name__}: {e}\n{tb}'))) + erlang.send(caller_pid, (async_result, ref, (error, f'{type(e).__name__}: {e}\n{tb}'))) diff --git a/src/py_event_loop.erl b/src/py_event_loop.erl index 3d2803a..de7904b 100644 --- a/src/py_event_loop.erl +++ b/src/py_event_loop.erl @@ -290,6 +290,8 @@ init([]) -> {ok, WorkerPid} = py_event_worker:start_link(WorkerId, LoopRef), ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), ok = py_nif:event_loop_set_id(LoopRef, WorkerId), + %% Set global shared worker for dispatch_timer task_ready notifications + ok = py_nif:set_shared_worker(WorkerPid), %% Also start legacy router for backward compatibility {ok, RouterPid} = py_event_router:start_link(LoopRef), @@ -356,7 +358,9 @@ handle_call(get_loop, _From, #state{loop_ref = undefined} = State) -> {ok, WorkerPid} = py_event_worker:start_link(WorkerId, LoopRef), ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), ok = py_nif:event_loop_set_id(LoopRef, WorkerId), + ok = py_nif:set_shared_worker(WorkerPid), {ok, RouterPid} = py_event_router:start_link(LoopRef), + ok = py_nif:set_shared_router(RouterPid), ok = py_nif:set_python_event_loop(LoopRef), NewState = State#state{ loop_ref = LoopRef, diff --git a/src/py_nif.erl b/src/py_nif.erl index 917aef3..a6c8836 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -152,6 +152,7 @@ set_python_event_loop/1, set_isolation_mode/1, set_shared_router/1, + set_shared_worker/1, %% ASGI optimizations asgi_build_scope/1, asgi_run/5, @@ -1060,6 +1061,13 @@ set_isolation_mode(_Mode) -> set_shared_router(_RouterPid) -> ?NIF_STUB. +%% @doc Set the shared worker PID for task_ready notifications. +%% The worker receives task_ready messages from dispatch_timer and other +%% event sources to trigger process_ready_tasks. +-spec set_shared_worker(pid()) -> ok | {error, term()}. +set_shared_worker(_WorkerPid) -> + ?NIF_STUB. + %%% ============================================================================ %%% ASGI Optimizations %%% ============================================================================ diff --git a/test/py_async_task_SUITE.erl b/test/py_async_task_SUITE.erl index db14c0a..9548b1f 100644 --- a/test/py_async_task_SUITE.erl +++ b/test/py_async_task_SUITE.erl @@ -17,6 +17,7 @@ test_async_coroutine/1, test_async_with_args/1, test_async_sleep/1, + test_timer_event_triggering/1, %% Error handling tests test_async_error/1, test_invalid_module/1, @@ -54,6 +55,7 @@ all() -> test_async_coroutine, test_async_with_args, test_async_sleep, + test_timer_event_triggering, %% Error handling tests test_async_error, test_invalid_module, @@ -176,6 +178,45 @@ test_async_sleep(_Config) -> true = abs(R - float(N)) < 0.0001 end, Results). +test_timer_event_triggering(_Config) -> + %% Test that timer events properly trigger event loop processing. + %% + %% This verifies the fix for the timer event triggering issue where + %% asyncio.sleep would never complete because dispatch_timer added + %% events to pending_head but nothing called _run_once to process them. + %% + %% The fix ensures that after dispatching timer/FD events, the worker + %% sends task_ready to itself to trigger _run_once processing. + %% + %% Uses test_async_task module which has async functions with asyncio.sleep. + + %% Test simple_task which uses asyncio.sleep(0.01) + ct:log("Testing simple async task with asyncio.sleep..."), + Ref1 = py_event_loop:create_task(test_async_task, simple_task, []), + Result1 = py_event_loop:await(Ref1, 5000), + ct:log("simple_task result: ~p", [Result1]), + {ok, <<"hello from async">>} = Result1, + + %% Test task_with_args which uses asyncio.sleep(0.01) + ct:log("Testing async task with args and asyncio.sleep..."), + Ref2 = py_event_loop:create_task(test_async_task, task_with_args, [10, 32]), + Result2 = py_event_loop:await(Ref2, 5000), + ct:log("task_with_args result: ~p", [Result2]), + {ok, 42} = Result2, + + %% Test concurrent async tasks with sleep + ct:log("Testing concurrent async tasks with asyncio.sleep..."), + Refs = [py_event_loop:create_task(test_async_task, task_with_args, [N, N]) + || N <- lists:seq(1, 5)], + Results = [py_event_loop:await(Ref, 5000) || Ref <- Refs], + ct:log("Concurrent results: ~p", [Results]), + + %% Verify all completed with correct values (N + N) + Expected = [{ok, N * 2} || N <- lists:seq(1, 5)], + Expected = Results, + + ct:log("timer_event_triggering test: all asyncio.sleep operations completed"). + %% ============================================================================ %% Error handling tests %% ============================================================================ From 766ce80733a24cf2b3d42924b521b07b9c3261b7 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Mon, 16 Mar 2026 07:14:41 +0100 Subject: [PATCH 3/6] Fix reentrancy and global capsule destruction issues - Skip _run_once when Python loop is already running to prevent reentrancy issues when erlang.run() is used inside py:exec - Don't destroy global loop capsule on close() since it's shared between multiple Python ErlangEventLoop instances - Clean up events_module in early return path Fixes py_async_e2e_SUITE hangs and crashes. --- c_src/py_event_loop.c | 24 ++++++++++++++++++++++++ priv/_erlang_impl/_loop.py | 15 +++++++++------ 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index 837360f..ba0d8bb 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -2976,6 +2976,30 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, /* NOTE: We don't DECREF asyncio and run_and_send here because they're cached * in the loop structure. They'll be freed when the loop is destroyed. */ + /* Check if Python loop is already running (e.g., from erlang.run() in py:exec). + * If so, skip calling _run_once - the running loop will handle events itself + * when poll_events_wait returns. Calling _run_once on a running loop is not + * safe because _run_once is not reentrant. */ + PyObject *is_running = PyObject_CallMethod(loop->py_loop, "is_running", NULL); + if (is_running != NULL) { + int running = PyObject_IsTrue(is_running); + Py_DECREF(is_running); + if (running) { + /* Loop is already running - just signal it and clean up. + * The pending events were already added by dispatch_timer/handle_fd_event, + * and the condition variable was signaled. The running loop will wake up + * and process them. */ + if (events_module != NULL) { + Py_XDECREF(old_running_loop); + Py_DECREF(events_module); + } + PyGILState_Release(gstate); + return ATOM_OK; + } + } else { + PyErr_Clear(); + } + /* Run the event loop until there's no more immediate work. * * We need to keep calling _run_once because: diff --git a/priv/_erlang_impl/_loop.py b/priv/_erlang_impl/_loop.py index d0d30de..f918abb 100644 --- a/priv/_erlang_impl/_loop.py +++ b/priv/_erlang_impl/_loop.py @@ -70,7 +70,7 @@ class ErlangEventLoop(asyncio.AbstractEventLoop): # Use __slots__ for faster attribute access and reduced memory __slots__ = ( - '_pel', '_loop_capsule', + '_pel', '_loop_capsule', '_uses_global_capsule', '_readers', '_writers', '_callbacks_by_cid', # callback_id -> (callback, args, event_type) for O(1) dispatch '_fd_resources', # fd -> fd_key (shared fd_resource_t per fd) @@ -120,9 +120,11 @@ def __init__(self): # The worker triggers process_ready_tasks which calls _run_once. # Without this, Python-created loops would have their own pending queues # that never get processed because the worker doesn't know about them. + self._uses_global_capsule = False if hasattr(self._pel, '_get_global_loop_capsule'): try: self._loop_capsule = self._pel._get_global_loop_capsule() + self._uses_global_capsule = True except RuntimeError: # Fall back to creating a new loop if global not available self._loop_capsule = self._pel._loop_new() @@ -316,11 +318,12 @@ def close(self): self._default_executor.shutdown(wait=True) self._default_executor = None - # Destroy loop capsule - try: - self._pel._loop_destroy(self._loop_capsule) - except Exception: - pass + # Destroy loop capsule (but not if using shared global capsule) + if not self._uses_global_capsule: + try: + self._pel._loop_destroy(self._loop_capsule) + except Exception: + pass self._loop_capsule = None async def shutdown_asyncgens(self): From fb5f5a1369dd4d3d8b040acd507568e63d1b1738 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Mon, 16 Mar 2026 07:45:33 +0100 Subject: [PATCH 4/6] Remove global worker send from dispatch_timer dispatch_timer may be called on a loop different from the one the global worker manages. The worker already sends task_ready to itself after handling the timer timeout, so the global send was redundant and incorrect. --- c_src/py_event_loop.c | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index ba0d8bb..6587b05 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -1816,20 +1816,13 @@ ERL_NIF_TERM nif_dispatch_timer(ErlNifEnv *env, int argc, event_loop_add_pending(loop, EVENT_TYPE_TIMER, callback_id, -1); - /* Send task_ready to worker to trigger process_ready_tasks. - * This ensures _run_once is called to handle the timer callback. - * Without this, timers dispatched via the router would never be processed - * because the worker wouldn't know there are pending events. */ - pthread_mutex_lock(&g_global_worker_mutex); - if (g_global_shared_worker_valid) { - ErlNifEnv *msg_env = enif_alloc_env(); - if (msg_env != NULL) { - ERL_NIF_TERM task_ready_atom = enif_make_atom(msg_env, "task_ready"); - enif_send(NULL, &g_global_shared_worker, msg_env, task_ready_atom); - enif_free_env(msg_env); - } - } - pthread_mutex_unlock(&g_global_worker_mutex); + /* Note: We rely on event_loop_add_pending signaling the condition variable + * to wake up poll_events_wait. This works for both: + * - erlang.run() inside py:exec: Python loop is waiting on poll_events_wait + * - create_task: The worker is triggered by its own timer handling + * + * We don't send task_ready to the global worker here because dispatch_timer + * may be called on a loop different from the one the global worker manages. */ return ATOM_OK; } From 9e063fa1dd88da1314e7ccde512cb5bf3461d8b2 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Mon, 16 Mar 2026 08:06:19 +0100 Subject: [PATCH 5/6] Ignore task_queue_not_initialized during shutdown This error is expected when the loop is being destroyed while task_ready messages are still in the worker's mailbox. --- src/py_event_worker.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/py_event_worker.erl b/src/py_event_worker.erl index 634176e..efa455e 100644 --- a/src/py_event_worker.erl +++ b/src/py_event_worker.erl @@ -134,6 +134,9 @@ drain_tasks_loop(LoopRef) -> ok; {error, py_loop_not_set} -> ok; + {error, task_queue_not_initialized} -> + %% Loop is being destroyed, ignore + ok; {error, Reason} -> error_logger:warning_msg("py_event_worker: task processing failed: ~p~n", [Reason]), ok From 1b87ea2adf127ac525a4b6999c7ab6a97879011c Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Mon, 16 Mar 2026 09:43:50 +0100 Subject: [PATCH 6/6] Fix event loop isolation, atom safety, and Python 3.14 compat - Add _has_loop_ref() to prevent concurrent loops while allowing sequential replacement (checks is_running() not just exists) - Add _clear_loop_ref() called on loop close for proper cleanup - Add global_loop_capsule_destructor to fix resource leak - Rename atom() to _atom() in C, add Python wrapper with cache and configurable limit (ERLANG_PYTHON_MAX_ATOMS, default 10000) - Use enif_make_existing_atom() first to avoid duplicate atoms - Fix venv .pth file processing for Python 3.14 subinterpreters by embedding site-packages path directly in exec code --- c_src/py_callback.c | 9 +-- c_src/py_convert.c | 6 ++ c_src/py_event_loop.c | 108 +++++++++++++++++++++++++++++++++- priv/_erlang_impl/__init__.py | 49 +++++++++++++++ priv/_erlang_impl/_loop.py | 31 ++++++++-- src/py.erl | 24 ++++++-- 6 files changed, 212 insertions(+), 15 deletions(-) diff --git a/c_src/py_callback.c b/c_src/py_callback.c index 2e78d6a..f593b01 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -2862,10 +2862,11 @@ static PyMethodDef ErlangModuleMethods[] = { "Call a registered Erlang function.\n\n" "Usage: erlang.call('func_name', arg1, arg2, ...)\n" "Returns: The result from the Erlang function."}, - {"atom", erlang_atom_impl, METH_VARARGS, - "Create an Erlang atom.\n\n" - "Usage: erlang.atom('name')\n" - "Returns: An ErlangAtom object that converts to an Erlang atom."}, + {"_atom", erlang_atom_impl, METH_VARARGS, + "Internal: Create an Erlang atom.\n\n" + "Usage: erlang._atom('name')\n" + "Returns: An ErlangAtom object that converts to an Erlang atom.\n" + "NOTE: Use erlang.atom() wrapper instead for safety limits."}, {"send", erlang_send_impl, METH_VARARGS, "Send a message to an Erlang process (fire-and-forget).\n\n" "Usage: erlang.send(pid, term)\n" diff --git a/c_src/py_convert.c b/c_src/py_convert.c index 5e77c7c..1257e0e 100644 --- a/c_src/py_convert.c +++ b/c_src/py_convert.c @@ -353,6 +353,12 @@ ERL_NIF_TERM py_to_term(ErlNifEnv *env, PyObject *obj) { /* Handle ErlangAtom → Erlang atom */ if (Py_IS_TYPE(obj, &ErlangAtomType)) { ErlangAtomObject *atom_obj = (ErlangAtomObject *)obj; + ERL_NIF_TERM atom_term; + /* Try existing atom first (no new allocation) */ + if (enif_make_existing_atom(env, atom_obj->name, &atom_term, ERL_NIF_LATIN1)) { + return atom_term; + } + /* Atom doesn't exist yet, create it */ return enif_make_atom(env, atom_obj->name); } diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index 6587b05..e3e89d0 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -6176,6 +6176,20 @@ static void loop_capsule_destructor(PyObject *capsule) { } } +/** + * Destructor for global loop capsules. + * Only releases reference - does NOT signal shutdown since the global + * loop is shared and managed by Erlang, not Python. + */ +static void global_loop_capsule_destructor(PyObject *capsule) { + erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer( + capsule, LOOP_CAPSULE_NAME); + if (loop != NULL) { + /* Only release the reference, don't shutdown */ + enif_release_resource(loop); + } +} + /* Python function: _loop_new() -> capsule */ static PyObject *py_loop_new(PyObject *self, PyObject *args) { (void)self; @@ -6384,6 +6398,36 @@ static PyObject *py_set_global_loop_ref(PyObject *self, PyObject *args) { Py_RETURN_NONE; } +/** + * Python function: _clear_loop_ref(capsule) + * + * Clear the Python loop reference from an event loop capsule. + * Should be called when the Python loop is closed to allow + * creating a new loop later. + */ +static PyObject *py_clear_loop_ref(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + + if (!PyArg_ParseTuple(args, "O", &capsule)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + return NULL; + } + + /* Clear the Python loop reference */ + if (loop->py_loop != NULL) { + Py_DECREF(loop->py_loop); + loop->py_loop = NULL; + } + loop->py_loop_valid = false; + + Py_RETURN_NONE; +} + /* Python function: _get_global_loop_capsule() -> capsule * * Returns a capsule for the global interpreter event loop. @@ -6405,7 +6449,67 @@ static PyObject *py_get_global_loop_capsule(PyObject *self, PyObject *args) { /* Keep the resource alive while capsule exists */ enif_keep_resource(loop); - return PyCapsule_New(loop, LOOP_CAPSULE_NAME, NULL); + return PyCapsule_New(loop, LOOP_CAPSULE_NAME, global_loop_capsule_destructor); +} + +/** + * Python function: _has_loop_ref(capsule) -> bool + * + * Check if a loop capsule has an ACTIVE Python loop reference. + * Returns True only if there's a valid loop that is currently RUNNING. + * This prevents multiple concurrent loops while allowing sequential + * loop replacement (e.g., between test cases). + * + * The key insight is that the event confusion bug occurs when multiple + * loops are running simultaneously. A non-running loop (even if not + * explicitly closed) can be safely replaced. + */ +static PyObject *py_has_loop_ref(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + + if (!PyArg_ParseTuple(args, "O", &capsule)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + return NULL; + } + + if (loop->py_loop_valid && loop->py_loop != NULL) { + /* Check if the existing loop is running - only block if running */ + PyObject *is_running = PyObject_CallMethod(loop->py_loop, "is_running", NULL); + if (is_running != NULL) { + int running = PyObject_IsTrue(is_running); + Py_DECREF(is_running); + if (running) { + /* Loop is still running - prevent concurrent loop creation */ + Py_RETURN_TRUE; + } + } else { + /* Error calling is_running - clear error and check is_closed as fallback */ + PyErr_Clear(); + } + + /* Loop exists but is not running - check if closed for cleanup */ + PyObject *is_closed = PyObject_CallMethod(loop->py_loop, "is_closed", NULL); + if (is_closed != NULL) { + int closed = PyObject_IsTrue(is_closed); + Py_DECREF(is_closed); + if (closed) { + /* Loop is closed, clean up reference */ + Py_DECREF(loop->py_loop); + loop->py_loop = NULL; + loop->py_loop_valid = false; + } + } else { + PyErr_Clear(); + } + /* Not running, allow replacement */ + Py_RETURN_FALSE; + } + Py_RETURN_FALSE; } /* Python function: _run_once_native_for(capsule, timeout_ms) -> [(callback_id, event_type), ...] */ @@ -7031,6 +7135,8 @@ static PyMethodDef PyEventLoopMethods[] = { /* Handle-based API (takes explicit loop capsule) */ {"_loop_new", py_loop_new, METH_NOARGS, "Create a new event loop, returns capsule"}, {"_get_global_loop_capsule", py_get_global_loop_capsule, METH_NOARGS, "Get capsule for global event loop"}, + {"_has_loop_ref", py_has_loop_ref, METH_VARARGS, "Check if loop capsule has Python loop reference"}, + {"_clear_loop_ref", py_clear_loop_ref, METH_VARARGS, "Clear Python loop reference from C struct"}, {"_loop_destroy", py_loop_destroy, METH_VARARGS, "Destroy an event loop"}, {"_set_loop_ref", py_set_loop_ref, METH_VARARGS, "Store Python loop reference in C struct"}, {"_set_global_loop_ref", py_set_global_loop_ref, METH_VARARGS, "Store Python loop reference in global loop"}, diff --git a/priv/_erlang_impl/__init__.py b/priv/_erlang_impl/__init__.py index ee56b0b..3be6b2f 100644 --- a/priv/_erlang_impl/__init__.py +++ b/priv/_erlang_impl/__init__.py @@ -44,6 +44,7 @@ loop.run_until_complete(main()) """ +import os import sys import asyncio import time @@ -82,12 +83,60 @@ 'Channel', 'reply', 'ChannelClosed', + 'atom', ] +# Atom caching with configurable limit to prevent BEAM atom table exhaustion. +# The BEAM VM has a hard limit (~1M atoms) and crashes when exceeded. +# This provides a Python-level safety valve well under that limit. +_MAX_USER_ATOMS = int(os.environ.get('ERLANG_PYTHON_MAX_ATOMS', '10000')) +_atom_cache = {} + # Re-export for uvloop API compatibility EventLoopPolicy = ErlangEventLoopPolicy +def atom(name): + """Create an Erlang atom with safety limit. + + Atoms in Erlang are permanent and the BEAM VM has a hard limit + (~1M atoms). This function provides a Python-level cache with + a configurable limit to prevent atom table exhaustion from + untrusted Python code. + + Args: + name: The atom name as a string. + + Returns: + An ErlangAtom object that converts to an Erlang atom. + + Raises: + RuntimeError: If the atom limit is reached. + + The limit can be configured via the ERLANG_PYTHON_MAX_ATOMS + environment variable (default: 10000). + + Example: + >>> import erlang + >>> ok = erlang.atom('ok') + >>> error = erlang.atom('error') + """ + if name in _atom_cache: + return _atom_cache[name] + + if len(_atom_cache) >= _MAX_USER_ATOMS: + raise RuntimeError( + f"Atom limit ({_MAX_USER_ATOMS}) reached. " + "Set ERLANG_PYTHON_MAX_ATOMS env var to increase." + ) + + # Import erlang module to access internal _atom function + import erlang as _erlang + result = _erlang._atom(name) + _atom_cache[name] = result + return result + + def get_event_loop_policy() -> ErlangEventLoopPolicy: """Get an Erlang event loop policy instance. diff --git a/priv/_erlang_impl/_loop.py b/priv/_erlang_impl/_loop.py index f918abb..e75013b 100644 --- a/priv/_erlang_impl/_loop.py +++ b/priv/_erlang_impl/_loop.py @@ -125,9 +125,20 @@ def __init__(self): try: self._loop_capsule = self._pel._get_global_loop_capsule() self._uses_global_capsule = True - except RuntimeError: + # Check if another loop already owns this capsule. + # Only one ErlangEventLoop per interpreter is supported. + if hasattr(self._pel, '_has_loop_ref') and self._pel._has_loop_ref(self._loop_capsule): + raise RuntimeError( + "An ErlangEventLoop already exists for this interpreter. " + "Only one loop per interpreter is supported." + ) + except RuntimeError as e: + # Re-raise our "already exists" error + if "already exists" in str(e): + raise # Fall back to creating a new loop if global not available self._loop_capsule = self._pel._loop_new() + self._uses_global_capsule = False else: self._loop_capsule = self._pel._loop_new() @@ -318,6 +329,15 @@ def close(self): self._default_executor.shutdown(wait=True) self._default_executor = None + # Clear loop ref to allow creating a new loop later. + # This is important for the global capsule case where the capsule + # persists but a new Python loop may be created. + if self._loop_capsule is not None and hasattr(self._pel, '_clear_loop_ref'): + try: + self._pel._clear_loop_ref(self._loop_capsule) + except Exception: + pass + # Destroy loop capsule (but not if using shared global capsule) if not self._uses_global_capsule: try: @@ -1306,15 +1326,16 @@ async def _run_and_send(coro, caller_pid, ref): (async_result, ref, (ok, result)) - on success (async_result, ref, (error, error_str)) - on failure - Note: Uses erlang.atom() to create atoms for message keys, since Python + Note: Uses cached atom() to create atoms for message keys, since Python strings become Erlang binaries but the await function expects atoms. """ import erlang + from . import atom # Use cached version from _erlang_impl # Create atoms for message keys (strings become binaries, await expects atoms) - async_result = erlang.atom('async_result') - ok = erlang.atom('ok') - error = erlang.atom('error') + async_result = atom('async_result') + ok = atom('ok') + error = atom('error') try: result = await coro diff --git a/src/py.erl b/src/py.erl index 72e7b15..f748487 100644 --- a/src/py.erl +++ b/src/py.erl @@ -1040,11 +1040,18 @@ activate_venv_with_site_packages(VenvBin, SitePackages) -> {ok, _} = eval(<<"setattr(__import__('sys'), '_active_venv', vp)">>, #{vp => VenvBin}), {ok, _} = eval(<<"setattr(__import__('sys'), '_venv_site_packages', sp)">>, #{sp => SitePackages}), %% Add site-packages and process .pth files (editable installs) - ok = exec(<<"import site as _site, sys as _sys\n" - "_b = frozenset(_sys.path)\n" - "_site.addsitedir(_sys._venv_site_packages)\n" - "_sys.path[:] = [p for p in _sys.path if p not in _b] + [p for p in _sys.path if p in _b]\n" - "del _site, _sys, _b\n">>), + %% Note: We embed the site-packages path directly since exec doesn't support + %% variables and sys attributes may not persist across calls in subinterpreters + SitePackagesStr = binary_to_list(SitePackages), + ExecCode = iolist_to_binary([ + <<"import site as _site, sys as _sys\n">>, + <<"_sp = '">>, escape_python_string(SitePackagesStr), <<"'\n">>, + <<"_b = frozenset(_sys.path)\n">>, + <<"_site.addsitedir(_sp)\n">>, + <<"_sys.path[:] = [p for p in _sys.path if p not in _b] + [p for p in _sys.path if p in _b]\n">>, + <<"del _site, _sys, _b, _sp\n">> + ]), + ok = exec(ExecCode), ok; {ok, false} -> {error, {invalid_venv, SitePackages}}; @@ -1052,6 +1059,13 @@ activate_venv_with_site_packages(VenvBin, SitePackages) -> Error end. +%% @private Escape a string for embedding in Python code +escape_python_string(Str) -> + lists:flatmap(fun($') -> "\\'"; + ($\\) -> "\\\\"; + (C) -> [C] + end, Str). + %% @doc Deactivate the current virtual environment. %% Restores sys.path to its original state. -spec deactivate_venv() -> ok | {error, term()}.