Skip to content

Commit 941e5d4

Browse files
committed
Add direct FD operations for hornbeam
- fd_read/2: Read from raw file descriptor - fd_write/2: Write to raw file descriptor - fd_select_read/1: Register FD for read events, returns {ok, FdRef} - fd_select_write/1: Register FD for write events, returns {ok, FdRef} - fd_close/1: Close raw file descriptor - socketpair/0: Create Unix socketpair These operations enable hornbeam_socketpair and hornbeam_proxy_bridge to work with raw file descriptors without requiring Python context.
1 parent 97de5e0 commit 941e5d4

5 files changed

Lines changed: 505 additions & 2 deletions

File tree

c_src/py_event_loop.c

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3337,6 +3337,237 @@ ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc,
33373337
return ATOM_OK;
33383338
}
33393339

3340+
/* ============================================================================
3341+
* Direct FD Operations
3342+
*
3343+
* These functions provide direct FD read/write for proxy/bridge use cases.
3344+
* ============================================================================ */
3345+
3346+
/**
3347+
* fd_read(Fd, Size) -> {ok, Data} | {error, Reason}
3348+
*
3349+
* Read up to Size bytes from a file descriptor.
3350+
*/
3351+
ERL_NIF_TERM nif_fd_read(ErlNifEnv *env, int argc,
3352+
const ERL_NIF_TERM argv[]) {
3353+
(void)argc;
3354+
3355+
int fd;
3356+
if (!enif_get_int(env, argv[0], &fd)) {
3357+
return make_error(env, "invalid_fd");
3358+
}
3359+
3360+
unsigned long size;
3361+
if (!enif_get_ulong(env, argv[1], &size)) {
3362+
return make_error(env, "invalid_size");
3363+
}
3364+
3365+
if (size > 1024 * 1024) {
3366+
size = 1024 * 1024;
3367+
}
3368+
3369+
ErlNifBinary bin;
3370+
if (!enif_alloc_binary(size, &bin)) {
3371+
return make_error(env, "alloc_failed");
3372+
}
3373+
3374+
ssize_t n = read(fd, bin.data, bin.size);
3375+
if (n < 0) {
3376+
enif_release_binary(&bin);
3377+
if (errno == EAGAIN || errno == EWOULDBLOCK) {
3378+
return make_error(env, "eagain");
3379+
}
3380+
return make_error(env, strerror(errno));
3381+
}
3382+
3383+
if ((size_t)n < bin.size) {
3384+
enif_realloc_binary(&bin, n);
3385+
}
3386+
3387+
ERL_NIF_TERM data = enif_make_binary(env, &bin);
3388+
return enif_make_tuple2(env, ATOM_OK, data);
3389+
}
3390+
3391+
/**
3392+
* fd_write(Fd, Data) -> {ok, Written} | {error, Reason}
3393+
*
3394+
* Write data to a file descriptor.
3395+
*/
3396+
ERL_NIF_TERM nif_fd_write(ErlNifEnv *env, int argc,
3397+
const ERL_NIF_TERM argv[]) {
3398+
(void)argc;
3399+
3400+
int fd;
3401+
if (!enif_get_int(env, argv[0], &fd)) {
3402+
return make_error(env, "invalid_fd");
3403+
}
3404+
3405+
ErlNifBinary bin;
3406+
if (!enif_inspect_binary(env, argv[1], &bin)) {
3407+
return make_error(env, "invalid_data");
3408+
}
3409+
3410+
ssize_t n = write(fd, bin.data, bin.size);
3411+
if (n < 0) {
3412+
if (errno == EAGAIN || errno == EWOULDBLOCK) {
3413+
return make_error(env, "eagain");
3414+
}
3415+
return make_error(env, strerror(errno));
3416+
}
3417+
3418+
return enif_make_tuple2(env, ATOM_OK, enif_make_long(env, n));
3419+
}
3420+
3421+
/**
3422+
* fd_select_read(Fd) -> {ok, FdRef} | {error, Reason}
3423+
*
3424+
* Register FD for read selection. Caller receives {select, FdRef, Ref, ready_input}.
3425+
* Returns a resource reference that must be kept alive while monitoring.
3426+
*/
3427+
ERL_NIF_TERM nif_fd_select_read(ErlNifEnv *env, int argc,
3428+
const ERL_NIF_TERM argv[]) {
3429+
(void)argc;
3430+
3431+
int fd;
3432+
if (!enif_get_int(env, argv[0], &fd)) {
3433+
return make_error(env, "invalid_fd");
3434+
}
3435+
3436+
ErlNifPid caller_pid;
3437+
if (!enif_self(env, &caller_pid)) {
3438+
return make_error(env, "no_caller_pid");
3439+
}
3440+
3441+
/* Allocate fd resource */
3442+
fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE,
3443+
sizeof(fd_resource_t));
3444+
if (fd_res == NULL) {
3445+
return make_error(env, "alloc_failed");
3446+
}
3447+
3448+
fd_res->fd = fd;
3449+
fd_res->read_callback_id = 0;
3450+
fd_res->write_callback_id = 0;
3451+
fd_res->owner_pid = caller_pid;
3452+
fd_res->reader_active = true;
3453+
fd_res->writer_active = false;
3454+
fd_res->loop = NULL;
3455+
atomic_store(&fd_res->closing_state, FD_STATE_OPEN);
3456+
fd_res->monitor_active = false;
3457+
fd_res->owns_fd = false; /* Caller owns the fd */
3458+
3459+
int ret = enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_READ,
3460+
fd_res, &caller_pid, enif_make_ref(env));
3461+
if (ret < 0) {
3462+
enif_release_resource(fd_res);
3463+
return make_error(env, "select_failed");
3464+
}
3465+
3466+
ERL_NIF_TERM fd_term = enif_make_resource(env, fd_res);
3467+
enif_release_resource(fd_res); /* Term now holds the reference */
3468+
3469+
return enif_make_tuple2(env, ATOM_OK, fd_term);
3470+
}
3471+
3472+
/**
3473+
* fd_select_write(Fd) -> {ok, FdRef} | {error, Reason}
3474+
*
3475+
* Register FD for write selection. Caller receives {select, FdRef, Ref, ready_output}.
3476+
* Returns a resource reference that must be kept alive while monitoring.
3477+
*/
3478+
ERL_NIF_TERM nif_fd_select_write(ErlNifEnv *env, int argc,
3479+
const ERL_NIF_TERM argv[]) {
3480+
(void)argc;
3481+
3482+
int fd;
3483+
if (!enif_get_int(env, argv[0], &fd)) {
3484+
return make_error(env, "invalid_fd");
3485+
}
3486+
3487+
ErlNifPid caller_pid;
3488+
if (!enif_self(env, &caller_pid)) {
3489+
return make_error(env, "no_caller_pid");
3490+
}
3491+
3492+
/* Allocate fd resource */
3493+
fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE,
3494+
sizeof(fd_resource_t));
3495+
if (fd_res == NULL) {
3496+
return make_error(env, "alloc_failed");
3497+
}
3498+
3499+
fd_res->fd = fd;
3500+
fd_res->read_callback_id = 0;
3501+
fd_res->write_callback_id = 0;
3502+
fd_res->owner_pid = caller_pid;
3503+
fd_res->reader_active = false;
3504+
fd_res->writer_active = true;
3505+
fd_res->loop = NULL;
3506+
atomic_store(&fd_res->closing_state, FD_STATE_OPEN);
3507+
fd_res->monitor_active = false;
3508+
fd_res->owns_fd = false; /* Caller owns the fd */
3509+
3510+
int ret = enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_WRITE,
3511+
fd_res, &caller_pid, enif_make_ref(env));
3512+
if (ret < 0) {
3513+
enif_release_resource(fd_res);
3514+
return make_error(env, "select_failed");
3515+
}
3516+
3517+
ERL_NIF_TERM fd_term = enif_make_resource(env, fd_res);
3518+
enif_release_resource(fd_res); /* Term now holds the reference */
3519+
3520+
return enif_make_tuple2(env, ATOM_OK, fd_term);
3521+
}
3522+
3523+
/**
3524+
* socketpair() -> {ok, {Fd1, Fd2}} | {error, Reason}
3525+
*
3526+
* Create a Unix socketpair for bidirectional communication.
3527+
*/
3528+
ERL_NIF_TERM nif_socketpair(ErlNifEnv *env, int argc,
3529+
const ERL_NIF_TERM argv[]) {
3530+
(void)argc;
3531+
(void)argv;
3532+
3533+
int fds[2];
3534+
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
3535+
return make_error(env, strerror(errno));
3536+
}
3537+
3538+
int flags1 = fcntl(fds[0], F_GETFL, 0);
3539+
int flags2 = fcntl(fds[1], F_GETFL, 0);
3540+
fcntl(fds[0], F_SETFL, flags1 | O_NONBLOCK);
3541+
fcntl(fds[1], F_SETFL, flags2 | O_NONBLOCK);
3542+
3543+
ERL_NIF_TERM fd_tuple = enif_make_tuple2(env,
3544+
enif_make_int(env, fds[0]),
3545+
enif_make_int(env, fds[1]));
3546+
3547+
return enif_make_tuple2(env, ATOM_OK, fd_tuple);
3548+
}
3549+
3550+
/**
3551+
* fd_close(Fd) -> ok | {error, Reason}
3552+
*
3553+
* Close a raw file descriptor (integer).
3554+
*/
3555+
ERL_NIF_TERM nif_fd_close(ErlNifEnv *env, int argc,
3556+
const ERL_NIF_TERM argv[]) {
3557+
(void)argc;
3558+
3559+
int fd;
3560+
if (!enif_get_int(env, argv[0], &fd)) {
3561+
return make_error(env, "invalid_fd");
3562+
}
3563+
3564+
if (close(fd) < 0) {
3565+
return make_error(env, strerror(errno));
3566+
}
3567+
3568+
return ATOM_OK;
3569+
}
3570+
33403571
/* ============================================================================
33413572
* Python Module: py_event_loop
33423573
*

c_src/py_event_loop.h

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -867,4 +867,56 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc,
867867
ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc,
868868
const ERL_NIF_TERM argv[]);
869869

870+
/* ============================================================================
871+
* Direct FD Operations
872+
* ============================================================================ */
873+
874+
/**
875+
* @brief Read from file descriptor
876+
*
877+
* NIF: fd_read(Fd, Size) -> {ok, Data} | {error, Reason}
878+
*/
879+
ERL_NIF_TERM nif_fd_read(ErlNifEnv *env, int argc,
880+
const ERL_NIF_TERM argv[]);
881+
882+
/**
883+
* @brief Write to file descriptor
884+
*
885+
* NIF: fd_write(Fd, Data) -> {ok, Written} | {error, Reason}
886+
*/
887+
ERL_NIF_TERM nif_fd_write(ErlNifEnv *env, int argc,
888+
const ERL_NIF_TERM argv[]);
889+
890+
/**
891+
* @brief Register FD for read selection
892+
*
893+
* NIF: fd_select_read(Fd) -> ok | {error, Reason}
894+
*/
895+
ERL_NIF_TERM nif_fd_select_read(ErlNifEnv *env, int argc,
896+
const ERL_NIF_TERM argv[]);
897+
898+
/**
899+
* @brief Register FD for write selection
900+
*
901+
* NIF: fd_select_write(Fd) -> ok | {error, Reason}
902+
*/
903+
ERL_NIF_TERM nif_fd_select_write(ErlNifEnv *env, int argc,
904+
const ERL_NIF_TERM argv[]);
905+
906+
/**
907+
* @brief Create Unix socketpair
908+
*
909+
* NIF: socketpair() -> {ok, {Fd1, Fd2}} | {error, Reason}
910+
*/
911+
ERL_NIF_TERM nif_socketpair(ErlNifEnv *env, int argc,
912+
const ERL_NIF_TERM argv[]);
913+
914+
/**
915+
* @brief Close raw file descriptor (integer)
916+
*
917+
* NIF: fd_close(Fd) -> ok | {error, Reason}
918+
*/
919+
ERL_NIF_TERM nif_fd_close(ErlNifEnv *env, int argc,
920+
const ERL_NIF_TERM argv[]);
921+
870922
#endif /* PY_EVENT_LOOP_H */

