Skip to content

Commit 0851392

Browse files
committed
Add OWN_GIL features test suite with reactor dispatch support
- Create py_owngil_features_SUITE.erl with 42 tests across 7 groups: channels, buffers, reentrant callbacks, pid_send, reactor, async_task, asyncio - Implement OWN_GIL reactor dispatch for true parallel Python execution: - Add CTX_REQ_REACTOR_ON_READ_READY, CTX_REQ_REACTOR_ON_WRITE_READY, CTX_REQ_REACTOR_INIT_CONNECTION request types - Add reactor_buffer_ptr field to py_context_t for buffer passing - Implement owngil_reactor_on_read_ready/on_write_ready/init_connection - Add dispatch_reactor_read/write/init_to_owngil functions - Modify reactor NIFs to dispatch to OWN_GIL thread when uses_own_gil=true - Test results: 39 passed, 3 skipped (py_reactor_context integration)
1 parent 492a4f6 commit 0851392

5 files changed

Lines changed: 1871 additions & 1 deletion

File tree

c_src/py_event_loop.c

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4103,6 +4103,13 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc,
41034103
enif_make_atom(env, read_result == 1 ? "close" : "continue"));
41044104
}
41054105

4106+
#ifdef HAVE_SUBINTERPRETERS
4107+
/* OWN_GIL mode: dispatch to dedicated thread */
4108+
if (ctx->uses_own_gil) {
4109+
return dispatch_reactor_read_to_owngil(env, ctx, fd, buffer);
4110+
}
4111+
#endif
4112+
41064113
/* Acquire context (handles both worker mode and subinterpreter mode) */
41074114
py_context_guard_t guard = py_context_acquire(ctx);
41084115
if (!guard.acquired) {
@@ -4192,6 +4199,13 @@ ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc,
41924199
return make_error(env, "invalid_fd");
41934200
}
41944201

4202+
#ifdef HAVE_SUBINTERPRETERS
4203+
/* OWN_GIL mode: dispatch to dedicated thread */
4204+
if (ctx->uses_own_gil) {
4205+
return dispatch_reactor_write_to_owngil(env, ctx, fd);
4206+
}
4207+
#endif
4208+
41954209
/* Acquire context (handles both worker mode and subinterpreter mode) */
41964210
py_context_guard_t guard = py_context_acquire(ctx);
41974211
if (!guard.acquired) {
@@ -4271,6 +4285,13 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc,
42714285
return make_error(env, "invalid_client_info");
42724286
}
42734287

