-
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathpy_event_loop.c
More file actions
7525 lines (6373 loc) · 239 KB
/
py_event_loop.c
File metadata and controls
7525 lines (6373 loc) · 239 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright 2026 Benoit Chesneau
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* @file py_event_loop.c
* @brief Erlang-native asyncio event loop implementation using enif_select
*
* This module implements an asyncio-compatible event loop that delegates
* I/O multiplexing to Erlang's scheduler via enif_select. This provides:
*
* - Sub-millisecond latency (vs 10ms polling in the old approach)
* - Zero CPU usage when idle (no polling)
* - Full GIL release during waits
* - Native Erlang scheduler integration
*
* The flow is:
* 1. Python calls add_reader(fd, callback) -> enif_select(fd, READ)
* 2. Erlang scheduler monitors fd
* 3. When fd is ready, Erlang sends {select, Res, Ref, ready_input}
* 4. py_event_router receives message, calls dispatch_callback NIF
* 5. Python callback is invoked
*/
#include "py_nif.h"
#include "py_event_loop.h"
#include "py_reactor_buffer.h"
/* ============================================================================
* Global State
* ============================================================================ */
/** Resource type for event loops */
ErlNifResourceType *EVENT_LOOP_RESOURCE_TYPE = NULL;
/** Resource type for fd monitoring */
ErlNifResourceType *FD_RESOURCE_TYPE = NULL;
/** Resource type for timers */
ErlNifResourceType *TIMER_RESOURCE_TYPE = NULL;
/** @brief Global priv_dir path for module imports in subinterpreters */
static char g_priv_dir[1024] = {0};
/** @brief Whether g_priv_dir has been populated (set by nif_set_event_loop_priv_dir) */
static bool g_priv_dir_set = false;
/**
 * Thread-local for current event loop namespace during task execution.
 * This allows reentrant calls (erlang.call -> Python) to use the same namespace.
 */
__thread process_namespace_t *tl_current_event_loop_namespace = NULL;
/** Atoms for event loop messages; all interned in event_loop_init() */
ERL_NIF_TERM ATOM_SELECT;        /* 'select' */
ERL_NIF_TERM ATOM_READY_INPUT;   /* 'ready_input' */
ERL_NIF_TERM ATOM_READY_OUTPUT;  /* 'ready_output' */
ERL_NIF_TERM ATOM_READ;          /* 'read' */
ERL_NIF_TERM ATOM_WRITE;         /* 'write' */
ERL_NIF_TERM ATOM_TIMER;         /* 'timer' */
ERL_NIF_TERM ATOM_START_TIMER;   /* 'start_timer' */
ERL_NIF_TERM ATOM_CANCEL_TIMER;  /* 'cancel_timer' */
ERL_NIF_TERM ATOM_EVENT_LOOP;    /* 'event_loop' */
ERL_NIF_TERM ATOM_DISPATCH;      /* 'dispatch' */
/* ============================================================================
* Per-Interpreter Event Loop Storage
* ============================================================================
*
* Event loop references are stored as module attributes in py_event_loop,
* using PyCapsule for safe C pointer storage. This approach:
*
* - Works uniformly for main interpreter and sub-interpreters
* - Each interpreter has its own py_event_loop module with its own attribute
* - Thread-safe for free-threading (Python 3.13+)
* - Uses gil_acquire()/gil_release() for safe GIL management
*
* Flow:
* NIF set_python_event_loop() -> stores capsule in py_event_loop._loop
* Python _is_initialized() -> checks if _loop attribute exists and is valid
* Python operations -> retrieve loop from py_event_loop._loop
*/
/** @brief Name for the PyCapsule storing event loop pointer (must match on retrieval) */
static const char *EVENT_LOOP_CAPSULE_NAME = "erlang_python.event_loop";
/** @brief Module attribute name for storing the event loop capsule on py_event_loop */
static const char *EVENT_LOOP_ATTR_NAME = "_loop";
/* ============================================================================
* Module State Structure
* ============================================================================
*
* Instead of using global variables, we store state in the Python module.
* This enables proper per-interpreter/per-context isolation.
*/
/* Per-interpreter module state; retrieved via PyModule_GetState so every
 * interpreter (main and sub) gets its own isolated copy. */
