Skip to content

Commit 1394a35

Browse files
committed
Fix event loop thread-local context in process_ready_tasks
Set Python event loop in thread-local storage before processing async tasks. process_ready_tasks runs on dirty NIF scheduler threads (named 'Dummy-X'), not the main thread, and Python's asyncio uses thread-local storage for event loops. The fix imports asyncio.events and sets: - The current event loop via asyncio.set_event_loop() - The running loop via events._set_running_loop() This mirrors what Python's asyncio.run() does internally. The original context is restored before releasing the GIL.
1 parent d65b301 commit 1394a35

1 file changed

Lines changed: 39 additions & 0 deletions

File tree

c_src/py_event_loop.c

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2275,6 +2275,10 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
22752275
PyObject *asyncio;
22762276
PyObject *run_and_send;
22772277

2278+
/* For thread-local event loop context (dirty NIF scheduler workaround) */
2279+
PyObject *events_module = NULL;
2280+
PyObject *old_running_loop = NULL;
2281+
22782282
if (loop->py_cache_valid && loop->cached_asyncio != NULL && loop->cached_run_and_send != NULL) {
22792283
/* Use cached references */
22802284
asyncio = loop->cached_asyncio;
@@ -2356,6 +2360,32 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
23562360
}
23572361
}
23582362

2363+
/* ========================================================================
2364+
* Set event loop in current thread's context (dirty NIF scheduler fix)
2365+
*
2366+
* process_ready_tasks runs on dirty NIF scheduler threads (named 'Dummy-X'),
2367+
* not the main thread. Python's asyncio uses thread-local storage for event
2368+
* loops, so we must explicitly set our loop as both the current event loop
2369+
* and the running loop for this thread.
2370+
*
2371+
* This mirrors what Python's asyncio.run() does internally (see _loop.py).
2372+
* ======================================================================== */
2373+
events_module = PyImport_ImportModule("asyncio.events");
2374+
if (events_module != NULL) {
2375+
/* Set our loop as current event loop for this thread */
2376+
PyObject *set_result = PyObject_CallMethod(asyncio, "set_event_loop", "O", loop->py_loop);
2377+
Py_XDECREF(set_result);
2378+
2379+
/* Save and set running loop (needed for asyncio.Task creation) */
2380+
old_running_loop = PyObject_CallMethod(events_module, "_get_running_loop", NULL);
2381+
if (old_running_loop == NULL) {
2382+
PyErr_Clear();
2383+
old_running_loop = Py_NewRef(Py_None);
2384+
}
2385+
PyObject *set_running = PyObject_CallMethod(events_module, "_set_running_loop", "O", loop->py_loop);
2386+
Py_XDECREF(set_running);
2387+
}
2388+
23592389
/* Process all dequeued tasks */
23602390
ERL_NIF_TERM result = ATOM_OK;
23612391
int coros_scheduled = 0; /* Track if any coroutines were scheduled */
@@ -2571,6 +2601,15 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
25712601
}
25722602
}
25732603

2604+
/* Restore original event loop context before releasing GIL */
2605+
if (events_module != NULL) {
2606+
PyObject *restore = PyObject_CallMethod(events_module, "_set_running_loop", "O",
2607+
old_running_loop ? old_running_loop : Py_None);
2608+
Py_XDECREF(restore);
2609+
Py_XDECREF(old_running_loop);
2610+
Py_DECREF(events_module);
2611+
}
2612+
25742613
PyGILState_Release(gstate);
25752614

25762615
/*

0 commit comments

Comments
 (0)