Skip to content

Commit bff3aa1

Browse files
committed
Add automatic env reuse for py_event_loop:run and spawn_task
When a process has a local Python environment (from py:context/py:exec), py_event_loop functions now automatically pass that env to the NIF. This allows functions defined via py:exec to be called directly without manual env parameter passing. Changes: - Add submit_task_with_env/8 NIF that accepts an env reference - Add get_env_resource_type() getter for accessing env resource type - Modify process_ready_tasks to handle 7-tuple tasks with env ref - Update py_event_loop:create_task to auto-detect and pass process env - Update py_event_loop:spawn_task to use caller's env (not receiver's) - Add get_process_env() helper to retrieve env from process dictionary - Add tests for env reuse with sync, async, and spawn_task functions
1 parent 522c42a commit bff3aa1

7 files changed

Lines changed: 290 additions & 9 deletions

File tree

c_src/py_event_loop.c

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2501,6 +2501,76 @@ ERL_NIF_TERM nif_submit_task(ErlNifEnv *env, int argc,
25012501
return ATOM_OK;
25022502
}
25032503

2504+
/**
2505+
* submit_task_with_env(LoopRef, CallerPid, Ref, Module, Func, Args, Kwargs, EnvRef) -> ok | {error, Reason}
2506+
*
2507+
* Like submit_task but includes a process-local env resource reference.
2508+
* The env's globals dict is used for function lookup, allowing functions
2509+
* defined via py:exec() to be called from the event loop.
2510+
*/
2511+
ERL_NIF_TERM nif_submit_task_with_env(ErlNifEnv *env, int argc,
2512+
const ERL_NIF_TERM argv[]) {
2513+
(void)argc;
2514+
2515+
erlang_event_loop_t *loop;
2516+
if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE,
2517+
(void **)&loop)) {
2518+
return make_error(env, "invalid_loop");
2519+
}
2520+
2521+
if (!loop->task_queue_initialized) {
2522+
return make_error(env, "task_queue_not_initialized");
2523+
}
2524+
2525+
/* Validate caller_pid */
2526+
ErlNifPid caller_pid;
2527+
if (!enif_get_local_pid(env, argv[1], &caller_pid)) {
2528+
return make_error(env, "invalid_caller_pid");
2529+
}
2530+
2531+
/* Create task tuple: {CallerPid, Ref, Module, Func, Args, Kwargs, EnvRef} */
2532+
/* argv[1] = CallerPid, argv[2] = Ref, argv[3] = Module,
2533+
* argv[4] = Func, argv[5] = Args, argv[6] = Kwargs, argv[7] = EnvRef */
2534+
ERL_NIF_TERM task_tuple = enif_make_tuple7(env,
2535+
argv[1], argv[2], argv[3], argv[4], argv[5], argv[6], argv[7]);
2536+
2537+
/* Serialize to binary */
2538+
ErlNifBinary task_bin;
2539+
if (!enif_term_to_binary(env, task_tuple, &task_bin)) {
2540+
return make_error(env, "serialization_failed");
2541+
}
2542+
2543+
/* Thread-safe enqueue */
2544+
pthread_mutex_lock(&loop->task_queue_mutex);
2545+
int enq_result = enif_ioq_enq_binary(loop->task_queue, &task_bin, 0);
2546+
pthread_mutex_unlock(&loop->task_queue_mutex);
2547+
2548+
if (enq_result != 1) {
2549+
enif_release_binary(&task_bin);
2550+
return make_error(env, "enqueue_failed");
2551+
}
2552+
2553+
/* Increment task count */
2554+
atomic_fetch_add(&loop->task_count, 1);
2555+
2556+
/* Coalesced wakeup (uvloop-style) */
2557+
if (loop->has_worker) {
2558+
if (!atomic_exchange(&loop->task_wake_pending, true)) {
2559+
ErlNifEnv *msg_env = enif_alloc_env();
2560+
if (msg_env != NULL) {
2561+
if (ATOM_TASK_READY == 0) {
2562+
ATOM_TASK_READY = enif_make_atom(msg_env, "task_ready");
2563+
}
2564+
ERL_NIF_TERM msg = enif_make_atom(msg_env, "task_ready");
2565+
enif_send(NULL, &loop->worker_pid, msg_env, msg);
2566+
enif_free_env(msg_env);
2567+
}
2568+
}
2569+
}
2570+
2571+
return ATOM_OK;
2572+
}
2573+
25042574
/**
25052575
* Maximum tasks to dequeue in one batch before acquiring GIL.
25062576
* This bounds memory usage while still amortizing GIL acquisition cost.
@@ -2789,10 +2859,12 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
27892859
ErlNifEnv *term_env = tasks[task_idx].term_env;
27902860
ERL_NIF_TERM task_term = tasks[task_idx].task_term;
27912861

2792-
/* Extract: {CallerPid, Ref, Module, Func, Args, Kwargs} */
2862+
/* Extract: {CallerPid, Ref, Module, Func, Args, Kwargs} or
2863+
* {CallerPid, Ref, Module, Func, Args, Kwargs, EnvRef} */
27932864
int arity;
27942865
const ERL_NIF_TERM *tuple_elems;
2795-
if (!enif_get_tuple(term_env, task_term, &arity, &tuple_elems) || arity != 6) {
2866+
if (!enif_get_tuple(term_env, task_term, &arity, &tuple_elems) ||
2867+
(arity != 6 && arity != 7)) {
27962868
enif_free_env(term_env);
27972869
continue;
27982870
}
@@ -2810,6 +2882,16 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
28102882
continue;
28112883
}
28122884