typedef struct {
    /** @brief Event loop for this interpreter */
    erlang_event_loop_t *event_loop;
    /** @brief Shared router PID for loops created via _loop_new() */
    ErlNifPid shared_router;
    /** @brief Whether shared_router has been set */
    bool shared_router_valid;
    /** @brief Isolation mode: 0=global, 1=per_loop */
    int isolation_mode;
    /* ========== Per-Interpreter Reactor Cache ========== */
    /** @brief Cached erlang.reactor module for this interpreter (owned reference) */
    PyObject *reactor_module;
    /** @brief Cached on_read_ready callable (owned reference) */
    PyObject *reactor_on_read;
    /** @brief Cached on_write_ready callable (owned reference) */
    PyObject *reactor_on_write;
    /** @brief Whether reactor cache has been initialized */
    bool reactor_initialized;
} py_event_loop_module_state_t;
/* ============================================================================
* Global Shared Router
* ============================================================================
*
* A global shared router that can be used by all interpreters (main and sub).
* This is separate from the per-module state to allow subinterpreters to
* access the router even when their module state doesn't have it set.
*/
static ErlNifPid g_global_shared_router;
static bool g_global_shared_router_valid = false;
/* Guards g_global_shared_router and g_global_shared_router_valid */
static pthread_mutex_t g_global_router_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Global shared worker for scalable I/O model.
 * Used by dispatch_timer to send task_ready, ensuring process_ready_tasks
 * is called after timer events. This centralizes the wakeup mechanism
 * so both router-dispatched and worker-dispatched timers work correctly.
 */
static ErlNifPid g_global_shared_worker;
static bool g_global_shared_worker_valid = false;
/* Guards g_global_shared_worker and g_global_shared_worker_valid */
static pthread_mutex_t g_global_worker_mutex = PTHREAD_MUTEX_INITIALIZER;
/* ============================================================================
* Per-Interpreter Reactor Cache
* ============================================================================
*
* Reactor callables (erlang.reactor.on_read_ready, on_write_ready) are cached
* per-interpreter in the module state. This ensures that subinterpreters use
* their own reactor module instance rather than the main interpreter's.
*
* The cache is populated lazily on first reactor operation within each
* interpreter.
*/
/**
 * Lazily populate the per-interpreter reactor callable cache.
 * MUST be called with GIL held.
 *
 * Imports erlang.reactor in the *current* interpreter and stores owned
 * references to on_read_ready / on_write_ready in the module state, so
 * each subinterpreter resolves its own reactor module instance.
 *
 * @param state Module state for the current interpreter
 * @return true if the cache is populated and ready, false on any failure
 */
static bool ensure_reactor_cached_for_interp(py_event_loop_module_state_t *state) {
    if (state == NULL) {
        return false;
    }
    /* Fast path: cache already populated for this interpreter */
    if (state->reactor_initialized) {
        return true;
    }
    /* Import erlang.reactor in THIS interpreter */
    PyObject *reactor_mod = PyImport_ImportModule("erlang.reactor");
    if (reactor_mod == NULL) {
        return false;
    }
    PyObject *read_cb = NULL;
    PyObject *write_cb = NULL;
    /* Resolve on_read_ready */
    read_cb = PyObject_GetAttrString(reactor_mod, "on_read_ready");
    if (read_cb == NULL || !PyCallable_Check(read_cb)) {
        goto fail;
    }
    /* Resolve on_write_ready */
    write_cb = PyObject_GetAttrString(reactor_mod, "on_write_ready");
    if (write_cb == NULL || !PyCallable_Check(write_cb)) {
        goto fail;
    }
    /* Transfer ownership of all three references to the module state */
    state->reactor_module = reactor_mod;
    state->reactor_on_read = read_cb;
    state->reactor_on_write = write_cb;
    state->reactor_initialized = true;
    return true;
fail:
    Py_XDECREF(write_cb);
    Py_XDECREF(read_cb);
    Py_DECREF(reactor_mod);
    return false;
}
/**
 * Release all cached reactor references held in the module state and
 * reset the cache to its uninitialized condition.
 * Called during module deallocation; safe to call on a NULL state.
 */
