Skip to content

Commit a79b522

Browse files
committed
Fix async_callback for subinterpreter compatibility
Use per-interpreter module state instead of global state for async callbacks. Each subinterpreter now gets its own pipe and futures dict. Changes: - Add erlang_module_state_t struct with pipe, futures dict, and mutex - Update ErlangModuleDef to use sizeof(erlang_module_state_t) for m_size - Add get_erlang_module_state() accessor function - Add erlang_module_free() for cleanup on module deallocation - Update async_callback_init(), process_async_callback_response(), get_async_callback_fd(), send_async_callback_request(), and register_async_future() to use module state - Initialize module state in create_erlang_module()
1 parent fd2008e commit a79b522

File tree

1 file changed

+138
-52
lines changed

1 file changed

+138
-52
lines changed

c_src/py_callback.c

Lines changed: 138 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2022,52 +2022,75 @@ static PyObject *erlang_send_impl(PyObject *self, PyObject *args) {
20222022
extern ErlNifPid g_thread_coordinator_pid;
20232023
extern bool g_has_thread_coordinator;
20242024

2025-
/* Global state for async callbacks */
2026-
static int g_async_callback_pipe[2] = {-1, -1}; /* [0]=read, [1]=write */
2027-
static PyObject *g_async_pending_futures = NULL; /* Dict: callback_id -> Future */
2028-
static pthread_mutex_t g_async_futures_mutex = PTHREAD_MUTEX_INITIALIZER;
2025+
/* Per-interpreter module state for async callbacks.
2026+
* Each subinterpreter gets its own pipe and futures dict. */
2027+
typedef struct {
2028+
int async_callback_pipe[2]; /* [0]=read, [1]=write - per-interpreter pipe */
2029+
PyObject *async_pending_futures; /* Dict: callback_id -> Future */
2030+
pthread_mutex_t async_futures_mutex;
2031+
bool pipe_initialized;
2032+
} erlang_module_state_t;
20292033

2030-
/* Thread-safe initialization using pthread_once */
2031-
static pthread_once_t g_async_callback_init_once = PTHREAD_ONCE_INIT;
2032-
static int g_async_callback_init_result = 0;
2034+
/* Forward declaration for module state accessor */
2035+
static erlang_module_state_t *get_erlang_module_state(void);
20332036

20342037
/**
2035-
* Internal initialization function called by pthread_once.
2036-
* Thread-safe: only called once by pthread_once.
2038+
* Get the erlang module state for the current interpreter.
2039+
* Returns NULL if module not available.
20372040
*/
2038-
static void async_callback_init_impl(void) {
2039-
if (pipe(g_async_callback_pipe) < 0) {
2040-
g_async_callback_init_result = -1;
2041-
return;
2042-
}
2043-
2044-
/* Set the read end to non-blocking for asyncio compatibility */
2045-
int flags = fcntl(g_async_callback_pipe[0], F_GETFL, 0);
2046-
if (flags >= 0) {
2047-
fcntl(g_async_callback_pipe[0], F_SETFL, flags | O_NONBLOCK);
2041+
static erlang_module_state_t *get_erlang_module_state(void) {
2042+
PyObject *name = PyUnicode_FromString("erlang");
2043+
if (name == NULL) {
2044+
PyErr_Clear();
2045+
return NULL;
20482046
}
2049-
2050-
g_async_pending_futures = PyDict_New();
2051-
if (g_async_pending_futures == NULL) {
2052-
close(g_async_callback_pipe[0]);
2053-
close(g_async_callback_pipe[1]);
2054-
g_async_callback_pipe[0] = -1;
2055-
g_async_callback_pipe[1] = -1;
2056-
g_async_callback_init_result = -1;
2057-
return;
2047+
PyObject *module = PyImport_GetModule(name);
2048+
Py_DECREF(name);
2049+
if (module == NULL) {
2050+
PyErr_Clear();
2051+
return NULL;
20582052
}
2059-
2060-
g_async_callback_init_result = 0;
2053+
erlang_module_state_t *state = (erlang_module_state_t *)PyModule_GetState(module);
2054+
Py_DECREF(module);
2055+
return state;
20612056
}
20622057

20632058
/**
2064-
* Initialize async callback system.
2059+
* Initialize async callback system for the current interpreter.
20652060
* Creates the response pipe and pending futures dict.
2066-
* Thread-safe: uses pthread_once for initialization.
2061+
* Uses per-interpreter module state.
20672062
*/
20682063
static int async_callback_init(void) {
2069-
pthread_once(&g_async_callback_init_once, async_callback_init_impl);
2070-
return g_async_callback_init_result;
2064+
erlang_module_state_t *state = get_erlang_module_state();
2065+
if (state == NULL) {
2066+
return -1;
2067+
}
2068+
2069+
if (state->pipe_initialized) {
2070+
return 0; /* Already initialized for this interpreter */
2071+
}
2072+
2073+
if (pipe(state->async_callback_pipe) < 0) {
2074+
return -1;
2075+
}
2076+
2077+
/* Set the read end to non-blocking for asyncio compatibility */
2078+
int flags = fcntl(state->async_callback_pipe[0], F_GETFL, 0);
2079+
if (flags >= 0) {
2080+
fcntl(state->async_callback_pipe[0], F_SETFL, flags | O_NONBLOCK);
2081+
}
2082+
2083+
state->async_pending_futures = PyDict_New();
2084+
if (state->async_pending_futures == NULL) {
2085+
close(state->async_callback_pipe[0]);
2086+
close(state->async_callback_pipe[1]);
2087+
state->async_callback_pipe[0] = -1;
2088+
state->async_callback_pipe[1] = -1;
2089+
return -1;
2090+
}
2091+
2092+
state->pipe_initialized = true;
2093+
return 0;
20712094
}
20722095

20732096
/**
@@ -2076,12 +2099,17 @@ static int async_callback_init(void) {
20762099
* Returns: 1 if processed, 0 if no data, -1 on error
20772100
*/
20782101
static int process_async_callback_response(void) {
2102+
erlang_module_state_t *state = get_erlang_module_state();
2103+
if (state == NULL || !state->pipe_initialized) {
2104+
return -1;
2105+
}
2106+
20792107
/* Read callback_id (8 bytes) + response_len (4 bytes) + response_data */
20802108
uint64_t callback_id;
20812109
uint32_t response_len;
20822110
ssize_t n;
20832111

2084-
n = read(g_async_callback_pipe[0], &callback_id, sizeof(callback_id));
2112+
n = read(state->async_callback_pipe[0], &callback_id, sizeof(callback_id));
20852113
if (n < 0) {
20862114
if (errno == EAGAIN || errno == EWOULDBLOCK) {
20872115
return 0; /* No data available (non-blocking) */
@@ -2095,7 +2123,7 @@ static int process_async_callback_response(void) {
20952123
return -1; /* Partial read - error */
20962124
}
20972125

2098-
n = read(g_async_callback_pipe[0], &response_len, sizeof(response_len));
2126+
n = read(state->async_callback_pipe[0], &response_len, sizeof(response_len));
20992127
if (n != sizeof(response_len)) {
21002128
return -1;
21012129
}
@@ -2106,26 +2134,26 @@ static int process_async_callback_response(void) {
21062134
if (response_data == NULL) {
21072135
return -1;
21082136
}
2109-
n = read(g_async_callback_pipe[0], response_data, response_len);
2137+
n = read(state->async_callback_pipe[0], response_data, response_len);
21102138
if (n != (ssize_t)response_len) {
21112139
enif_free(response_data);
21122140
return -1;
21132141
}
21142142
}
21152143

21162144
/* Look up and resolve the Future */
2117-
pthread_mutex_lock(&g_async_futures_mutex);
2145+
pthread_mutex_lock(&state->async_futures_mutex);
21182146

21192147
PyObject *key = PyLong_FromUnsignedLongLong(callback_id);
2120-
PyObject *future = PyDict_GetItem(g_async_pending_futures, key);
2148+
PyObject *future = PyDict_GetItem(state->async_pending_futures, key);
21212149

21222150
if (future != NULL) {
21232151
Py_INCREF(future); /* Keep reference while we use it */
2124-
PyDict_DelItem(g_async_pending_futures, key);
2152+
PyDict_DelItem(state->async_pending_futures, key);
21252153
}
21262154
Py_DECREF(key);
21272155

2128-
pthread_mutex_unlock(&g_async_futures_mutex);
2156+
pthread_mutex_unlock(&state->async_futures_mutex);
21292157

21302158
if (future != NULL) {
21312159
/* Parse response and resolve Future */
@@ -2206,13 +2234,19 @@ static PyObject *get_async_callback_fd(PyObject *self, PyObject *args) {
22062234
(void)self;
22072235
(void)args;
22082236

2209-
/* async_callback_init uses pthread_once, so it's safe to call multiple times */
2237+
/* Initialize per-interpreter pipe if needed */
22102238
if (async_callback_init() < 0) {
22112239
PyErr_SetString(PyExc_RuntimeError, "Failed to initialize async callback system");
22122240
return NULL;
22132241
}
22142242

2215-
return PyLong_FromLong(g_async_callback_pipe[0]);
2243+
erlang_module_state_t *state = get_erlang_module_state();
2244+
if (state == NULL) {
2245+
PyErr_SetString(PyExc_RuntimeError, "Module state not available");
2246+
return NULL;
2247+
}
2248+
2249+
return PyLong_FromLong(state->async_callback_pipe[0]);
22162250
}
22172251

22182252
/**
@@ -2252,6 +2286,13 @@ static PyObject *send_async_callback_request(PyObject *self, PyObject *args) {
22522286
return NULL;
22532287
}
22542288

2289+
/* Get per-interpreter state for the pipe */
2290+
erlang_module_state_t *state = get_erlang_module_state();
2291+
if (state == NULL || !state->pipe_initialized) {
2292+
PyErr_SetString(PyExc_RuntimeError, "Async callback system not initialized");
2293+
return NULL;
2294+
}
2295+
22552296
/* Generate callback ID */
22562297
uint64_t callback_id = atomic_fetch_add(&g_callback_id_counter, 1);
22572298

@@ -2277,13 +2318,13 @@ static PyObject *send_async_callback_request(PyObject *self, PyObject *args) {
22772318
ERL_NIF_TERM id_term = enif_make_uint64(msg_env, callback_id);
22782319

22792320
/* Send message: {async_callback, CallbackId, FuncName, Args, WriteFd}
2280-
* The WriteFd is the async callback pipe write end */
2321+
* The WriteFd is the per-interpreter async callback pipe write end */
22812322
ERL_NIF_TERM msg = enif_make_tuple5(msg_env,
22822323
enif_make_atom(msg_env, "async_callback"),
22832324
id_term,
22842325
func_term,
22852326
args_term,
2286-
enif_make_int(msg_env, g_async_callback_pipe[1]));
2327+
enif_make_int(msg_env, state->async_callback_pipe[1]));
22872328

22882329
if (!enif_send(NULL, &g_thread_coordinator_pid, msg_env, msg)) {
22892330
enif_free_env(msg_env);
@@ -2308,14 +2349,20 @@ static PyObject *register_async_future(PyObject *self, PyObject *args) {
23082349
return NULL;
23092350
}
23102351

2311-
pthread_mutex_lock(&g_async_futures_mutex);
2352+
erlang_module_state_t *state = get_erlang_module_state();
2353+
if (state == NULL || state->async_pending_futures == NULL) {
2354+
PyErr_SetString(PyExc_RuntimeError, "Async callback system not initialized");
2355+
return NULL;
2356+
}
2357+
2358+
pthread_mutex_lock(&state->async_futures_mutex);
23122359

23132360
PyObject *key = PyLong_FromUnsignedLongLong(callback_id);
23142361
Py_INCREF(future);
2315-
PyDict_SetItem(g_async_pending_futures, key, future);
2362+
PyDict_SetItem(state->async_pending_futures, key, future);
23162363
Py_DECREF(key);
23172364

2318-
pthread_mutex_unlock(&g_async_futures_mutex);
2365+
pthread_mutex_unlock(&state->async_futures_mutex);
23192366

23202367
Py_RETURN_NONE;
23212368
}
@@ -2704,13 +2751,42 @@ static PyMethodDef getattr_method = {
27042751
"Get an Erlang function wrapper by name."
27052752
};
27062753

2754+
/**
2755+
* Module cleanup - called when module is deallocated.
2756+
* Closes per-interpreter pipe and frees futures dict.
2757+
*/
2758+
static void erlang_module_free(void *module) {
2759+
erlang_module_state_t *state = PyModule_GetState((PyObject *)module);
2760+
if (state == NULL) {
2761+
return;
2762+
}
2763+
2764+
if (state->async_callback_pipe[0] >= 0) {
2765+
close(state->async_callback_pipe[0]);
2766+
state->async_callback_pipe[0] = -1;
2767+
}
2768+
if (state->async_callback_pipe[1] >= 0) {
2769+
close(state->async_callback_pipe[1]);
2770+
state->async_callback_pipe[1] = -1;
2771+
}
2772+
2773+
Py_XDECREF(state->async_pending_futures);
2774+
state->async_pending_futures = NULL;
2775+
2776+
if (state->pipe_initialized) {
2777+
pthread_mutex_destroy(&state->async_futures_mutex);
2778+
state->pipe_initialized = false;
2779+
}
2780+
}
2781+
27072782
/* Module definition */
27082783
static struct PyModuleDef ErlangModuleDef = {
27092784
PyModuleDef_HEAD_INIT,
2710-
"erlang", /* Module name */
2711-
"Interface for calling Erlang functions from Python.", /* Docstring */
2712-
-1, /* Size of per-interpreter state (-1 = global) */
2713-
ErlangModuleMethods /* Methods */
2785+
.m_name = "erlang",
2786+
.m_doc = "Interface for calling Erlang functions from Python.",
2787+
.m_size = sizeof(erlang_module_state_t), /* Per-interpreter state */
2788+
.m_methods = ErlangModuleMethods,
2789+
.m_free = erlang_module_free,
27142790
};
27152791

27162792
/**
@@ -2762,6 +2838,16 @@ static int create_erlang_module(void) {
27622838
return -1;
27632839
}
27642840

2841+
/* Initialize per-interpreter module state */
2842+
erlang_module_state_t *state = PyModule_GetState(module);
2843+
if (state != NULL) {
2844+
state->async_callback_pipe[0] = -1;
2845+
state->async_callback_pipe[1] = -1;
2846+
state->async_pending_futures = NULL;
2847+
pthread_mutex_init(&state->async_futures_mutex, NULL);
2848+
state->pipe_initialized = false;
2849+
}
2850+
27652851
/* Create the SuspensionRequired exception.
27662852
* This exception is raised internally when erlang.call() needs to suspend.
27672853
* It carries callback info in args: (callback_id, func_name, args_tuple) */

0 commit comments

Comments
 (0)