
Commit a94d736

Optimize event loop for improved performance (#19)
* Optimize event loop for improved performance
  - Fix O(2n) list reversal in nif_get_pending using enif_make_list_from_array
  - Use bitwise AND instead of modulo for hash index calculation
  - Add single-lookup callback dispatch with _callbacks_by_cid dict
  - Implement coalesced wakeup flag to reduce condition variable signals
  - Add timer heap cleanup iteration limit (max 10 per cycle)
  - Lazy-load transport extra info (sockname/peername) on first access
  - Remove per-event stats updates from py_event_worker

* Remove redundant wakeups and optimize transport buffer
  - Remove py_nif:event_loop_wakeup calls from worker FD and timer handlers
    (already handled by coalesced wakeup in event_loop_add_pending)
  - Replace O(n) buffer deletion with O(1) offset tracking in transport
  - Use memoryview for efficient buffer slicing during writes

* Increase hash table size and add backpressure
  - Increase PENDING_HASH_SIZE from 128 to 256 for better load factor
  - Add backpressure check before acquiring lock in event_loop_add_pending
  - Drop events when queue exceeds MAX_PENDING_EVENTS (256)

* Add benchmark scripts for event loop performance testing
  - run_benchmark.erl: Erlang module for callback and TCP benchmarks
  - simple_bench.py: Simple callback dispatch benchmark
  - io_bench.py: TCP echo I/O benchmark

* Optimize pool and router for event loop performance
  - Pool: Create independent loops instead of duplicating the same reference
  - Pool: Use tuple for O(1) element access instead of lists:nth
  - Pool: Each loop gets its own worker for true parallelism
  - Router: Use combined handle_fd_event_and_reselect NIF call
  - Router: Add message_queue_data off_heap for large queues

* Cache reactor callables for faster hot path
  - Add global cache for erlang.reactor module and callbacks
  - Cache on_read_ready and on_write_ready function references
  - Use PyObject_CallFunctionObjArgs instead of PyObject_CallMethod
  - Avoids PyImport and attribute lookup on every read/write callback
  - Thread-safe lazy initialization with double-checked locking

* Add callable caching and simplify ASGI/WSGI code
  - Add per-interpreter callable cache to avoid per-request module imports
    - Cache module/callable for ASGI apps
    - Cache runner module and _run_asgi_sync/_run_wsgi_sync functions
    - ~3-5µs saved per request on cache hit
  - Remove scope template cache (~200 lines)
    - Thread-local 64-entry cache added complexity for marginal gain
    - Direct scope building with interned keys is fast enough
  - Remove lazy header conversion (~450 lines)
    - LazyHeaderList type and iterator removed
    - Always use eager header conversion
    - Simpler code path; frameworks typically access headers eagerly
  - Update documentation to reflect new optimization strategy

Net reduction: ~566 lines of code
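The O(1) offset-tracking change in the transport lives in Python (memoryview slicing), but the idea can be sketched in C with hypothetical names: instead of deleting consumed bytes from the front of the buffer on every partial write, keep a read offset and reset only when the buffer fully drains.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Illustrative write buffer: a read offset replaces per-write front
 * deletion (which would cost an O(n) memmove on every partial flush).
 * Names are hypothetical; the real transport is Python code. */
typedef struct {
    char   data[4096];
    size_t offset; /* bytes already flushed to the socket */
    size_t len;    /* total bytes buffered */
} write_buf_t;

static size_t wbuf_pending(const write_buf_t *b) {
    return b->len - b->offset;
}

static const char *wbuf_head(const write_buf_t *b) {
    return b->data + b->offset;
}

/* Record that n bytes were flushed; O(1), no data movement. */
static void wbuf_consume(write_buf_t *b, size_t n) {
    b->offset += n;
    if (b->offset == b->len) { /* fully drained: reset in O(1) */
        b->offset = 0;
        b->len = 0;
    }
}
```

The trade-off is that already-flushed bytes occupy the buffer until it drains; for an echo-style workload that drains frequently, this wins over repeated memmoves.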
1 parent a46b940 commit a94d736

16 files changed

Lines changed: 969 additions & 996 deletions

c_src/py_asgi.c

Lines changed: 193 additions & 843 deletions
Large diffs are not rendered by default.

c_src/py_asgi.h

Lines changed: 9 additions & 19 deletions
@@ -90,22 +90,6 @@
  */
 #define ASGI_MAX_INTERPRETERS 64
 
-/**
- * @def SCOPE_CACHE_SIZE
- * @brief Number of scope templates to cache per thread
- */
-#define SCOPE_CACHE_SIZE 64
-
-/**
- * @def LAZY_HEADERS_THRESHOLD
- * @brief Minimum number of headers to use lazy conversion
- *
- * For small header counts, eager conversion is faster due to lower overhead.
- * Only use lazy conversion when there are enough headers to benefit.
- */
-#ifndef LAZY_HEADERS_THRESHOLD
-#define LAZY_HEADERS_THRESHOLD 4
-#endif
 
 /* ============================================================================
  * ASGI Erlang Atoms
@@ -120,9 +104,6 @@ extern ERL_NIF_TERM ATOM_ASGI_METHOD;
 /* Resource type for zero-copy body buffers */
 extern ErlNifResourceType *ASGI_BUFFER_RESOURCE_TYPE;
 
-/* Resource type for lazy header conversion */
-extern ErlNifResourceType *ASGI_LAZY_HEADERS_RESOURCE_TYPE;
-
 /* ============================================================================
  * Per-Interpreter State (Sub-interpreter & Free-threading Support)
  * ============================================================================ */
@@ -237,6 +218,15 @@ typedef struct asgi_interp_state {
     PyObject *status_500; /**< 500 Internal Server Error */
     PyObject *status_502; /**< 502 Bad Gateway */
     PyObject *status_503; /**< 503 Service Unavailable */
+
+    /* Callable cache (avoids per-request module imports) */
+    char *cached_module_name;   /**< Last used module name */
+    char *cached_callable_name; /**< Last used callable name */
+    PyObject *cached_callable;  /**< Cached callable object */
+
+    char *cached_runner_name;   /**< Last used runner module name */
+    PyObject *cached_runner;    /**< Cached runner module */
+    PyObject *cached_run_func;  /**< Cached _run_asgi_sync function */
 } asgi_interp_state_t;
 
 /**
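The per-interpreter callable cache added above is a last-used (module, callable) memo: a string compare on a hit replaces an import plus attribute lookup. A minimal sketch of the lookup/store logic, with hypothetical helper names and `void *` standing in for `PyObject *`:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-in for the cache fields in asgi_interp_state_t.
 * Hypothetical sketch, not the real struct or lookup code. */
typedef struct {
    char *cached_module_name;
    char *cached_callable_name;
    void *cached_callable; /* PyObject* in the real code */
} callable_cache_t;

static char *dup_str(const char *s) {
    size_t n = strlen(s) + 1;
    char *p = malloc(n);
    if (p) memcpy(p, s, n);
    return p;
}

/* Returns the cached callable on a name match, NULL on a miss. */
static void *cache_lookup(callable_cache_t *c,
                          const char *module, const char *callable) {
    if (c->cached_callable != NULL &&
        c->cached_module_name != NULL &&
        strcmp(c->cached_module_name, module) == 0 &&
        c->cached_callable_name != NULL &&
        strcmp(c->cached_callable_name, callable) == 0) {
        return c->cached_callable; /* hit: no import, no getattr */
    }
    return NULL;
}

/* Store a freshly resolved callable, replacing any previous entry. */
static void cache_store(callable_cache_t *c,
                        const char *module, const char *callable, void *obj) {
    free(c->cached_module_name);
    free(c->cached_callable_name);
    c->cached_module_name = dup_str(module);
    c->cached_callable_name = dup_str(callable);
    c->cached_callable = obj;
}
```

Because the cache is per-interpreter, no locking is needed in the real code; the ~3-5µs saving quoted in the commit message comes from skipping the import machinery on the hit path.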

c_src/py_event_loop.c

Lines changed: 170 additions & 33 deletions
@@ -120,6 +120,76 @@ static ErlNifPid g_global_shared_router;
 static bool g_global_shared_router_valid = false;
 static pthread_mutex_t g_global_router_mutex = PTHREAD_MUTEX_INITIALIZER;
 
+/* ============================================================================
+ * Cached Reactor Callables (Performance Optimization)
+ * ============================================================================
+ *
+ * Cache erlang.reactor module and callbacks to avoid expensive PyImport
+ * on every read/write callback in the hot path.
+ */
+static PyObject *g_reactor_module = NULL;
+static PyObject *g_on_read_ready = NULL;
+static PyObject *g_on_write_ready = NULL;
+static bool g_reactor_cached = false;
+static pthread_mutex_t g_reactor_cache_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/**
+ * Initialize cached reactor callables.
+ * MUST be called with GIL held.
+ * Thread-safe: uses mutex for first initialization.
+ *
+ * @return true if callables are cached and ready, false on error
+ */
+static bool ensure_reactor_cached(void) {
+    /* Fast path: already cached */
+    if (g_reactor_cached) {
+        return true;
+    }
+
+    pthread_mutex_lock(&g_reactor_cache_mutex);
+
+    /* Double-check after acquiring lock */
+    if (g_reactor_cached) {
+        pthread_mutex_unlock(&g_reactor_cache_mutex);
+        return true;
+    }
+
+    /* Import erlang.reactor module */
+    PyObject *module = PyImport_ImportModule("erlang.reactor");
+    if (module == NULL) {
+        pthread_mutex_unlock(&g_reactor_cache_mutex);
+        return false;
+    }
+
+    /* Get on_read_ready function */
+    PyObject *on_read = PyObject_GetAttrString(module, "on_read_ready");
+    if (on_read == NULL || !PyCallable_Check(on_read)) {
+        Py_XDECREF(on_read);
+        Py_DECREF(module);
+        pthread_mutex_unlock(&g_reactor_cache_mutex);
+        return false;
+    }
+
+    /* Get on_write_ready function */
+    PyObject *on_write = PyObject_GetAttrString(module, "on_write_ready");
+    if (on_write == NULL || !PyCallable_Check(on_write)) {
+        Py_XDECREF(on_write);
+        Py_DECREF(on_read);
+        Py_DECREF(module);
+        pthread_mutex_unlock(&g_reactor_cache_mutex);
+        return false;
+    }
+
+    /* Store cached references */
+    g_reactor_module = module;
+    g_on_read_ready = on_read;
+    g_on_write_ready = on_write;
+    g_reactor_cached = true;
+
+    pthread_mutex_unlock(&g_reactor_cache_mutex);
+    return true;
+}
+
 /* Forward declaration for module state access */
 static py_event_loop_module_state_t *get_module_state(void);
 static py_event_loop_module_state_t *get_module_state_from_module(PyObject *module);
@@ -977,6 +1047,9 @@ static int poll_events_wait(erlang_event_loop_t *loop, int timeout_ms) {
 
     pthread_mutex_lock(&loop->mutex);
 
+    /* Reset wake_pending flag since we're about to process events */
+    atomic_store(&loop->wake_pending, false);
+
     int current_count = atomic_load(&loop->pending_count);
     if (current_count == 0 && !loop->shutdown) {
         /* No events, wait with timeout */
@@ -1088,11 +1161,49 @@ ERL_NIF_TERM nif_get_pending(ErlNifEnv *env, int argc,
     /*
      * Phase 2: Build Erlang list outside lock (no contention)
      * Term creation and memory operations happen without holding the mutex.
+     *
+     * Optimization: Count elements first, then use enif_make_list_from_array
+     * to build the list in O(n) instead of O(2n) with build-then-reverse.
      */
-    ERL_NIF_TERM list = enif_make_list(env, 0);
-    pending_event_t *current = snapshot_head;
 
+    /* Count events in the snapshot */
+    size_t count = 0;
+    pending_event_t *current = snapshot_head;
     while (current != NULL) {
+        count++;
+        current = current->next;
+    }
+
+    if (count == 0) {
+        return enif_make_list(env, 0);
+    }
+
+    /* Allocate array for terms - use stack for small counts, heap for large */
+    ERL_NIF_TERM *terms;
+    ERL_NIF_TERM stack_terms[64];
+    bool heap_allocated = false;
+
+    if (count <= 64) {
+        terms = stack_terms;
+    } else {
+        terms = enif_alloc(count * sizeof(ERL_NIF_TERM));
+        if (terms == NULL) {
+            /* Fallback: free events and return empty list */
+            current = snapshot_head;
+            while (current != NULL) {
+                pending_event_t *next = current->next;
+                enif_free(current);
+                current = next;
+            }
+            return enif_make_list(env, 0);
+        }
+        heap_allocated = true;
+    }
+
+    /* Build terms array in forward order (matching linked list order) */
+    current = snapshot_head;
+    size_t i = 0;
+    while (current != NULL && i < count) {
         ERL_NIF_TERM type_atom;
         switch (current->type) {
         case EVENT_TYPE_READ:
@@ -1108,26 +1219,26 @@ ERL_NIF_TERM nif_get_pending(ErlNifEnv *env, int argc,
             type_atom = ATOM_UNDEFINED;
         }
 
-        ERL_NIF_TERM event = enif_make_tuple2(
+        terms[i] = enif_make_tuple2(
             env,
             enif_make_uint64(env, current->callback_id),
             type_atom
         );
 
-        list = enif_make_list_cell(env, event, list);
         pending_event_t *next = current->next;
         enif_free(current);
         current = next;
+        i++;
     }
 
-    /* Reverse the list to maintain order */
-    ERL_NIF_TERM reversed = enif_make_list(env, 0);
-    ERL_NIF_TERM head;
-    while (enif_get_list_cell(env, list, &head, &list)) {
-        reversed = enif_make_list_cell(env, head, reversed);
+    /* Build list from array in O(n) */
+    ERL_NIF_TERM result = enif_make_list_from_array(env, terms, (unsigned int)i);
+
+    if (heap_allocated) {
+        enif_free(terms);
     }
 
-    return reversed;
+    return result;
 }
 
 /**
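The count-then-build pattern above can be shown on a plain linked list of ints — an illustrative analog of the NIF change, not the NIF code itself. One pass counts, one pass fills an array in forward order, and a small stack buffer avoids allocation for common sizes:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

typedef struct node { int value; struct node *next; } node_t;

/* Convert a linked list to an array in two forward passes instead of
 * building a reversed list and reversing it again. Lists that fit in
 * the caller-provided stack buffer need no heap allocation. */
static int *list_to_array(const node_t *head, int *stack_buf,
                          size_t stack_cap, size_t *out_n,
                          bool *heap_allocated) {
    size_t count = 0;
    for (const node_t *p = head; p != NULL; p = p->next) count++;

    int *out = stack_buf;
    *heap_allocated = false;
    if (count > stack_cap) {
        out = malloc(count * sizeof(int));
        if (out == NULL) { *out_n = 0; return NULL; }
        *heap_allocated = true;
    }

    size_t i = 0;
    for (const node_t *p = head; p != NULL; p = p->next) out[i++] = p->value;
    *out_n = count;
    return out;
}
```

The NIF version then hands the array to `enif_make_list_from_array`, which builds the Erlang list in a single O(n) step with the order already correct.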
@@ -1705,10 +1816,13 @@ static inline uint64_t pending_hash_key(uint64_t callback_id, event_type_t type)
 
 /**
  * @brief Compute hash bucket index
+ *
+ * Note: PENDING_HASH_SIZE must be a power of 2 for bitwise AND to work.
+ * Using AND instead of modulo is faster (single instruction vs division).
  */
 static inline uint32_t pending_hash_index(uint64_t key) {
-    /* Simple hash: XOR fold and modulo */
-    return (uint32_t)((key ^ (key >> 32)) % PENDING_HASH_SIZE);
+    /* Simple hash: XOR fold and bitwise AND (faster than modulo) */
+    return (uint32_t)((key ^ (key >> 32)) & (PENDING_HASH_SIZE - 1));
 }
 
 /**
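The mask trick relies on the table size being a power of two: for such sizes, `x % SIZE` and `x & (SIZE - 1)` agree bit-for-bit, since the low bits of x are exactly the remainder. A standalone check, with `TABLE_SIZE` standing in for `PENDING_HASH_SIZE`:

```c
#include <assert.h>
#include <stdint.h>

#define TABLE_SIZE 256 /* must be a power of 2, like PENDING_HASH_SIZE */

/* XOR-fold a 64-bit key and mask it into the table. The AND replaces
 * an integer division; valid only because TABLE_SIZE is a power of 2. */
static uint32_t hash_index(uint64_t key) {
    return (uint32_t)((key ^ (key >> 32)) & (TABLE_SIZE - 1));
}

/* Reference version using modulo, for comparison. */
static uint32_t hash_index_modulo(uint64_t key) {
    return (uint32_t)((key ^ (key >> 32)) % TABLE_SIZE);
}
```

Compilers can sometimes strength-reduce `% constant` themselves, but only when the operand is known non-negative or unsigned; writing the mask explicitly guarantees the single-instruction form.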
@@ -1728,9 +1842,9 @@ static inline bool pending_hash_contains(erlang_event_loop_t *loop,
     uint64_t key = pending_hash_key(callback_id, type);
     uint32_t idx = pending_hash_index(key);
 
-    /* Linear probing */
+    /* Linear probing with bitwise AND for wrap-around */
     for (int i = 0; i < PENDING_HASH_SIZE; i++) {
-        uint32_t probe = (idx + i) % PENDING_HASH_SIZE;
+        uint32_t probe = (idx + i) & (PENDING_HASH_SIZE - 1);
         if (!loop->pending_hash_occupied[probe]) {
             return false; /* Empty slot means key not present */
         }
@@ -1759,9 +1873,9 @@ static inline bool pending_hash_insert(erlang_event_loop_t *loop,
     uint64_t key = pending_hash_key(callback_id, type);
     uint32_t idx = pending_hash_index(key);
 
-    /* Linear probing */
+    /* Linear probing with bitwise AND for wrap-around */
     for (int i = 0; i < PENDING_HASH_SIZE; i++) {
-        uint32_t probe = (idx + i) % PENDING_HASH_SIZE;
+        uint32_t probe = (idx + i) & (PENDING_HASH_SIZE - 1);
         if (!loop->pending_hash_occupied[probe]) {
             loop->pending_hash_keys[probe] = key;
             loop->pending_hash_occupied[probe] = true;
@@ -1789,6 +1903,11 @@ static inline void pending_hash_clear(erlang_event_loop_t *loop) {
 
 void event_loop_add_pending(erlang_event_loop_t *loop, event_type_t type,
                             uint64_t callback_id, int fd) {
+    /* Backpressure: check pending count before acquiring lock (fast path) */
+    if (atomic_load(&loop->pending_count) >= MAX_PENDING_EVENTS) {
+        return; /* Queue full, drop event */
+    }
+
     pthread_mutex_lock(&loop->mutex);
 
     /* O(1) duplicate check using hash set */
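The backpressure fast path can be isolated as below. This is a sketch: the real function takes the mutex and enqueues after the check, and the atomic load is deliberately a slightly stale read — a few events near the boundary may be admitted or dropped either way, which is acceptable for load shedding.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

#define MAX_PENDING_EVENTS 256

/* Reject new work before touching the mutex when the queue is full.
 * The lock-free load keeps the drop path cheap under overload. */
static bool try_admit(atomic_int *pending_count) {
    if (atomic_load(pending_count) >= MAX_PENDING_EVENTS) {
        return false; /* queue full, drop event */
    }
    /* In the real code: lock the mutex, enqueue, then increment. */
    atomic_fetch_add(pending_count, 1);
    return true;
}
```

Checking before locking matters because under overload the mutex itself becomes the contended resource; dropped events never pay for it.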
@@ -1822,7 +1941,14 @@ void event_loop_add_pending(erlang_event_loop_t *loop, event_type_t type,
     pending_hash_insert(loop, callback_id, type);
 
     atomic_fetch_add(&loop->pending_count, 1);
-    pthread_cond_signal(&loop->event_cond);
+
+    /*
+     * Coalesced wakeup (uvloop-style): Only signal if no wakeup is pending.
+     * This reduces condition variable signals under high event rates.
+     */
+    if (!atomic_exchange(&loop->wake_pending, true)) {
+        pthread_cond_signal(&loop->event_cond);
+    }
 
     pthread_mutex_unlock(&loop->mutex);
 }
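The coalescing hinges on `atomic_exchange` returning the prior value: only the first producer in a burst sees `false` and signals; later producers see `true` and skip. A single-threaded sketch with hypothetical `notify`/`consume` names:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

static atomic_bool wake_pending = false;
static int signals_sent = 0;

/* Producer side: signal only when no wakeup is already pending. */
static void notify(void) {
    if (!atomic_exchange(&wake_pending, true)) {
        signals_sent++; /* pthread_cond_signal(...) in the real loop */
    }
}

/* Consumer side: clear the flag before draining the queue, so the
 * first event arriving after the drain triggers a fresh signal. */
static void consume(void) {
    atomic_store(&wake_pending, false);
}
```

Clearing the flag before draining (as `poll_events_wait` does) is what makes this safe: any event enqueued during the drain sets the flag again and produces a new signal, so no wakeup is lost.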
@@ -3120,19 +3246,25 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc,
         return make_error(env, "buffer_creation_failed");
     }
 
-    /* Import erlang.reactor module */
-    PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
-    if (reactor_module == NULL) {
+    /* Ensure reactor callables are cached (fast path after first call) */
+    if (!ensure_reactor_cached()) {
         PyErr_Clear();
         Py_DECREF(py_buffer);
         py_context_release(&guard);
-        return make_error(env, "import_erlang_reactor_failed");
+        return make_error(env, "reactor_cache_init_failed");
     }
 
-    /* Call on_read_ready(fd, data) with the buffer */
-    PyObject *result = PyObject_CallMethod(reactor_module, "on_read_ready",
-                                           "iO", fd, py_buffer);
-    Py_DECREF(reactor_module);
+    /* Call cached on_read_ready(fd, data) - avoids PyImport on every call */
+    PyObject *py_fd = PyLong_FromLong(fd);
+    if (py_fd == NULL) {
+        PyErr_Clear();
+        Py_DECREF(py_buffer);
+        py_context_release(&guard);
+        return make_error(env, "fd_conversion_failed");
+    }
+
+    PyObject *result = PyObject_CallFunctionObjArgs(g_on_read_ready, py_fd, py_buffer, NULL);
+    Py_DECREF(py_fd);
     Py_DECREF(py_buffer);
 
     if (result == NULL) {
@@ -3190,18 +3322,23 @@ ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc,
         return make_error(env, "acquire_failed");
     }
 
-    /* Import erlang.reactor module */
-    PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
-    if (reactor_module == NULL) {
+    /* Ensure reactor callables are cached (fast path after first call) */
+    if (!ensure_reactor_cached()) {
         PyErr_Clear();
         py_context_release(&guard);
-        return make_error(env, "import_erlang_reactor_failed");
+        return make_error(env, "reactor_cache_init_failed");
     }
 
-    /* Call on_write_ready(fd) */
-    PyObject *result = PyObject_CallMethod(reactor_module, "on_write_ready",
-                                           "i", fd);
-    Py_DECREF(reactor_module);
+    /* Call cached on_write_ready(fd) - avoids PyImport on every call */
+    PyObject *py_fd = PyLong_FromLong(fd);
+    if (py_fd == NULL) {
+        PyErr_Clear();
+        py_context_release(&guard);
+        return make_error(env, "fd_conversion_failed");
+    }
+
+    PyObject *result = PyObject_CallFunctionObjArgs(g_on_write_ready, py_fd, NULL);
+    Py_DECREF(py_fd);
 
     if (result == NULL) {
         PyErr_Clear();

c_src/py_event_loop.h

Lines changed: 8 additions & 2 deletions
@@ -49,8 +49,9 @@
 /** @brief Maximum events to keep in freelist (Phase 7 optimization) */
 #define EVENT_FREELIST_SIZE 256
 
-/** @brief Size of pending event hash set for O(1) duplicate detection */
-#define PENDING_HASH_SIZE 128
+/** @brief Size of pending event hash set for O(1) duplicate detection
+ *  Note: Must be a power of 2 for efficient bitwise AND indexing */
+#define PENDING_HASH_SIZE 256
 
 /** @brief Event types for pending callbacks */
 typedef enum {
@@ -240,6 +241,11 @@ typedef struct erlang_event_loop {
     /** @brief Count of occupied slots in hash set */
     int pending_hash_count;
 
+    /* ========== Coalesced Wakeup Support ========== */
+
+    /** @brief Flag indicating a wakeup is pending (uvloop-style coalescing) */
+    _Atomic bool wake_pending;
+
     /* ========== Synchronous Sleep Support ========== */
 
     /** @brief Current synchronous sleep ID being waited on */
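Since the mask indexing silently breaks for non-power-of-two sizes, the requirement the comment states could also be enforced at compile time. A possible guard (not part of the commit):

```c
#include <assert.h>

#define PENDING_HASH_SIZE 256

/* A power of two has exactly one bit set, so n & (n - 1) == 0.
 * _Static_assert turns a bad size into a compile error instead of
 * silent mis-masking at runtime. */
_Static_assert(PENDING_HASH_SIZE > 0 &&
               (PENDING_HASH_SIZE & (PENDING_HASH_SIZE - 1)) == 0,
               "PENDING_HASH_SIZE must be a power of 2");
```

With this in the header, changing the constant to, say, 200 fails the build at the point of definition rather than corrupting the hash set.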

c_src/py_nif.c

Lines changed: 0 additions & 6 deletions
@@ -3659,12 +3659,6 @@ static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) {
         asgi_buffer_resource_dtor,
         ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
 
-    /* ASGI lazy headers resource type for on-demand header conversion */
-    ASGI_LAZY_HEADERS_RESOURCE_TYPE = enif_open_resource_type(
-        env, NULL, "asgi_lazy_headers",
-        lazy_headers_resource_dtor,
-        ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
-
     /* Reactor buffer resource type for zero-copy read handling */
     REACTOR_BUFFER_RESOURCE_TYPE = enif_open_resource_type(
         env, NULL, "reactor_buffer",
