@@ -3337,6 +3337,237 @@ ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc,
33373337 return ATOM_OK ;
33383338}
33393339
3340+ /* ============================================================================
3341+ * Direct FD Operations
3342+ *
3343+ * These functions provide direct FD read/write for proxy/bridge use cases.
3344+ * ============================================================================ */
3345+
3346+ /**
3347+ * fd_read(Fd, Size) -> {ok, Data} | {error, Reason}
3348+ *
3349+ * Read up to Size bytes from a file descriptor.
3350+ */
3351+ ERL_NIF_TERM nif_fd_read (ErlNifEnv * env , int argc ,
3352+ const ERL_NIF_TERM argv []) {
3353+ (void )argc ;
3354+
3355+ int fd ;
3356+ if (!enif_get_int (env , argv [0 ], & fd )) {
3357+ return make_error (env , "invalid_fd" );
3358+ }
3359+
3360+ unsigned long size ;
3361+ if (!enif_get_ulong (env , argv [1 ], & size )) {
3362+ return make_error (env , "invalid_size" );
3363+ }
3364+
3365+ if (size > 1024 * 1024 ) {
3366+ size = 1024 * 1024 ;
3367+ }
3368+
3369+ ErlNifBinary bin ;
3370+ if (!enif_alloc_binary (size , & bin )) {
3371+ return make_error (env , "alloc_failed" );
3372+ }
3373+
3374+ ssize_t n = read (fd , bin .data , bin .size );
3375+ if (n < 0 ) {
3376+ enif_release_binary (& bin );
3377+ if (errno == EAGAIN || errno == EWOULDBLOCK ) {
3378+ return make_error (env , "eagain" );
3379+ }
3380+ return make_error (env , strerror (errno ));
3381+ }
3382+
3383+ if ((size_t )n < bin .size ) {
3384+ enif_realloc_binary (& bin , n );
3385+ }
3386+
3387+ ERL_NIF_TERM data = enif_make_binary (env , & bin );
3388+ return enif_make_tuple2 (env , ATOM_OK , data );
3389+ }
3390+
3391+ /**
3392+ * fd_write(Fd, Data) -> {ok, Written} | {error, Reason}
3393+ *
3394+ * Write data to a file descriptor.
3395+ */
3396+ ERL_NIF_TERM nif_fd_write (ErlNifEnv * env , int argc ,
3397+ const ERL_NIF_TERM argv []) {
3398+ (void )argc ;
3399+
3400+ int fd ;
3401+ if (!enif_get_int (env , argv [0 ], & fd )) {
3402+ return make_error (env , "invalid_fd" );
3403+ }
3404+
3405+ ErlNifBinary bin ;
3406+ if (!enif_inspect_binary (env , argv [1 ], & bin )) {
3407+ return make_error (env , "invalid_data" );
3408+ }
3409+
3410+ ssize_t n = write (fd , bin .data , bin .size );
3411+ if (n < 0 ) {
3412+ if (errno == EAGAIN || errno == EWOULDBLOCK ) {
3413+ return make_error (env , "eagain" );
3414+ }
3415+ return make_error (env , strerror (errno ));
3416+ }
3417+
3418+ return enif_make_tuple2 (env , ATOM_OK , enif_make_long (env , n ));
3419+ }
3420+
3421+ /**
3422+ * fd_select_read(Fd) -> {ok, FdRef} | {error, Reason}
3423+ *
3424+ * Register FD for read selection. Caller receives {select, FdRef, Ref, ready_input}.
3425+ * Returns a resource reference that must be kept alive while monitoring.
3426+ */
3427+ ERL_NIF_TERM nif_fd_select_read (ErlNifEnv * env , int argc ,
3428+ const ERL_NIF_TERM argv []) {
3429+ (void )argc ;
3430+
3431+ int fd ;
3432+ if (!enif_get_int (env , argv [0 ], & fd )) {
3433+ return make_error (env , "invalid_fd" );
3434+ }
3435+
3436+ ErlNifPid caller_pid ;
3437+ if (!enif_self (env , & caller_pid )) {
3438+ return make_error (env , "no_caller_pid" );
3439+ }
3440+
3441+ /* Allocate fd resource */
3442+ fd_resource_t * fd_res = enif_alloc_resource (FD_RESOURCE_TYPE ,
3443+ sizeof (fd_resource_t ));
3444+ if (fd_res == NULL ) {
3445+ return make_error (env , "alloc_failed" );
3446+ }
3447+
3448+ fd_res -> fd = fd ;
3449+ fd_res -> read_callback_id = 0 ;
3450+ fd_res -> write_callback_id = 0 ;
3451+ fd_res -> owner_pid = caller_pid ;
3452+ fd_res -> reader_active = true;
3453+ fd_res -> writer_active = false;
3454+ fd_res -> loop = NULL ;
3455+ atomic_store (& fd_res -> closing_state , FD_STATE_OPEN );
3456+ fd_res -> monitor_active = false;
3457+ fd_res -> owns_fd = false; /* Caller owns the fd */
3458+
3459+ int ret = enif_select (env , (ErlNifEvent )fd , ERL_NIF_SELECT_READ ,
3460+ fd_res , & caller_pid , enif_make_ref (env ));
3461+ if (ret < 0 ) {
3462+ enif_release_resource (fd_res );
3463+ return make_error (env , "select_failed" );
3464+ }
3465+
3466+ ERL_NIF_TERM fd_term = enif_make_resource (env , fd_res );
3467+ enif_release_resource (fd_res ); /* Term now holds the reference */
3468+
3469+ return enif_make_tuple2 (env , ATOM_OK , fd_term );
3470+ }
3471+
3472+ /**
3473+ * fd_select_write(Fd) -> {ok, FdRef} | {error, Reason}
3474+ *
3475+ * Register FD for write selection. Caller receives {select, FdRef, Ref, ready_output}.
3476+ * Returns a resource reference that must be kept alive while monitoring.
3477+ */
3478+ ERL_NIF_TERM nif_fd_select_write (ErlNifEnv * env , int argc ,
3479+ const ERL_NIF_TERM argv []) {
3480+ (void )argc ;
3481+
3482+ int fd ;
3483+ if (!enif_get_int (env , argv [0 ], & fd )) {
3484+ return make_error (env , "invalid_fd" );
3485+ }
3486+
3487+ ErlNifPid caller_pid ;
3488+ if (!enif_self (env , & caller_pid )) {
3489+ return make_error (env , "no_caller_pid" );
3490+ }
3491+
3492+ /* Allocate fd resource */
3493+ fd_resource_t * fd_res = enif_alloc_resource (FD_RESOURCE_TYPE ,
3494+ sizeof (fd_resource_t ));
3495+ if (fd_res == NULL ) {
3496+ return make_error (env , "alloc_failed" );
3497+ }
3498+
3499+ fd_res -> fd = fd ;
3500+ fd_res -> read_callback_id = 0 ;
3501+ fd_res -> write_callback_id = 0 ;
3502+ fd_res -> owner_pid = caller_pid ;
3503+ fd_res -> reader_active = false;
3504+ fd_res -> writer_active = true;
3505+ fd_res -> loop = NULL ;
3506+ atomic_store (& fd_res -> closing_state , FD_STATE_OPEN );
3507+ fd_res -> monitor_active = false;
3508+ fd_res -> owns_fd = false; /* Caller owns the fd */
3509+
3510+ int ret = enif_select (env , (ErlNifEvent )fd , ERL_NIF_SELECT_WRITE ,
3511+ fd_res , & caller_pid , enif_make_ref (env ));
3512+ if (ret < 0 ) {
3513+ enif_release_resource (fd_res );
3514+ return make_error (env , "select_failed" );
3515+ }
3516+
3517+ ERL_NIF_TERM fd_term = enif_make_resource (env , fd_res );
3518+ enif_release_resource (fd_res ); /* Term now holds the reference */
3519+
3520+ return enif_make_tuple2 (env , ATOM_OK , fd_term );
3521+ }
3522+
3523+ /**
3524+ * socketpair() -> {ok, {Fd1, Fd2}} | {error, Reason}
3525+ *
3526+ * Create a Unix socketpair for bidirectional communication.
3527+ */
3528+ ERL_NIF_TERM nif_socketpair (ErlNifEnv * env , int argc ,
3529+ const ERL_NIF_TERM argv []) {
3530+ (void )argc ;
3531+ (void )argv ;
3532+
3533+ int fds [2 ];
3534+ if (socketpair (AF_UNIX , SOCK_STREAM , 0 , fds ) < 0 ) {
3535+ return make_error (env , strerror (errno ));
3536+ }
3537+
3538+ int flags1 = fcntl (fds [0 ], F_GETFL , 0 );
3539+ int flags2 = fcntl (fds [1 ], F_GETFL , 0 );
3540+ fcntl (fds [0 ], F_SETFL , flags1 | O_NONBLOCK );
3541+ fcntl (fds [1 ], F_SETFL , flags2 | O_NONBLOCK );
3542+
3543+ ERL_NIF_TERM fd_tuple = enif_make_tuple2 (env ,
3544+ enif_make_int (env , fds [0 ]),
3545+ enif_make_int (env , fds [1 ]));
3546+
3547+ return enif_make_tuple2 (env , ATOM_OK , fd_tuple );
3548+ }
3549+
3550+ /**
3551+ * fd_close(Fd) -> ok | {error, Reason}
3552+ *
3553+ * Close a raw file descriptor (integer).
3554+ */
3555+ ERL_NIF_TERM nif_fd_close (ErlNifEnv * env , int argc ,
3556+ const ERL_NIF_TERM argv []) {
3557+ (void )argc ;
3558+
3559+ int fd ;
3560+ if (!enif_get_int (env , argv [0 ], & fd )) {
3561+ return make_error (env , "invalid_fd" );
3562+ }
3563+
3564+ if (close (fd ) < 0 ) {
3565+ return make_error (env , strerror (errno ));
3566+ }
3567+
3568+ return ATOM_OK ;
3569+ }
3570+
33403571/* ============================================================================
33413572 * Python Module: py_event_loop
33423573 *
0 commit comments