static void cleanup_reactor_cache(py_event_loop_module_state_t *state) {
    if (state == NULL) {
        return;
    }
    state->reactor_initialized = false;
    Py_XDECREF(state->reactor_module);
    state->reactor_module = NULL;
    Py_XDECREF(state->reactor_on_read);
    state->reactor_on_read = NULL;
    Py_XDECREF(state->reactor_on_write);
    state->reactor_on_write = NULL;
}
/* Forward declaration for module state access */
static py_event_loop_module_state_t *get_module_state(void);
static py_event_loop_module_state_t *get_module_state_from_module(PyObject *module);
/* Forward declarations for callable cache */
static void callable_cache_clear(erlang_event_loop_t *loop);
static PyObject *callable_cache_lookup(erlang_event_loop_t *loop,
const char *module_name,
const char *func_name);
static bool callable_cache_insert(erlang_event_loop_t *loop,
const char *module_name,
const char *func_name,
PyObject *callable);
/**
 * Ensure the event loop has somewhere to dispatch to.
 *
 * If the loop already has a router or worker configured this is a no-op.
 * Otherwise the process-wide shared router (if one has been registered)
 * is adopted, which lets subinterpreter loops reuse the main
 * interpreter's router.
 *
 * @param loop Event loop to check/update (may be NULL)
 * @return true if a router or worker is available afterwards
 */
static bool event_loop_ensure_router(erlang_event_loop_t *loop) {
    if (loop == NULL) {
        return false;
    }
    /* Fast path: already configured */
    if (loop->has_router || loop->has_worker) {
        return true;
    }
    /* Fall back to the global shared router under its mutex */
    pthread_mutex_lock(&g_global_router_mutex);
    if (g_global_shared_router_valid) {
        loop->router_pid = g_global_shared_router;
        loop->has_router = true;
    }
    pthread_mutex_unlock(&g_global_router_mutex);
    return loop->has_router || loop->has_worker;
}
/**
 * Fetch the py_event_loop module object for the current interpreter
 * from sys.modules. MUST be called with GIL held.
 *
 * @return Borrowed reference, or NULL if sys.modules is unavailable
 *         or the module has not been imported.
 */
static PyObject *get_event_loop_module(void) {
    PyObject *sys_modules = PyImport_GetModuleDict();
    return (sys_modules != NULL)
               ? PyDict_GetItemString(sys_modules, "py_event_loop")
               : NULL;
}
/**
 * Extract the per-interpreter state from a py_event_loop module object.
 * MUST be called with GIL held.
 *
 * @param module The py_event_loop module object (may be NULL)
 * @return Module state pointer, or NULL if unavailable
 */
static py_event_loop_module_state_t *get_module_state_from_module(PyObject *module) {
    return (module != NULL)
               ? (py_event_loop_module_state_t *)PyModule_GetState(module)
               : NULL;
}
/**
 * Convenience accessor: module state for the current interpreter.
 * MUST be called with GIL held.
 *
 * @return Module state, or NULL if the module is not loaded
 */
static py_event_loop_module_state_t *get_module_state(void) {
    return get_module_state_from_module(get_event_loop_module());
}
/**
 * Read the event loop registered for the current Python interpreter.
 * MUST be called with GIL held.
 *
 * Per-interpreter isolation comes from the module state lookup.
 *
 * @return Event loop pointer, or NULL if none is set
 */
static erlang_event_loop_t *get_interpreter_event_loop(void) {
    py_event_loop_module_state_t *st = get_module_state();
    return (st != NULL) ? st->event_loop : NULL;
}
/**
 * Register (or clear, with NULL) the event loop for the current
 * interpreter. MUST be called with GIL held.
 *
 * @param loop Event loop to store, or NULL to clear
 * @return 0 on success, -1 when module state is unavailable
 */