4288+
#ifdef HAVE_SUBINTERPRETERS
4289+
/* OWN_GIL mode: dispatch to dedicated thread */
4290+
if (ctx->uses_own_gil) {
4291+
return dispatch_reactor_init_to_owngil(env, ctx, fd, argv[2]);
4292+
}
4293+
#endif
4294+
42744295
/* Acquire context (handles both worker mode and subinterpreter mode) */
42754296
py_context_guard_t guard = py_context_acquire(ctx);
42764297
if (!guard.acquired) {
@@ -4614,6 +4635,156 @@ ERL_NIF_TERM nif_fd_close(ErlNifEnv *env, int argc,
46144635
return ATOM_OK;
46154636
}
46164637

4638+
/* ============================================================================
4639+
* OWN_GIL Reactor Dispatch Functions
4640+
* ============================================================================
4641+
* These functions are called from the OWN_GIL thread in py_nif.c.
4642+
* The GIL is already held when these are called.
4643+
*/
4644+
4645+
/**
4646+
* Execute reactor on_read_ready in OWN_GIL thread.
4647+
* Called with GIL already held.
4648+
*/
4649+
ERL_NIF_TERM owngil_reactor_on_read_ready(ErlNifEnv *env, int fd, void *buffer_ptr) {
4650+
reactor_buffer_resource_t *buffer = (reactor_buffer_resource_t *)buffer_ptr;
4651+
4652+
/* Create ReactorBuffer Python object wrapping the resource */
4653+
PyObject *py_buffer = ReactorBuffer_from_resource(buffer, buffer);
4654+
/* Release our reference - Python now owns the only reference */
4655+
enif_release_resource(buffer);
4656+
4657+
if (py_buffer == NULL) {
4658+
PyErr_Clear();
4659+
return make_error(env, "buffer_creation_failed");
4660+
}
4661+
4662+
/* Get module state for THIS interpreter's reactor cache */
4663+
py_event_loop_module_state_t *state = get_module_state();
4664+
if (!ensure_reactor_cached_for_interp(state)) {
4665+
PyErr_Clear();
4666+
Py_DECREF(py_buffer);
4667+
return make_error(env, "reactor_cache_init_failed");
4668+
}
4669+
4670+
/* Call cached on_read_ready(fd, data) */
4671+
PyObject *py_fd = PyLong_FromLong(fd);
4672+
if (py_fd == NULL) {
4673+
PyErr_Clear();
4674+
Py_DECREF(py_buffer);
4675+
return make_error(env, "fd_conversion_failed");
4676+
}
4677+
4678+
PyObject *result = PyObject_CallFunctionObjArgs(state->reactor_on_read, py_fd, py_buffer, NULL);
4679+
Py_DECREF(py_fd);
4680+
Py_DECREF(py_buffer);
4681+
4682+
if (result == NULL) {
4683+
PyErr_Clear();
4684+
return make_error(env, "on_read_ready_failed");
4685+
}
4686+
4687+
/* Convert result to Erlang term */
4688+
ERL_NIF_TERM action;
4689+
if (PyUnicode_Check(result)) {
4690+
const char *str = PyUnicode_AsUTF8(result);
4691+
if (str != NULL) {
4692+
size_t len = strlen(str);
4693+
unsigned char *buf = enif_make_new_binary(env, len, &action);
4694+
memcpy(buf, str, len);
4695+
} else {
4696+
action = enif_make_atom(env, "unknown");
4697+
}
4698+
} else {
4699+
action = enif_make_atom(env, "unknown");
4700+
}
4701+
4702+
Py_DECREF(result);
4703+
return enif_make_tuple2(env, ATOM_OK, action);
4704+
}
4705+
4706+
/**
4707+
* Execute reactor on_write_ready in OWN_GIL thread.
4708+
* Called with GIL already held.
4709+
*/
4710+
ERL_NIF_TERM owngil_reactor_on_write_ready(ErlNifEnv *env, int fd) {
4711+
/* Get module state for THIS interpreter's reactor cache */
4712+
py_event_loop_module_state_t *state = get_module_state();
4713+
if (!ensure_reactor_cached_for_interp(state)) {
4714+
PyErr_Clear();
4715+
return make_error(env, "reactor_cache_init_failed");
4716+
}
4717+
4718+
/* Call cached on_write_ready(fd) */
4719+
PyObject *py_fd = PyLong_FromLong(fd);
4720+
if (py_fd == NULL) {
4721+
PyErr_Clear();
4722+
return make_error(env, "fd_conversion_failed");
4723+
}
4724+
4725+
PyObject *result = PyObject_CallFunctionObjArgs(state->reactor_on_write, py_fd, NULL);
4726+
Py_DECREF(py_fd);
4727+
4728+
if (result == NULL) {
4729+
PyErr_Clear();
4730+
return make_error(env, "on_write_ready_failed");
4731+
}
4732+
4733+
/* Convert result to Erlang term */
4734+
ERL_NIF_TERM action;
4735+
if (PyUnicode_Check(result)) {
4736+
const char *str = PyUnicode_AsUTF8(result);
4737+
if (str != NULL) {
4738+
size_t len = strlen(str);
4739+
unsigned char *buf = enif_make_new_binary(env, len, &action);
4740+
memcpy(buf, str, len);
4741+
} else {
4742+
action = enif_make_atom(env, "unknown");
4743+
}
4744+
} else {
4745+
action = enif_make_atom(env, "unknown");
4746+
}
4747+
4748+
Py_DECREF(result);
4749+
return enif_make_tuple2(env, ATOM_OK, action);
4750+
}
4751+
4752+
/**
4753+
* Execute reactor init_connection in OWN_GIL thread.
4754+
* Called with GIL already held.
4755+
*/
4756+
ERL_NIF_TERM owngil_reactor_init_connection(ErlNifEnv *env, int fd,
4757+
ERL_NIF_TERM client_info_term) {
4758+
/* Convert client_info to Python dict */
4759+
PyObject *py_client_info = term_to_py(env, client_info_term);
4760+
if (py_client_info == NULL) {
4761+
PyErr_Clear();
4762+
return make_error(env, "client_info_conversion_failed");
4763+
}
4764+
4765+
/* Import erlang.reactor module */
4766+
PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
4767+
if (reactor_module == NULL) {
4768+
Py_DECREF(py_client_info);
4769+
PyErr_Clear();
4770+
return make_error(env, "import_erlang_reactor_failed");
4771+
}
4772+
4773+
/* Call init_connection(fd, client_info) */
4774+
PyObject *result = PyObject_CallMethod(reactor_module, "init_connection",
4775+
"iO", fd, py_client_info);
4776+
Py_DECREF(reactor_module);
4777+
Py_DECREF(py_client_info);
4778+
4779+
if (result == NULL) {
4780+
PyErr_Clear();
4781+
return make_error(env, "init_connection_failed");
4782+
}
4783+
4784+
Py_DECREF(result);
4785+
return ATOM_OK;
4786+
}
4787+
46174788
/* ============================================================================
46184789
* Python Module: py_event_loop
46194790
*

c_src/py_event_loop.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,4 +1035,47 @@ ERL_NIF_TERM nif_socketpair(ErlNifEnv *env, int argc,
10351035
ERL_NIF_TERM nif_fd_close(ErlNifEnv *env, int argc,
10361036
const ERL_NIF_TERM argv[]);
10371037

1038+
/* ============================================================================
1039+
* OWN_GIL Reactor Dispatch Functions
1040+
* ============================================================================
1041+
* These functions execute reactor operations in the context of the OWN_GIL
1042+
* thread. They are called from owngil_execute_request() in py_nif.c.
1043+
*/
1044+
1045+
/**
1046+
* @brief Execute reactor on_read_ready in OWN_GIL thread
1047+
*
1048+
* Called with the GIL already held by the OWN_GIL thread.
1049+
*
1050+
* @param env Shared NIF environment
1051+
* @param fd File descriptor
1052+
* @param buffer_ptr Reactor buffer resource (transferred ownership)
1053+
* @return Erlang term: {ok, Action} | {error, Reason}
1054+
*/
1055+
ERL_NIF_TERM owngil_reactor_on_read_ready(ErlNifEnv *env, int fd, void *buffer_ptr);
1056+
1057+
/**
1058+
* @brief Execute reactor on_write_ready in OWN_GIL thread
1059+
*
1060+
* Called with the GIL already held by the OWN_GIL thread.
1061+
*
1062+
* @param env Shared NIF environment
1063+
* @param fd File descriptor
1064+
* @return Erlang term: {ok, Action} | {error, Reason}
1065+
*/
1066+
ERL_NIF_TERM owngil_reactor_on_write_ready(ErlNifEnv *env, int fd);
1067+
1068+
/**
1069+
* @brief Execute reactor init_connection in OWN_GIL thread
1070+
*
1071+
* Called with the GIL already held by the OWN_GIL thread.
1072+
*
1073+
* @param env Shared NIF environment
1074+
* @param fd File descriptor
1075+
* @param client_info_term Erlang term with client info map
1076+
* @return Erlang term: ok | {error, Reason}
1077+
*/
1078+
ERL_NIF_TERM owngil_reactor_init_connection(ErlNifEnv *env, int fd,
1079+
ERL_NIF_TERM client_info_term);
1080+
10381081
#endif /* PY_EVENT_LOOP_H */

0 commit comments

Comments
 (0)