Skip to content

Commit 3001b53

Browse files
committed
Optimize Channel API for large messages
Add channel_send_owned_binary() that takes ownership of a binary and directly enqueues it to the IOQueue without an additional copy. Before: nif_channel_send called enif_term_to_binary (alloc + copy), then channel_send which called enif_alloc_binary + memcpy (alloc + copy). This resulted in 2 allocations and 2 copies per message. After: nif_channel_send calls enif_term_to_binary then passes the binary directly to channel_send_owned_binary which enqueues without copying. This results in 1 allocation and 1 copy per message. For 16KB messages, this eliminates 16KB of unnecessary memory operations per send, improving speedup vs Reactor from 1.6x to 2.0x.
1 parent 2e5447a commit 3001b53

2 files changed

Lines changed: 79 additions & 3 deletions

File tree

c_src/py_channel.c

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,61 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)
157157
return 0;
158158
}
159159

160+
int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {
161+
pthread_mutex_lock(&channel->mutex);
162+
163+
if (channel->closed) {
164+
enif_release_binary(bin);
165+
pthread_mutex_unlock(&channel->mutex);
166+
return -1;
167+
}
168+
169+
/* Check backpressure */
170+
if (channel->max_size > 0 && channel->current_size + bin->size > channel->max_size) {
171+
enif_release_binary(bin);
172+
pthread_mutex_unlock(&channel->mutex);
173+
return 1; /* Busy - backpressure */
174+
}
175+
176+
size_t msg_size = bin->size;
177+
178+
/* Directly enqueue - transfers ownership to IOQueue */
179+
if (!enif_ioq_enq_binary(channel->queue, bin, 0)) {
180+
enif_release_binary(bin);
181+
pthread_mutex_unlock(&channel->mutex);
182+
return -1;
183+
}
184+
185+
channel->current_size += msg_size;
186+
187+
/* Check if there's a waiting context to resume */
188+
bool should_resume = (channel->waiting != NULL);
189+
190+
/* Check if there's an async waiter to dispatch */
191+
erlang_event_loop_t *loop_to_wake = NULL;
192+
uint64_t callback_id = 0;
193+
194+
if (channel->has_waiter) {
195+
loop_to_wake = channel->waiter_loop;
196+
callback_id = channel->waiter_callback_id;
197+
channel->has_waiter = false;
198+
channel->waiter_loop = NULL;
199+
}
200+
201+
pthread_mutex_unlock(&channel->mutex);
202+
203+
if (should_resume) {
204+
channel_resume_waiting(channel);
205+
}
206+
207+
if (loop_to_wake != NULL) {
208+
event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1);
209+
enif_release_resource(loop_to_wake);
210+
}
211+
212+
return 0;
213+
}
214+
160215
int channel_try_receive(py_channel_t *channel, unsigned char **out_data, size_t *out_size) {
161216
pthread_mutex_lock(&channel->mutex);
162217

@@ -516,6 +571,10 @@ static ERL_NIF_TERM nif_channel_create(ErlNifEnv *env, int argc, const ERL_NIF_T
516571
* @brief Send a message to a channel
517572
*
518573
* nif_channel_send(ChannelRef, Term) -> ok | busy | {error, closed}
574+
*
575+
* Optimized: serializes the term once and passes the binary directly to
576+
* channel_send_owned_binary, eliminating the extra allocation and copy
577+
* that would occur in channel_send.
519578
*/
520579
static ERL_NIF_TERM nif_channel_send(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
521580
(void)argc;
@@ -525,15 +584,19 @@ static ERL_NIF_TERM nif_channel_send(ErlNifEnv *env, int argc, const ERL_NIF_TER
525584
return make_error(env, "invalid_channel");
526585
}
527586

528-
/* Serialize term to binary using external term format */
587+
/*
588+
* Serialize term to binary using external term format.
589+
* term_to_binary allocates and we own the binary, so pass it
590+
* directly to channel_send_owned_binary to avoid double copy.
591+
*/
529592
ErlNifBinary bin;
530593
if (!enif_term_to_binary(env, argv[1], &bin)) {
531594
return make_error(env, "term_to_binary_failed");
532595
}
533596

534-
int result = channel_send(channel, bin.data, bin.size);
535-
enif_release_binary(&bin);
597+
int result = channel_send_owned_binary(channel, &bin);
536598

599+
/* bin is now owned by IOQueue or released on error */
537600
switch (result) {
538601
case 0:
539602
return ATOM_OK;

c_src/py_channel.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,19 @@ py_channel_t *channel_alloc(size_t max_size);
204204
*/
205205
int channel_send(py_channel_t *channel, const unsigned char *data, size_t size);
206206

207+
/**
208+
* @brief Send an owned binary to a channel (zero-copy fast path)
209+
*
210+
* Takes ownership of the binary and directly enqueues it to the IOQueue
211+
* without an additional copy. The binary is either consumed by the queue
212+
* or released on error.
213+
*
214+
* @param channel Channel to send to
215+
* @param bin Binary to send (ownership transferred, do not release after call)
216+
* @return 0 on success, 1 if busy (backpressure), -1 on error
217+
*/
218+
int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin);
219+
207220
/**
208221
* @brief Try to receive a message from a channel (non-blocking)
209222
*

0 commit comments

Comments
 (0)