static int set_interpreter_event_loop(erlang_event_loop_t *loop) {
    py_event_loop_module_state_t *st = get_module_state();
    if (st == NULL) {
        return -1;
    }
    st->event_loop = loop;
    return 0;
}
/* ============================================================================
* Resource Callbacks
* ============================================================================ */
/* Forward declaration */
int create_default_event_loop(ErlNifEnv *env);
/**
* @brief Destructor for event loop resources
*
* Memory/Resource Management Note:
* This destructor intentionally skips Python object cleanup (Py_DECREF) in
* certain scenarios to avoid crashes:
*
* 1. Subinterpreter event loops (interp_id > 0): The subinterpreter may have
* been destroyed by Py_EndInterpreter before this destructor runs (which
* runs on the Erlang GC thread). Calling PyGILState_Ensure would crash.
*
* 2. Runtime shutdown: If runtime_is_running() returns false, Python is
* shutting down or stopped. Calling Python C API would crash.
*
* 3. Thread state issues: If PyGILState_Check() returns true, we already
* hold the GIL from somewhere else - calling PyGILState_Ensure would
* deadlock or corrupt thread state.
*
* In all these cases, we accept a small memory leak (the Python objects)
* rather than risking a crash. This is the standard Python embedding pattern
* for destructor-time cleanup from non-Python threads.
*
* The leaked Python objects will be reclaimed when the Python runtime fully
* shuts down via Py_FinalizeEx().
*/
void event_loop_destructor(ErlNifEnv *env, void *obj) {
    (void)env;
    erlang_event_loop_t *loop = (erlang_event_loop_t *)obj;
#ifdef HAVE_SUBINTERPRETERS
    /* For subinterpreter event loops, skip Python API calls.
     * PyGILState_Ensure only works for the main interpreter. The subinterpreter
     * may already be destroyed via Py_EndInterpreter before this destructor runs. */
    if (loop->interp_id > 0) {
        goto cleanup_native;
    }
#endif
    /* Main interpreter: safe to use PyGILState_Ensure if runtime is running
     * and this thread doesn't already have Python state bound and GIL is not held. */
    if (runtime_is_running() &&
        PyGILState_GetThisThreadState() == NULL &&
        !PyGILState_Check()) {
        PyGILState_STATE gstate = PyGILState_Ensure();
        /* Unregister this loop from module state if it is still the current one */
        erlang_event_loop_t *interp_loop = get_interpreter_event_loop();
        if (interp_loop == loop) {
            set_interpreter_event_loop(NULL);
        }
        PyGILState_Release(gstate);
    }
#ifdef HAVE_SUBINTERPRETERS
cleanup_native:
#endif
    /* Signal shutdown */
    loop->shutdown = true;
    /* Wake up any waiting threads */
    pthread_mutex_lock(&loop->mutex);
    pthread_cond_broadcast(&loop->event_cond);
    pthread_mutex_unlock(&loop->mutex);
    /* Clear pending events (returns them to freelist) */
    event_loop_clear_pending(loop);
    /* Free the freelist itself (Phase 7: clear freelist on destruction) */
    pending_event_t *freelist_item = loop->event_freelist;
    while (freelist_item != NULL) {
        pending_event_t *next = freelist_item->next;
        enif_free(freelist_item);
        freelist_item = next;
    }
    loop->event_freelist = NULL;
    loop->freelist_count = 0;
    /* Clean up async task queue (uvloop-inspired) */
    if (loop->task_queue_initialized) {
        pthread_mutex_destroy(&loop->task_queue_mutex);
        loop->task_queue_initialized = false;
    }
    if (loop->task_queue != NULL) {
        enif_ioq_destroy(loop->task_queue);
        loop->task_queue = NULL;
    }
    /* Release Python loop reference if held */
    if (loop->py_loop_valid && loop->py_loop != NULL) {
        /* Only decref if Python runtime is still running and we can safely acquire GIL */
        if (runtime_is_running() && loop->interp_id == 0 &&
            PyGILState_GetThisThreadState() == NULL &&
            !PyGILState_Check()) {
            PyGILState_STATE gstate = PyGILState_Ensure();
            Py_DECREF(loop->py_loop);
            /* Also release cached Python objects (uvloop-style cache cleanup) */
            if (loop->py_cache_valid) {
                Py_XDECREF(loop->cached_asyncio);
                Py_XDECREF(loop->cached_run_and_send);
                loop->cached_asyncio = NULL;
                loop->cached_run_and_send = NULL;
                loop->py_cache_valid = false;
            }
            /* Clear callable cache */
            callable_cache_clear(loop);
            PyGILState_Release(gstate);
        }
        /* If the GIL could not be safely acquired the PyObject leaks by
         * design -- see the "Memory/Resource Management Note" above. */
        loop->py_loop = NULL;
        loop->py_loop_valid = false;
    }
    /* Free message environment */
    if (loop->msg_env != NULL) {
        enif_free_env(loop->msg_env);
        loop->msg_env = NULL;
    }
    /*
     * Clean up per-process namespaces.
     *
     * Lock ordering: GIL first, then namespaces_mutex (consistent with normal path).
     * This prevents ABBA deadlock with execution paths that acquire GIL then mutex.
     *
     * For subinterpreters (interp_id != 0), we can't use PyGILState_Ensure.
     * Just free the native structs without Py_DECREF - Python objects will be
     * cleaned up when the interpreter is destroyed.
     */
    if (runtime_is_running() && loop->interp_id == 0 &&
        PyGILState_GetThisThreadState() == NULL &&
        !PyGILState_Check()) {
        /* Main interpreter: GIL first, then mutex */
        PyGILState_STATE gstate = PyGILState_Ensure();
        pthread_mutex_lock(&loop->namespaces_mutex);
        process_namespace_t *ns = loop->namespaces_head;
        while (ns != NULL) {
            process_namespace_t *next = ns->next;
            Py_XDECREF(ns->globals);
            Py_XDECREF(ns->locals);
            Py_XDECREF(ns->module_cache);
            enif_free(ns);
            ns = next;
        }
        loop->namespaces_head = NULL;
        /* Clean up PID-to-env mappings */
        pid_env_mapping_t *mapping = loop->pid_env_head;
        while (mapping != NULL) {
            pid_env_mapping_t *next = mapping->next;
            if (mapping->env != NULL) {
                enif_release_resource(mapping->env);
            }
            enif_free(mapping);
            mapping = next;
        }
        loop->pid_env_head = NULL;
        pthread_mutex_unlock(&loop->namespaces_mutex);
        PyGILState_Release(gstate);
    } else {
        /* Subinterpreter or runtime not running: just free structs */
        pthread_mutex_lock(&loop->namespaces_mutex);
        process_namespace_t *ns = loop->namespaces_head;
        while (ns != NULL) {
            process_namespace_t *next = ns->next;
            /* Skip Py_XDECREF - can't safely acquire GIL */
            enif_free(ns);
            ns = next;
        }
        loop->namespaces_head = NULL;
        /* Clean up PID-to-env mappings */
        pid_env_mapping_t *mapping = loop->pid_env_head;
        while (mapping != NULL) {
            pid_env_mapping_t *next = mapping->next;
            if (mapping->env != NULL) {
                enif_release_resource(mapping->env);
            }
            enif_free(mapping);
            mapping = next;
        }
        loop->pid_env_head = NULL;
        pthread_mutex_unlock(&loop->namespaces_mutex);
    }
    pthread_mutex_destroy(&loop->namespaces_mutex);
    /* Destroy synchronization primitives */
    pthread_mutex_destroy(&loop->mutex);
    pthread_cond_destroy(&loop->event_cond);
}
/**
 * @brief Destructor for fd resources
 *
 * Last-resort safety net invoked at resource GC time: if the fd was never
 * transitioned to CLOSED and we own it, close it here. With correct
 * lifecycle management (stop/down callbacks) this path is rarely taken.
 */