2885+
/* Check for env resource (7-tuple) */
2886+
py_env_resource_t *task_env = NULL;
2887+
if (arity == 7) {
2888+
/* Try to get env resource from tuple_elems[6] */
2889+
if (!enif_get_resource(term_env, tuple_elems[6],
2890+
get_env_resource_type(), (void **)&task_env)) {
2891+
task_env = NULL; /* Invalid env ref, continue without it */
2892+
}
2893+
}
2894+
28132895
/* Convert module/func to C strings */
28142896
char *module_name = enif_alloc(module_bin.size + 1);
28152897
char *func_name = enif_alloc(func_bin.size + 1);
@@ -2824,11 +2906,27 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
28242906
memcpy(func_name, func_bin.data, func_bin.size);
28252907
func_name[func_bin.size] = '\0';
28262908

2827-
/* Look up namespace for caller process (only exists if they called exec/eval) */
2909+
/* Look up namespace for caller process (used for reentrant calls) */
28282910
process_namespace_t *ns = lookup_process_namespace(loop, &caller_pid);
28292911

2830-
/* Look up function (checks process namespace for __main__, then cache/import) */
2831-
PyObject *func = get_function_for_task(loop, ns, module_name, func_name);
2912+
/* Look up function - check task_env first, then process namespace, then import */
2913+
PyObject *func = NULL;
2914+
2915+
/* First, check the passed env's globals (from py:exec) */
2916+
if (task_env != NULL && task_env->globals != NULL) {
2917+
if (strcmp(module_name, "__main__") == 0 ||
2918+
strcmp(module_name, "_process_") == 0) {
2919+
func = PyDict_GetItemString(task_env->globals, func_name);
2920+
if (func != NULL) {
2921+
Py_INCREF(func);
2922+
}
2923+
}
2924+
}
2925+
2926+
/* Fallback to process namespace and cache/import */
2927+
if (func == NULL) {
2928+
func = get_function_for_task(loop, ns, module_name, func_name);
2929+
}
28322930

28332931
enif_free(module_name);
28342932
enif_free(func_name);