c_src/py_nif.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3876,7 +3876,15 @@ static ErlNifFunc nif_funcs[] = {
38763876
{"reactor_on_read_ready", 2, nif_reactor_on_read_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND},
38773877
{"reactor_on_write_ready", 2, nif_reactor_on_write_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND},
38783878
{"reactor_init_connection", 3, nif_reactor_init_connection, ERL_NIF_DIRTY_JOB_CPU_BOUND},
3879-
{"reactor_close_fd", 2, nif_reactor_close_fd, 0}
3879+
{"reactor_close_fd", 2, nif_reactor_close_fd, 0},
3880+
3881+
/* Direct FD operations */
3882+
{"fd_read", 2, nif_fd_read, 0},
3883+
{"fd_write", 2, nif_fd_write, 0},
3884+
{"fd_select_read", 1, nif_fd_select_read, 0},
3885+
{"fd_select_write", 1, nif_fd_select_write, 0},
3886+
{"fd_close", 1, nif_fd_close, 0},
3887+
{"socketpair", 0, nif_socketpair, 0}
38803888
};
38813889

38823890
ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload)

src/py_nif.erl

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,14 @@
188188
reactor_on_read_ready/2,
189189
reactor_on_write_ready/2,
190190
reactor_init_connection/3,
191-
reactor_close_fd/2
191+
reactor_close_fd/2,
192+
%% Direct FD operations
193+
fd_read/2,
194+
fd_write/2,
195+
fd_select_read/1,
196+
fd_select_write/1,
197+
fd_close/1,
198+
socketpair/0
192199
]).
193200