void fd_resource_destructor(ErlNifEnv *env, void *obj) {
    (void)env;
    fd_resource_t *res = (fd_resource_t *)obj;
    int cur_state = atomic_load(&res->closing_state);
    bool needs_close = (cur_state != FD_STATE_CLOSED) && res->owns_fd && res->fd >= 0;
    if (needs_close) {
        close(res->fd);
        res->fd = -1;
    }
}
/**
* @brief Stop callback for fd resources (enif_select stop event)
*
* Called when ERL_NIF_SELECT_STOP is issued. Performs proper cleanup:
* - Atomically transitions to CLOSED state
* - Closes FD if we own it
* - Demonitors owner process if active
*/
void fd_resource_stop(ErlNifEnv *env, void *obj, ErlNifEvent event,
                      int is_direct_call) {
    fd_resource_t *fd_res = (fd_resource_t *)obj;
    (void)event;
    (void)is_direct_call;
    /* Atomically transition to CLOSED state.
     * Two CAS attempts cover both legal predecessor states (OPEN, then
     * CLOSING); whichever caller wins the transition performs the cleanup
     * below exactly once. */
    int expected = FD_STATE_OPEN;
    if (!atomic_compare_exchange_strong(&fd_res->closing_state,
                                        &expected, FD_STATE_CLOSED)) {
        /* Try from CLOSING state */
        expected = FD_STATE_CLOSING;
        if (!atomic_compare_exchange_strong(&fd_res->closing_state,
                                            &expected, FD_STATE_CLOSED)) {
            /* Already closed, nothing to do */
            return;
        }
    }
    /* Close FD if we own it */
    if (fd_res->owns_fd && fd_res->fd >= 0) {
        close(fd_res->fd);
        fd_res->fd = -1;
    }
    /* Demonitor if active */
    if (fd_res->monitor_active && env != NULL) {
        enif_demonitor_process(env, fd_res, &fd_res->owner_monitor);
        fd_res->monitor_active = false;
    }
    /* No further select events can fire for this resource */
    fd_res->reader_active = false;
    fd_res->writer_active = false;
}
/**
* @brief Down callback for fd resources (owner process died)
*
* Called when the monitored owner process dies. Initiates cleanup:
* - Marks monitor as inactive
* - Transitions to CLOSING state
* - Triggers ERL_NIF_SELECT_STOP or closes FD directly
*/
void fd_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
                      ErlNifMonitor *mon) {
    fd_resource_t *fd_res = (fd_resource_t *)obj;
    (void)pid;
    (void)mon;
    /* Mark monitor as inactive (the monitored process is gone) */
    fd_res->monitor_active = false;
    /* Transition to CLOSING state; only the OPEN->CLOSING winner proceeds,
     * so cleanup is initiated exactly once. */
    int expected = FD_STATE_OPEN;
    if (!atomic_compare_exchange_strong(&fd_res->closing_state,
                                        &expected, FD_STATE_CLOSING)) {
        /* Already closing or closed */
        return;
    }
    /* Take ownership for cleanup */
    fd_res->owns_fd = true;
    /* If select is active, trigger stop via ERL_NIF_SELECT_STOP, which will
     * invoke fd_resource_stop to finish the CLOSING->CLOSED transition. */
    if (fd_res->reader_active || fd_res->writer_active) {
        enif_select(env, (ErlNifEvent)fd_res->fd, ERL_NIF_SELECT_STOP,
                    fd_res, NULL, enif_make_atom(env, "owner_down"));
    } else if (fd_res->fd >= 0) {
        /* No active select, close directly */
        int exp = FD_STATE_CLOSING;
        if (atomic_compare_exchange_strong(&fd_res->closing_state,
                                           &exp, FD_STATE_CLOSED)) {
            close(fd_res->fd);
            fd_res->fd = -1;
        }
    }
}
/**
 * @brief Destructor for timer resources
 *
 * Intentionally a no-op: timer teardown happens through the cancel_timer
 * path, so there is nothing left to release at resource GC time.
 */