c_src/py_event_loop.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,18 @@ ERL_NIF_TERM nif_dispatch_sleep_complete(ErlNifEnv *env, int argc,
624624
ERL_NIF_TERM nif_submit_task(ErlNifEnv *env, int argc,
625625
const ERL_NIF_TERM argv[]);
626626

627+
/**
628+
* @brief Submit an async task with process-local env (thread-safe)
629+
*
630+
* Like submit_task but includes an env resource reference. The env's globals
631+
* dict is used for function lookup, allowing functions defined via py:exec()
632+
* to be called from the event loop.
633+
*
634+
* NIF: submit_task_with_env(LoopRef, CallerPid, Ref, Module, Func, Args, Kwargs, EnvRef) -> ok | {error, Reason}
635+
*/
636+
ERL_NIF_TERM nif_submit_task_with_env(ErlNifEnv *env, int argc,
637+
const ERL_NIF_TERM argv[]);
638+
627639
/**
628640
* @brief Process all pending tasks from the task queue
629641
*

c_src/py_nif.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ ErlNifResourceType *INLINE_CONTINUATION_RESOURCE_TYPE = NULL;
7070
/* Process-local Python environment resource type */
7171
ErlNifResourceType *PY_ENV_RESOURCE_TYPE = NULL;
7272

73+
/* Getter for PY_ENV_RESOURCE_TYPE (used by py_event_loop.c) */
74+
ErlNifResourceType *get_env_resource_type(void) {
75+
return PY_ENV_RESOURCE_TYPE;
76+
}
77+
7378
_Atomic uint32_t g_context_id_counter = 1;
7479

7580
/* ============================================================================
@@ -6534,6 +6539,7 @@ static ErlNifFunc nif_funcs[] = {
65346539
{"event_loop_run_async", 7, nif_event_loop_run_async, ERL_NIF_DIRTY_JOB_IO_BOUND},
65356540
/* Async task queue NIFs (uvloop-inspired) */
65366541
{"submit_task", 7, nif_submit_task, 0}, /* Thread-safe, no GIL needed */
6542+
{"submit_task_with_env", 8, nif_submit_task_with_env, 0}, /* With process-local env */
65376543
{"process_ready_tasks", 1, nif_process_ready_tasks, ERL_NIF_DIRTY_JOB_CPU_BOUND},
65386544
{"event_loop_set_py_loop", 2, nif_event_loop_set_py_loop, 0},
65396545
/* Per-process namespace NIFs */

c_src/py_nif.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,12 @@ extern ErlNifResourceType *PY_CONTEXT_SUSPENDED_RESOURCE_TYPE;
13131313
/** @brief Resource type for inline_continuation_t (inline scheduler continuation) */
13141314
extern ErlNifResourceType *INLINE_CONTINUATION_RESOURCE_TYPE;
13151315

1316+
/** @brief Resource type for py_env_resource_t (process-local Python environment) */
1317+
extern ErlNifResourceType *PY_ENV_RESOURCE_TYPE;
1318+
1319+
/** @brief Get the PY_ENV_RESOURCE_TYPE (for use by other modules) */
1320+
ErlNifResourceType *get_env_resource_type(void);
1321+
13161322
/** @brief Atomic counter for unique interpreter IDs */
13171323
extern _Atomic uint32_t g_context_id_counter;
13181324

src/py_event_loop.erl

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,32 @@ create_task(Module, Func, Args, Kwargs) ->
166166
Caller = self(),
167167
ModuleBin = py_util:to_binary(Module),
168168
FuncBin = py_util:to_binary(Func),
169-
ok = py_nif:submit_task(LoopRef, Caller, Ref, ModuleBin, FuncBin, Args, Kwargs),
169+
%% Check if there's a process-local env (from py:exec) and use it
170+
ok = case get_process_env() of
171+
undefined ->
172+
py_nif:submit_task(LoopRef, Caller, Ref, ModuleBin, FuncBin, Args, Kwargs);
173+
EnvRef ->
174+
py_nif:submit_task_with_env(LoopRef, Caller, Ref, ModuleBin, FuncBin, Args, Kwargs, EnvRef)
175+
end,
170176
Ref.
171177

178+
%% @doc Get the process-local env reference if available.
179+
%% Returns the first env found (typically there's only one per process).
180+
-spec get_process_env() -> reference() | undefined.
181+
get_process_env() ->
182+
case get(py_local_env) of
183+
undefined -> undefined;
184+
Envs when is_map(Envs) ->
185+
%% Return any env (for __main__ lookup, any env works)
186+
case maps:values(Envs) of
187+
[EnvRef | _] -> EnvRef;
188+
[] -> undefined
189+
end;
190+
EnvRef when is_reference(EnvRef) ->
191+
%% Legacy single-env format
192+
EnvRef
193+
end.
194+
172195
%% @doc Wait for an async task result.
173196
%%
174197
%% Blocks until the result is received or timeout is reached.
@@ -207,6 +230,8 @@ spawn_task(Module, Func, Args) ->
207230
spawn_task(Module, Func, Args, Kwargs) ->
208231
{ok, LoopRef} = get_loop(),
209232
Ref = make_ref(),
233+
%% Get env from caller's process BEFORE spawning receiver
234+
CallerEnv = get_process_env(),
210235
%% Spawn a process that will receive and discard the result
211236
Receiver = erlang:spawn(fun() ->
212237
receive
@@ -218,7 +243,13 @@ spawn_task(Module, Func, Args, Kwargs) ->
218243
end),
219244
ModuleBin = py_util:to_binary(Module),
220245
FuncBin = py_util:to_binary(Func),
221-
ok = py_nif:submit_task(LoopRef, Receiver, Ref, ModuleBin, FuncBin, Args, Kwargs),
246+
%% Submit task with caller's env if available
247+
ok = case CallerEnv of
248+
undefined ->
249+
py_nif:submit_task(LoopRef, Receiver, Ref, ModuleBin, FuncBin, Args, Kwargs);
250+
EnvRef ->
251+
py_nif:submit_task_with_env(LoopRef, Receiver, Ref, ModuleBin, FuncBin, Args, Kwargs, EnvRef)
252+
end,
222253
ok.
223254

224255
%% ============================================================================

src/py_nif.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
event_loop_run_async/7,
103103
%% Async task queue NIFs (uvloop-inspired)
104104
submit_task/7,
105+
submit_task_with_env/8,
105106
process_ready_tasks/1,
106107
event_loop_set_py_loop/2,
107108
%% Per-process namespace NIFs
@@ -769,6 +770,16 @@ event_loop_run_async(_LoopRef, _CallerPid, _Ref, _Module, _Func, _Args, _Kwargs)
769770
submit_task(_LoopRef, _CallerPid, _Ref, _Module, _Func, _Args, _Kwargs) ->
770771
?NIF_STUB.
771772

773+
%% @doc Submit an async task with process-local env.
774+
%%
775+
%% Like submit_task but includes an env resource reference. The env's globals
776+
%% dict is used for function lookup, allowing functions defined via py:exec()
777+
%% to be called from the event loop.
778+
-spec submit_task_with_env(reference(), pid(), reference(), binary(), binary(), list(), map(), reference()) ->
779+
ok | {error, term()}.
780+
submit_task_with_env(_LoopRef, _CallerPid, _Ref, _Module, _Func, _Args, _Kwargs, _EnvRef) ->
781+
?NIF_STUB.
782+
772783
%% @doc Process all pending tasks from the task queue.
773784
%%
774785
%% Called by the event worker when it receives 'task_ready' message.

0 commit comments

Comments
 (0)