diff --git a/ext/msgpack/buffer.c b/ext/msgpack/buffer.c index f6a037ff..bcb33d90 100644 --- a/ext/msgpack/buffer.c +++ b/ext/msgpack/buffer.c @@ -25,8 +25,6 @@ int msgpack_rb_encindex_ascii8bit; ID s_uminus; -static msgpack_rmem_t s_rmem; - void msgpack_buffer_static_init(void) { s_uminus = rb_intern("-@"); @@ -34,13 +32,10 @@ void msgpack_buffer_static_init(void) msgpack_rb_encindex_utf8 = rb_utf8_encindex(); msgpack_rb_encindex_usascii = rb_usascii_encindex(); msgpack_rb_encindex_ascii8bit = rb_ascii8bit_encindex(); - - msgpack_rmem_init(&s_rmem); } void msgpack_buffer_static_destroy(void) { - msgpack_rmem_destroy(&s_rmem); } void msgpack_buffer_init(msgpack_buffer_t* b) @@ -59,9 +54,7 @@ static void _msgpack_buffer_chunk_destroy(msgpack_buffer_chunk_t* c) { if(c->mem != NULL) { if(c->rmem) { - if(!msgpack_rmem_free(&s_rmem, c->mem)) { - rb_bug("Failed to free an rmem pointer, memory leak?"); - } + msgpack_rmem_free(c->mem); } else { xfree(c->mem); } @@ -354,7 +347,7 @@ static inline void* _msgpack_buffer_chunk_malloc( if((size_t)(b->rmem_end - b->rmem_last) < required_size) { /* alloc new rmem page */ *allocated_size = MSGPACK_RMEM_PAGE_SIZE; - char* buffer = msgpack_rmem_alloc(&s_rmem); + char* buffer = msgpack_rmem_alloc(); c->mem = buffer; /* update rmem owner */ diff --git a/ext/msgpack/extconf.rb b/ext/msgpack/extconf.rb index b1b921c8..237a174f 100644 --- a/ext/msgpack/extconf.rb +++ b/ext/msgpack/extconf.rb @@ -1,6 +1,7 @@ require 'mkmf' have_func("rb_enc_interned_str", "ruby.h") # Ruby 3.0+ +have_func("rb_ext_ractor_safe", "ruby.h") # Ruby 3.0+ have_func("rb_hash_new_capa", "ruby.h") # Ruby 3.2+ have_func("rb_proc_call_with_block", "ruby.h") # CRuby (TruffleRuby doesn't have it) have_func("rb_gc_mark_locations", "ruby.h") # Missing on TruffleRuby diff --git a/ext/msgpack/rbinit.c b/ext/msgpack/rbinit.c index 104cc19e..3e35f69d 100644 --- a/ext/msgpack/rbinit.c +++ b/ext/msgpack/rbinit.c @@ -24,6 +24,12 @@ RUBY_FUNC_EXPORTED void Init_msgpack(void) { + /* No process-global mutable state, so packing/unpacking is safe per-Ractor. + * rb_ext_ractor_safe is Ruby 3.0+, so guard for older supported Rubies. */ +#ifdef HAVE_RB_EXT_RACTOR_SAFE + rb_ext_ractor_safe(true); +#endif + VALUE mMessagePack = rb_define_module("MessagePack"); MessagePack_Buffer_module_init(mMessagePack); diff --git a/ext/msgpack/rmem.c b/ext/msgpack/rmem.c deleted file mode 100644 index a6206f9a..00000000 --- a/ext/msgpack/rmem.c +++ /dev/null @@ -1,93 +0,0 @@ -/* - * MessagePack for Ruby - * - * Copyright (C) 2008-2013 Sadayuki Furuhashi - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "rmem.h" - -void msgpack_rmem_init(msgpack_rmem_t* pm) -{ - memset(pm, 0, sizeof(msgpack_rmem_t)); - pm->head.pages = xmalloc(MSGPACK_RMEM_PAGE_SIZE * 32); - pm->head.mask = 0xffffffff; /* all bit is 1 = available */ -} - -void msgpack_rmem_destroy(msgpack_rmem_t* pm) -{ - msgpack_rmem_chunk_t* c = pm->array_first; - msgpack_rmem_chunk_t* cend = pm->array_last; - for(; c != cend; c++) { - xfree(c->pages); - } - xfree(pm->head.pages); - xfree(pm->array_first); -} - -void* _msgpack_rmem_alloc2(msgpack_rmem_t* pm) -{ - msgpack_rmem_chunk_t* c = pm->array_first; - msgpack_rmem_chunk_t* last = pm->array_last; - for(; c != last; c++) { - if(_msgpack_rmem_chunk_available(c)) { - void* mem = _msgpack_rmem_chunk_alloc(c); - - /* move to head */ - msgpack_rmem_chunk_t tmp = pm->head; - pm->head = *c; - *c = tmp; - return mem; - } - } - - if(c == pm->array_end) { - size_t capacity = c - pm->array_first; - size_t length = last - pm->array_first; - capacity = (capacity == 0) ? 8 : capacity * 2; - msgpack_rmem_chunk_t* array = xrealloc(pm->array_first, capacity * sizeof(msgpack_rmem_chunk_t)); - pm->array_first = array; - pm->array_last = array + length; - pm->array_end = array + capacity; - } - - /* allocate new chunk */ - c = pm->array_last++; - - /* move head to array */ - *c = pm->head; - - pm->head.pages = NULL; /* make sure we don't point to another chunk's pages in case xmalloc triggers GC */ - pm->head.mask = 0xffffffff & (~1); /* "& (~1)" means first chunk is already allocated */ - pm->head.pages = xmalloc(MSGPACK_RMEM_PAGE_SIZE * 32); - - return pm->head.pages; -} - -void _msgpack_rmem_chunk_free(msgpack_rmem_t* pm, msgpack_rmem_chunk_t* c) -{ - if(pm->array_first->mask == 0xffffffff) { - /* free and move to last */ - pm->array_last--; - xfree(c->pages); - *c = *pm->array_last; - return; - } - - /* move to first */ - msgpack_rmem_chunk_t tmp = *pm->array_first; - *pm->array_first = *c; - *c = tmp; -} - diff --git a/ext/msgpack/rmem.h b/ext/msgpack/rmem.h index a22b8f06..6712bb28 100644 --- a/ext/msgpack/rmem.h +++ b/ext/msgpack/rmem.h @@ -19,91 +19,22 @@ #define MSGPACK_RUBY_RMEM_H__ #include "compat.h" -#include "sysdep.h" #ifndef MSGPACK_RMEM_PAGE_SIZE #define MSGPACK_RMEM_PAGE_SIZE (4*1024) #endif -struct msgpack_rmem_t; -typedef struct msgpack_rmem_t msgpack_rmem_t; - -struct msgpack_rmem_chunk_t; -typedef struct msgpack_rmem_chunk_t msgpack_rmem_chunk_t; - -/* - * a chunk contains 32 pages. - * size of each buffer is MSGPACK_RMEM_PAGE_SIZE bytes. - */ -struct msgpack_rmem_chunk_t { - unsigned int mask; - char* pages; -}; - -struct msgpack_rmem_t { - msgpack_rmem_chunk_t head; - msgpack_rmem_chunk_t* array_first; - msgpack_rmem_chunk_t* array_last; - msgpack_rmem_chunk_t* array_end; -}; - -/* assert MSGPACK_RMEM_PAGE_SIZE % sysconf(_SC_PAGE_SIZE) == 0 */ -void msgpack_rmem_init(msgpack_rmem_t* pm); - -void msgpack_rmem_destroy(msgpack_rmem_t* pm); - -void* _msgpack_rmem_alloc2(msgpack_rmem_t* pm); - -#define _msgpack_rmem_chunk_available(c) ((c)->mask != 0) - -static inline void* _msgpack_rmem_chunk_alloc(msgpack_rmem_chunk_t* c) +/* Fixed-size scratch pages for the buffer and unpacker stack, served straight + * from xmalloc/xfree so they can be allocated and freed from any Ractor. */ +static inline void* msgpack_rmem_alloc(void) { - _msgpack_bsp32(pos, c->mask); - (c)->mask &= ~(1 << pos); - return ((char*)(c)->pages) + (pos * (MSGPACK_RMEM_PAGE_SIZE)); + return xmalloc(MSGPACK_RMEM_PAGE_SIZE); } -static inline bool _msgpack_rmem_chunk_try_free(msgpack_rmem_chunk_t* c, void* mem) +static inline void msgpack_rmem_free(void* mem) { - ptrdiff_t pdiff = ((char*)(mem)) - ((char*)(c)->pages); - if(0 <= pdiff && pdiff < MSGPACK_RMEM_PAGE_SIZE * 32) { - size_t pos = pdiff / MSGPACK_RMEM_PAGE_SIZE; - (c)->mask |= (1 << pos); - return true; - } - return false; + xfree(mem); } -static inline void* msgpack_rmem_alloc(msgpack_rmem_t* pm) -{ - if(_msgpack_rmem_chunk_available(&pm->head)) { - return _msgpack_rmem_chunk_alloc(&pm->head); - } - return _msgpack_rmem_alloc2(pm); -} - -void _msgpack_rmem_chunk_free(msgpack_rmem_t* pm, msgpack_rmem_chunk_t* c); - -static inline bool msgpack_rmem_free(msgpack_rmem_t* pm, void* mem) -{ - if(_msgpack_rmem_chunk_try_free(&pm->head, mem)) { - return true; - } - - /* search from last */ - msgpack_rmem_chunk_t* c = pm->array_last - 1; - msgpack_rmem_chunk_t* before_first = pm->array_first - 1; - for(; c != before_first; c--) { - if(_msgpack_rmem_chunk_try_free(c, mem)) { - if(c != pm->array_first && c->mask == 0xffffffff) { - _msgpack_rmem_chunk_free(pm, c); - } - return true; - } - } - return false; -} - - #endif diff --git a/ext/msgpack/unpacker.c b/ext/msgpack/unpacker.c index ad4d6630..38540951 100644 --- a/ext/msgpack/unpacker.c +++ b/ext/msgpack/unpacker.c @@ -64,8 +64,6 @@ static int RAW_TYPE_STRING = 256; static int RAW_TYPE_BINARY = 257; static int16_t INITIAL_BUFFER_CAPACITY_MAX = SHRT_MAX; -static msgpack_rmem_t s_stack_rmem; - #if !defined(HAVE_RB_HASH_NEW_CAPA) static inline VALUE rb_hash_new_capa_inline(long capa) { @@ -82,13 +80,10 @@ static inline int16_t initial_buffer_size(long size) void msgpack_unpacker_static_init(void) { assert(sizeof(msgpack_unpacker_stack_entry_t) * MSGPACK_UNPACKER_STACK_CAPACITY <= MSGPACK_RMEM_PAGE_SIZE); - - msgpack_rmem_init(&s_stack_rmem); } void msgpack_unpacker_static_destroy(void) { - msgpack_rmem_destroy(&s_stack_rmem); } #define HEAD_BYTE_REQUIRED 0xc1 @@ -96,7 +91,7 @@ void msgpack_unpacker_static_destroy(void) static inline bool _msgpack_unpacker_stack_init(msgpack_unpacker_stack_t *stack) { if (!stack->data) { stack->capacity = MSGPACK_UNPACKER_STACK_CAPACITY; - stack->data = msgpack_rmem_alloc(&s_stack_rmem); + stack->data = msgpack_rmem_alloc(); stack->depth = 0; return true; } @@ -105,9 +100,7 @@ static inline bool _msgpack_unpacker_stack_init(msgpack_unpacker_stack_t *stack) static inline void _msgpack_unpacker_free_stack(msgpack_unpacker_stack_t* stack) { if (stack->data) { - if (!msgpack_rmem_free(&s_stack_rmem, stack->data)) { - rb_bug("Failed to free an rmem pointer, memory leak?"); - } + msgpack_rmem_free(stack->data); stack->data = NULL; stack->depth = 0; } diff --git a/spec/cruby/ractor_spec.rb b/spec/cruby/ractor_spec.rb new file mode 100644 index 00000000..1ac9d298 --- /dev/null +++ b/spec/cruby/ractor_spec.rb @@ -0,0 +1,61 @@ +require 'spec_helper' + +ractor_supported = defined?(Ractor) && RUBY_ENGINE == 'ruby' + +describe 'Ractor safety', skip: (ractor_supported ? false : 'Ractor not supported on this Ruby') do + def ractor_value(ractor) + # Ractor#value replaced #take in newer rubies; support both. + ractor.respond_to?(:value) ? ractor.value : ractor.take + end + + # Ruby prints a one-time "Ractor API is experimental" warning to stderr. Quiet + # it for this group, and restore afterwards so we don't suppress the warning + # for unrelated specs sharing the process. + before(:all) do + @experimental_warning = Warning[:experimental] + Warning[:experimental] = false + end + + after(:all) do + Warning[:experimental] = @experimental_warning + end + + it 'round-trips via a Factory inside a non-main Ractor' do + result = ractor_value(Ractor.new do + factory = MessagePack::Factory.new + factory.load(factory.dump([1, "two", 3.0, nil, true, {"k" => "v"}])) + end) + expect(result).to eq([1, "two", 3.0, nil, true, {"k" => "v"}]) + end + + it 'round-trips via a Packer and Unpacker inside a non-main Ractor' do + result = ractor_value(Ractor.new do + packed = MessagePack::Packer.new.write({"x" => [1, 2, 3]}).to_s + MessagePack::Unpacker.new.feed(packed).read + end) + expect(result).to eq({"x" => [1, 2, 3]}) + end + + it 'packs and unpacks concurrently across many Ractors without corruption' do + ractor_count = 8 + + ractors = ractor_count.times.map do |n| + Ractor.new(n) do |seed| + obj = { + "seed" => seed, + "nums" => (0..20).to_a, + "str" => "x" * 100, + "nested" => {"deep" => [seed] * 10}, + } + ok = true + 2_000.times do + packed = MessagePack::Packer.new.write(obj).to_s + ok &&= MessagePack::Unpacker.new.feed(packed).read == obj + end + ok + end + end + + expect(ractors.map { |r| ractor_value(r) }).to all(be true) + end +end