void timer_resource_destructor(ErlNifEnv *env, void *obj) {
    (void)env;
    (void)obj;
}
/* ============================================================================
* Per-Process Namespace Management
* ============================================================================ */
/**
* @brief Down callback for event loop resources (process monitor)
*
* Called when a monitored process dies. Cleans up the process's namespace.
*
* Lock ordering: GIL first, then namespaces_mutex (consistent with normal path)
*/
void event_loop_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
ErlNifMonitor *mon) {
(void)env;
(void)mon;
erlang_event_loop_t *loop = (erlang_event_loop_t *)obj;
/*
* For subinterpreters (interp_id != 0), we can't use PyGILState_Ensure.
* Just remove from the list without Py_DECREF - the Python objects will
* be cleaned up when the interpreter is destroyed.
*/
if (!runtime_is_running() || loop->interp_id != 0) {
pthread_mutex_lock(&loop->namespaces_mutex);
process_namespace_t **pp = &loop->namespaces_head;
while (*pp != NULL) {
if (enif_compare_pids(&(*pp)->owner_pid, pid) == 0) {
process_namespace_t *to_free = *pp;
*pp = to_free->next;
/* Skip Py_XDECREF - can't safely acquire GIL for subinterp */
enif_free(to_free);
break;
}
pp = &(*pp)->next;
}
pthread_mutex_unlock(&loop->namespaces_mutex);
return;
}
/*
* For main interpreter: acquire GIL FIRST to maintain consistent lock
* ordering with the normal execution path (which acquires GIL, then mutex).
* This prevents ABBA deadlock.
*/
PyGILState_STATE gstate = PyGILState_Ensure();
pthread_mutex_lock(&loop->namespaces_mutex);
/* Find and remove namespace for this pid */
process_namespace_t **pp = &loop->namespaces_head;
while (*pp != NULL) {
if (enif_compare_pids(&(*pp)->owner_pid, pid) == 0) {
process_namespace_t *to_free = *pp;
*pp = to_free->next;
Py_XDECREF(to_free->globals);
Py_XDECREF(to_free->locals);
Py_XDECREF(to_free->module_cache);
enif_free(to_free);
break;
}
pp = &(*pp)->next;
}
pthread_mutex_unlock(&loop->namespaces_mutex);
PyGILState_Release(gstate);
}
/**
 * @brief Find an existing namespace for a process (never creates one)
 *
 * @param loop Event loop containing the namespace registry
 * @param pid Process to look up
 * @return Matching namespace, or NULL if the process has none
 *
 * @note Thread-safe: the list walk happens under namespaces_mutex
 */