194201
-on_load(load_nif/0).
@@ -1530,3 +1537,64 @@ reactor_init_connection(_ContextRef, _Fd, _ClientInfo) ->
15301537
-spec reactor_close_fd(reference(), reference()) -> ok | {error, term()}.
15311538
reactor_close_fd(_ContextRef, _FdRef) ->
15321539
?NIF_STUB.
1540+
1541+
%%% ============================================================================
1542+
%%% Direct FD Operations
1543+
%%% ============================================================================
1544+
1545+
%% @doc Read up to Size bytes from a file descriptor.
1546+
%%
1547+
%% @param Fd File descriptor
1548+
%% @param Size Maximum bytes to read
1549+
%% @returns {ok, Data} | {error, Reason}
1550+
-spec fd_read(integer(), non_neg_integer()) -> {ok, binary()} | {error, term()}.
1551+
fd_read(_Fd, _Size) ->
1552+
?NIF_STUB.
1553+
1554+
%% @doc Write data to a file descriptor.
1555+
%%
1556+
%% @param Fd File descriptor
1557+
%% @param Data Binary data to write
1558+
%% @returns {ok, Written} | {error, Reason}
1559+
-spec fd_write(integer(), binary()) -> {ok, non_neg_integer()} | {error, term()}.
1560+
fd_write(_Fd, _Data) ->
1561+
?NIF_STUB.
1562+
1563+
%% @doc Register FD for read selection.
1564+
%%
1565+
%% Caller will receive {select, FdRef, Ref, ready_input} when readable.
1566+
%% The returned FdRef must be kept alive while monitoring.
1567+
%%
1568+
%% @param Fd File descriptor
1569+
%% @returns {ok, FdRef} | {error, Reason}
1570+
-spec fd_select_read(integer()) -> {ok, reference()} | {error, term()}.
1571+
fd_select_read(_Fd) ->
1572+
?NIF_STUB.
1573+
1574+
%% @doc Register FD for write selection.
1575+
%%
1576+
%% Caller will receive {select, FdRef, Ref, ready_output} when writable.
1577+
%% The returned FdRef must be kept alive while monitoring.
1578+
%%
1579+
%% @param Fd File descriptor
1580+
%% @returns {ok, FdRef} | {error, Reason}
1581+
-spec fd_select_write(integer()) -> {ok, reference()} | {error, term()}.
1582+
fd_select_write(_Fd) ->
1583+
?NIF_STUB.
1584+
1585+
%% @doc Close a raw file descriptor.
1586+
%%
1587+
%% @param Fd File descriptor to close
1588+
%% @returns ok | {error, Reason}
1589+
-spec fd_close(integer()) -> ok | {error, term()}.
1590+
fd_close(_Fd) ->
1591+
?NIF_STUB.
1592+
1593+
%% @doc Create a Unix socketpair for bidirectional communication.
1594+
%%
1595+
%% Both FDs are set to non-blocking mode.
1596+
%%
1597+
%% @returns {ok, {Fd1, Fd2}} | {error, Reason}
1598+
-spec socketpair() -> {ok, {integer(), integer()}} | {error, term()}.
1599+
socketpair() ->
1600+
?NIF_STUB.

0 commit comments

Comments
 (0)