static process_namespace_t *lookup_process_namespace(
    erlang_event_loop_t *loop,
    ErlNifPid *pid
) {
    process_namespace_t *found = NULL;
    pthread_mutex_lock(&loop->namespaces_mutex);
    for (process_namespace_t *cur = loop->namespaces_head;
         cur != NULL;
         cur = cur->next) {
        if (enif_compare_pids(&cur->owner_pid, pid) == 0) {
            found = cur;
            break;
        }
    }
    pthread_mutex_unlock(&loop->namespaces_mutex);
    return found;
}
/**
* @brief Get or create namespace for a process
*
* Each Erlang process gets its own isolated Python namespace (globals/locals).
* The namespace is automatically cleaned up when the process exits.
*
* @param env NIF environment (for monitoring)
* @param loop Event loop containing namespace registry
* @param pid Process to get namespace for
* @return Namespace or NULL on failure
*
* @note Must be called with GIL held
* @note Thread-safe (uses namespaces_mutex)
*/
static process_namespace_t *ensure_process_namespace(
    ErlNifEnv *env,
    erlang_event_loop_t *loop,
    ErlNifPid *pid
) {
    /* Caller holds the GIL, so taking the mutex here follows the
     * GIL-first-then-mutex lock ordering used elsewhere in this file. */
    pthread_mutex_lock(&loop->namespaces_mutex);
    /* Search for existing namespace */
    process_namespace_t *ns = loop->namespaces_head;
    while (ns != NULL) {
        if (enif_compare_pids(&ns->owner_pid, pid) == 0) {
            pthread_mutex_unlock(&loop->namespaces_mutex);
            return ns;
        }
        ns = ns->next;
    }
    /* Create new namespace */
    ns = enif_alloc(sizeof(process_namespace_t));
    if (ns == NULL) {
        pthread_mutex_unlock(&loop->namespaces_mutex);
        return NULL;
    }
    ns->owner_pid = *pid;
    ns->globals = PyDict_New();
    ns->locals = PyDict_New();
    ns->module_cache = PyDict_New();
    if (ns->globals == NULL || ns->locals == NULL || ns->module_cache == NULL) {
        /* Partial allocation: release whatever was created and bail out */
        Py_XDECREF(ns->globals);
        Py_XDECREF(ns->locals);
        Py_XDECREF(ns->module_cache);
        enif_free(ns);
        pthread_mutex_unlock(&loop->namespaces_mutex);
        return NULL;
    }
    /* Import builtins into globals (best-effort; return value ignored) */
    PyObject *builtins = PyEval_GetBuiltins();
    if (builtins != NULL) {
        PyDict_SetItemString(ns->globals, "__builtins__", builtins);
    }
    /* Import erlang module into globals (best-effort) */
    PyObject *erlang_module = PyImport_ImportModule("erlang");
    if (erlang_module != NULL) {
        PyDict_SetItemString(ns->globals, "erlang", erlang_module);
        Py_DECREF(erlang_module);
    }
    /* Monitor process for cleanup; on failure, roll back everything since
     * an unmonitored namespace would never be reclaimed. */
    if (enif_monitor_process(env, loop, pid, &ns->monitor) != 0) {
        Py_DECREF(ns->globals);
        Py_DECREF(ns->locals);
        Py_DECREF(ns->module_cache);
        enif_free(ns);
        pthread_mutex_unlock(&loop->namespaces_mutex);
        return NULL;
    }
    /* Add to list (head insertion; list is still protected by the mutex) */
    ns->next = loop->namespaces_head;
    loop->namespaces_head = ns;
    pthread_mutex_unlock(&loop->namespaces_mutex);
    return ns;
}
/**
* @brief Look up function in process namespace or module
*
* For __main__ module, looks in process namespace first.
* For other modules, uses PyImport_ImportModule.
*
* @param loop Event loop (for callable cache)
* @param ns Process namespace (may be NULL)
* @param module_name Module name
* @param func_name Function name
* @return New reference to callable, or NULL on failure
*
* @note Must be called with GIL held
*/
static PyObject *get_function_for_task(
    erlang_event_loop_t *loop,
    process_namespace_t *ns,
    const char *module_name,
    const char *func_name
) {
    bool is_namespace_module =
        strcmp(module_name, "__main__") == 0 ||
        strcmp(module_name, "_process_") == 0;
    /* __main__/_process_ resolve against the process namespace first */
    if (ns != NULL && is_namespace_module) {
        PyObject *hit = PyDict_GetItemString(ns->globals, func_name);
        if (hit != NULL) {
            Py_INCREF(hit); /* borrowed -> new reference for the caller */
            return hit;
        }
    }
    /* Per-loop callable cache (uvloop-style optimization) */
    PyObject *cached = callable_cache_lookup(loop, module_name, func_name);
    if (cached != NULL) {
        Py_INCREF(cached);
        return cached;
    }
    /* Cache miss: import the module and resolve the attribute */
    PyObject *mod = PyImport_ImportModule(module_name);
    if (mod == NULL) {
        PyErr_Clear();
        return NULL;
    }
    PyObject *resolved = PyObject_GetAttrString(mod, func_name);
    Py_DECREF(mod);
    if (resolved == NULL) {
        PyErr_Clear();
        return NULL;
    }
    /* Only real importable modules are cached; namespace-backed names are not */
    if (!is_namespace_module) {
        callable_cache_insert(loop, module_name, func_name, resolved);
    }
    return resolved;
}
/* ============================================================================
* Initialization
* ============================================================================ */
int event_loop_init(ErlNifEnv *env) {
/* Create event loop resource type with down callback for process monitors */
ErlNifResourceTypeInit loop_init = {
.dtor = event_loop_destructor,
.stop = NULL,
.down = event_loop_down,
.members = 3
};
EVENT_LOOP_RESOURCE_TYPE = enif_init_resource_type(
env, "erlang_event_loop", &loop_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
if (EVENT_LOOP_RESOURCE_TYPE == NULL) {
return -1;
}
/* Create fd resource type with select support */
ErlNifResourceTypeInit fd_init = {
.dtor = fd_resource_destructor,
.stop = fd_resource_stop,
.down = fd_resource_down,
.members = 3
};
FD_RESOURCE_TYPE = enif_init_resource_type(
env, "fd_resource", &fd_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
if (FD_RESOURCE_TYPE == NULL) {
return -1;
}
/* Create timer resource type */
ErlNifResourceTypeInit timer_init = {
.dtor = timer_resource_destructor,
.stop = NULL,
.down = NULL,
.members = 1
};
TIMER_RESOURCE_TYPE = enif_init_resource_type(
env, "timer_resource", &timer_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
if (TIMER_RESOURCE_TYPE == NULL) {
return -1;
}
/* Create atoms */
ATOM_SELECT = enif_make_atom(env, "select");
ATOM_READY_INPUT = enif_make_atom(env, "ready_input");
ATOM_READY_OUTPUT = enif_make_atom(env, "ready_output");
ATOM_READ = enif_make_atom(env, "read");
ATOM_WRITE = enif_make_atom(env, "write");
ATOM_TIMER = enif_make_atom(env, "timer");
ATOM_START_TIMER = enif_make_atom(env, "start_timer");
ATOM_CANCEL_TIMER = enif_make_atom(env, "cancel_timer");
ATOM_EVENT_LOOP = enif_make_atom(env, "event_loop");
ATOM_DISPATCH = enif_make_atom(env, "dispatch");
return 0;
}
/**
 * @brief Tear-down counterpart of event_loop_init().
 *
 * Intentionally empty: NIF resource types are owned and reclaimed by the
 * Erlang runtime, so there is nothing to free here.
 */
void event_loop_cleanup(void) {
    /* Resource types are cleaned up by the runtime */
}
/**
* set_event_loop_priv_dir(Path) -> ok
*
* Store the priv_dir path for use when importing modules in subinterpreters.
* Called from Erlang during application startup.
*/
ERL_NIF_TERM nif_set_event_loop_priv_dir(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc;
ErlNifBinary path_bin;
if (!enif_inspect_binary(env, argv[0], &path_bin) &&
!enif_inspect_iolist_as_binary(env, argv[0], &path_bin)) {
return make_error(env, "invalid_path");
}
size_t len = path_bin.size;
if (len >= sizeof(g_priv_dir)) {