#include "ccan/list/list.h"

static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;

/* Mutex */
typedef struct rb_mutex_struct {
    rb_fiber_t *fiber;
    struct rb_mutex_struct *next_mutex;
    struct list_head waitq; /* protected by GVL */
} rb_mutex_t;

/* sync_waiter is always allocated on the waiter's stack */
struct sync_waiter {
    VALUE self;
    rb_thread_t *th;
    rb_fiber_t *fiber;
    struct list_node node;
};

#define MUTEX_ALLOW_TRAP FL_USER1
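/*
 * Illustrative sketch (not part of thread_sync.c): waitq/node above follow the
 * ccan/list intrusive-list pattern.  Each waiter embeds a list_node and the
 * owner holds a list_head, so enqueueing needs no allocation.  A minimal
 * standalone example, assuming only the ccan/list API; "struct waiter" and
 * "example" are hypothetical names:
 */
#if 0
struct waiter {
    int id;
    struct list_node node;          /* embedded link, like sync_waiter.node */
};

static void
example(void)
{
    struct list_head q;
    struct waiter a = {.id = 1}, b = {.id = 2}, *cur, *next;

    list_head_init(&q);
    list_add_tail(&q, &a.node);     /* enqueue in FIFO order */
    list_add_tail(&q, &b.node);

    /* "safe" iteration: the current entry may be unlinked while walking */
    list_for_each_safe(&q, cur, next, node) {
        list_del_init(&cur->node);
    }
}
#endif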
static void
sync_wakeup(struct list_head *head, long max)
{
    struct sync_waiter *cur = 0, *next;

    list_for_each_safe(head, cur, next, node) {
        list_del_init(&cur->node);

        if (cur->th->status != THREAD_KILLED) {
            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
                rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
            }
            else {
                rb_threadptr_interrupt(cur->th);
                cur->th->status = THREAD_RUNNABLE;
            }

            if (--max == 0) return;
        }
    }
}
static void
wakeup_one(struct list_head *head)
{
    sync_wakeup(head, 1);
}

static void
wakeup_all(struct list_head *head)
{
    sync_wakeup(head, LONG_MAX);
}
#if defined(HAVE_WORKING_FORK)
static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
#endif
#define mutex_mark ((void(*)(void*))0)

static size_t
rb_mutex_num_waiting(rb_mutex_t *mutex)
{
    struct sync_waiter *w = 0;
    size_t n = 0;

    list_for_each(&mutex->waitq, w, node) {
        n++;
    }
    return n;
}

static void
mutex_free(void *ptr)
{
    rb_mutex_t *mutex = ptr;
    if (mutex->fiber) {
        /* still locked at free time: unlock on behalf of the owning fiber */
        const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber);
        if (err) rb_bug("%s", err);
    }
    ruby_xfree(ptr);
}

static size_t
mutex_memsize(const void *ptr)
{
    return sizeof(rb_mutex_t);
}

static const rb_data_type_t mutex_data_type = {
    "mutex",
    {mutex_mark, mutex_free, mutex_memsize,},
    0, 0, RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_FREE_IMMEDIATELY
};
VALUE
rb_obj_is_mutex(VALUE obj)
{
    return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type));
}

static VALUE
mutex_alloc(VALUE klass)
{
    rb_mutex_t *mutex;
    VALUE obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);

    list_head_init(&mutex->waitq);
    return obj;
}

static VALUE
mutex_initialize(VALUE self)
{
    return self;
}

VALUE
rb_mutex_new(void)
{
    return mutex_alloc(rb_cMutex);
}

VALUE
rb_mutex_locked_p(VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    return RBOOL(mutex->fiber);
}
static void
thread_mutex_insert(rb_thread_t *thread, rb_mutex_t *mutex)
{
    if (thread->keeping_mutexes) {
        mutex->next_mutex = thread->keeping_mutexes;
    }
    thread->keeping_mutexes = mutex;
}

static void
thread_mutex_remove(rb_thread_t *thread, rb_mutex_t *mutex)
{
    rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes;

    while (*keeping_mutexes && *keeping_mutexes != mutex) {
        /* move to the next mutex in the singly-linked list */
        keeping_mutexes = &(*keeping_mutexes)->next_mutex;
    }

    if (*keeping_mutexes) {
        *keeping_mutexes = mutex->next_mutex;
        mutex->next_mutex = NULL;
    }
}

static void
mutex_locked(rb_thread_t *th, VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    thread_mutex_insert(th, mutex);
}
VALUE
rb_mutex_trylock(VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    if (mutex->fiber == 0) {
        rb_fiber_t *fiber = GET_EC()->fiber_ptr;
        rb_thread_t *th = GET_THREAD();
        mutex->fiber = fiber;

        mutex_locked(th, self);
        return Qtrue;
    }

    return Qfalse;
}

static VALUE
mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
{
    return RBOOL(mutex->fiber == fiber);
}
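/*
 * Illustrative sketch (not part of thread_sync.c): the same ownership state is
 * driven from extension code through the public C API.  rb_mutex_trylock()
 * never blocks; it returns Qtrue and makes the calling fiber the owner exactly
 * when mutex->fiber was 0 above.  "try_once" is a hypothetical name:
 */
#if 0
static void
try_once(void)
{
    VALUE m = rb_mutex_new();

    if (rb_mutex_trylock(m) == Qtrue) { /* acquired without waiting */
        /* rb_mutex_locked_p(m) now returns Qtrue */
        rb_mutex_unlock(m);
    }
    else {
        /* held by another thread/fiber; use rb_mutex_lock(m) to block */
    }
}
#endif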
static VALUE
call_rb_fiber_scheduler_block(VALUE mutex)
{
    return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil);
}

static VALUE
delete_from_waitq(VALUE value)
{
    struct sync_waiter *sync_waiter = (void *)value;
    list_del(&sync_waiter->node);

    return Qfalse;
}
static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
    rb_execution_context_t *ec = GET_EC();
    rb_thread_t *th = ec->thread_ptr;
    rb_fiber_t *fiber = ec->fiber_ptr;
    rb_mutex_t *mutex = mutex_ptr(self);

    /* when running a trap handler */
    if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
        th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
        rb_raise(rb_eThreadError, "can't be called from trap context");
    }

    if (rb_mutex_trylock(self) == Qfalse) {
        if (mutex->fiber == fiber) {
            rb_raise(rb_eThreadError, "deadlock; recursive locking");
        }

        while (mutex->fiber != fiber) {
            VALUE scheduler = rb_fiber_scheduler_current();
            if (scheduler != Qnil) {
                struct sync_waiter sync_waiter = {.self = self, .th = th, .fiber = fiber};

                list_add_tail(&mutex->waitq, &sync_waiter.node);
                rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter);
                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }
            }
            else {
                if (!th->vm->thread_ignore_deadlock && rb_fiber_threadptr(mutex->fiber) == th) {
                    rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
                }

                enum rb_thread_status prev_status = th->status;
                rb_hrtime_t *timeout = 0;
                rb_hrtime_t rel = rb_msec2hrtime(100);

                th->status = THREAD_STOPPED_FOREVER;
                th->locking_mutex = self;
                rb_ractor_sleeper_threads_inc(th->ractor);

                /* if every living thread in the ractor is asleep, become the
                 * patrol thread and sleep with a timeout to detect deadlock */
                if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) &&
                    !patrol_thread) {
                    timeout = &rel;
                    patrol_thread = th;
                }

                struct sync_waiter sync_waiter = {.self = self, .th = th, .fiber = fiber};

                list_add_tail(&mutex->waitq, &sync_waiter.node);
                native_sleep(th, timeout); /* releases the GVL */
                list_del(&sync_waiter.node);

                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }

                if (patrol_thread == th)
                    patrol_thread = NULL;

                th->locking_mutex = Qfalse;
                if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
                    rb_check_deadlock(th->ractor);
                }
                if (th->status == THREAD_STOPPED_FOREVER) {
                    th->status = prev_status;
                }
                rb_ractor_sleeper_threads_dec(th->ractor);
            }

            if (interruptible_p) {
                /* release the mutex before checking interrupts, since the
                 * interrupt-checking code may itself raise */
                if (mutex->fiber == fiber) mutex->fiber = 0;
                RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }
            }
        }

        if (mutex->fiber == fiber) mutex_locked(th, self);
    }

    /* assertion: the calling fiber must own the mutex on return */
    if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");

    return self;
}
static VALUE
mutex_lock_uninterruptible(VALUE self)
{
    return do_mutex_lock(self, 0);
}

VALUE
rb_mutex_lock(VALUE self)
{
    return do_mutex_lock(self, 1);
}

VALUE
rb_mutex_owned_p(VALUE self)
{
    rb_fiber_t *fiber = GET_EC()->fiber_ptr;
    rb_mutex_t *mutex = mutex_ptr(self);

    return mutex_owned_p(fiber, mutex);
}
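/*
 * Illustrative sketch (not part of thread_sync.c): rb_mutex_lock() is the
 * interruptible entry point, while mutex_lock_uninterruptible() is used
 * internally, e.g. to reacquire the lock after Mutex#sleep.  Extension code
 * normally lets rb_mutex_synchronize() handle the lock/unlock pairing, which
 * releases the lock even on exception.  "guarded_body" and "with_lock" are
 * hypothetical names:
 */
#if 0
static VALUE
guarded_body(VALUE arg)
{
    /* runs with the mutex held */
    return arg;
}

static VALUE
with_lock(VALUE m)
{
    return rb_mutex_synchronize(m, guarded_body, Qnil);
}
#endif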
static const char *
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
{
    const char *err = NULL;

    if (mutex->fiber == 0) {
        err = "Attempt to unlock a mutex which is not locked";
    }
    else if (mutex->fiber != fiber) {
        err = "Attempt to unlock a mutex which is locked by another thread/fiber";
    }
    else {
        struct sync_waiter *cur = 0, *next;

        mutex->fiber = 0;
        list_for_each_safe(&mutex->waitq, cur, next, node) {
            list_del_init(&cur->node);

            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
                rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
                goto found;
            }
            else {
                switch (cur->th->status) {
                  case THREAD_RUNNABLE: /* from someone else calling Thread#run */
                  case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
                    rb_threadptr_interrupt(cur->th);
                    goto found;
                  case THREAD_STOPPED: /* probably impossible */
                    rb_bug("unexpected THREAD_STOPPED");
                  case THREAD_KILLED:
                    rb_bug("unexpected THREAD_KILLED");
                  default:
                    rb_bug("unexpected THREAD_STATE");
                }
            }
        }

      found:
        thread_mutex_remove(th, mutex);
    }

    return err;
}

VALUE
rb_mutex_unlock(VALUE self)
{
    const char *err;
    rb_mutex_t *mutex = mutex_ptr(self);
    rb_thread_t *th = GET_THREAD();

    err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
    if (err) rb_raise(rb_eThreadError, "%s", err);

    return self;
}
#if defined(HAVE_WORKING_FORK)
static void
rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
{
    rb_mutex_abandon_all(th->keeping_mutexes);
    th->keeping_mutexes = NULL;
}

static void
rb_mutex_abandon_locking_mutex(rb_thread_t *th)
{
    if (th->locking_mutex) {
        rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);

        list_head_init(&mutex->waitq);
        th->locking_mutex = Qfalse;
    }
}

static void
rb_mutex_abandon_all(rb_mutex_t *mutexes)
{
    rb_mutex_t *mutex;

    while (mutexes) {
        mutex = mutexes;
        mutexes = mutex->next_mutex;
        mutex->fiber = 0;
        mutex->next_mutex = 0;
        list_head_init(&mutex->waitq);
    }
}
#endif
static VALUE
rb_mutex_sleep_forever(VALUE self)
{
    rb_thread_sleep_deadly_allow_spurious_wakeup(self);
    return Qnil;
}

static VALUE
rb_mutex_wait_for(VALUE time)
{
    rb_hrtime_t *rel = (rb_hrtime_t *)time;
    /* permit spurious wakeups */
    return RBOOL(sleep_hrtime(GET_THREAD(), *rel, 0));
}
VALUE
rb_mutex_sleep(VALUE self, VALUE timeout)
{
    struct timeval t;
    VALUE woken = Qtrue;

    if (!NIL_P(timeout)) {
        t = rb_time_interval(timeout);
    }

    rb_mutex_unlock(self);
    time_t beg = time(0);

    VALUE scheduler = rb_fiber_scheduler_current();
    if (scheduler != Qnil) {
        rb_fiber_scheduler_kernel_sleep(scheduler, timeout);
        mutex_lock_uninterruptible(self);
    }
    else {
        if (NIL_P(timeout)) {
            rb_ensure(rb_mutex_sleep_forever, self, mutex_lock_uninterruptible, self);
        }
        else {
            rb_hrtime_t rel = rb_timeval2hrtime(&t);
            woken = rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self);
        }
    }

    RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
    if (!woken) return Qnil;
    time_t end = time(0) - beg;
    return TIMET2NUM(end);
}
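/*
 * Illustrative sketch (not part of thread_sync.c): per the code above,
 * rb_mutex_sleep() releases the lock, sleeps (optionally bounded by a
 * timeout), reacquires the lock, and returns the elapsed whole seconds as an
 * Integer, or Qnil when a timed sleep expired without an explicit wakeup.
 * "nap_with_timeout" is a hypothetical name:
 */
#if 0
static VALUE
nap_with_timeout(VALUE m)
{
    rb_mutex_lock(m);
    VALUE slept = rb_mutex_sleep(m, DBL2NUM(0.5)); /* holds m again on return */
    rb_mutex_unlock(m);
    return slept; /* Integer seconds slept, or Qnil on timeout */
}
#endif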
static VALUE
mutex_sleep(int argc, VALUE *argv, VALUE self)
{
    VALUE timeout;

    timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil;
    return rb_mutex_sleep(self, timeout);
}

static VALUE
rb_mutex_synchronize_m(VALUE self)
{
    if (!rb_block_given_p()) {
        rb_raise(rb_eThreadError, "must be called with a block");
    }

    return rb_mutex_synchronize(self, rb_yield, Qundef);
}

void
rb_mutex_allow_trap(VALUE self, int val)
{
    Check_TypedStruct(self, &mutex_data_type);

    if (val)
        FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
    else
        FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
}
/* Queue */

#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
PACKED_STRUCT_UNALIGNED(struct rb_queue {
    struct list_head waitq;
    rb_serial_t fork_gen;
    const VALUE que;
    int num_waiting;
});

#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
    struct rb_queue q;
    int num_waiting_push;
    struct list_head pushq;
    long max;
});
static void
queue_mark(void *ptr)
{
    struct rb_queue *q = ptr;

    /* no need to mark threads in waitq, they are on stack */
    rb_gc_mark(q->que);
}

static size_t
queue_memsize(const void *ptr)
{
    return sizeof(struct rb_queue);
}

static const rb_data_type_t queue_data_type = {
    "queue",
    {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

static VALUE
queue_alloc(VALUE klass)
{
    struct rb_queue *q;
    VALUE obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);

    list_head_init(queue_waitq(q));
    return obj;
}
static int
queue_fork_check(struct rb_queue *q)
{
    rb_serial_t fork_gen = GET_VM()->fork_gen;

    if (q->fork_gen == fork_gen) {
        return 0;
    }
    /* forked children can't reach into parent thread stacks */
    q->fork_gen = fork_gen;
    list_head_init(queue_waitq(q));
    q->num_waiting = 0;
    return 1;
}
#define QUEUE_CLOSED FL_USER5
static void
szqueue_mark(void *ptr)
{
    struct rb_szqueue *sq = ptr;

    queue_mark(&sq->q);
}

static size_t
szqueue_memsize(const void *ptr)
{
    return sizeof(struct rb_szqueue);
}

static const rb_data_type_t szqueue_data_type = {
    "sized_queue",
    {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

static VALUE
szqueue_alloc(VALUE klass)
{
    struct rb_szqueue *sq;
    VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
                                      &szqueue_data_type, sq);
    list_head_init(szqueue_waitq(sq));
    list_head_init(szqueue_pushq(sq));
    return obj;
}
static struct rb_szqueue *
szqueue_ptr(VALUE obj)
{
    struct rb_szqueue *sq;

    TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
    if (queue_fork_check(&sq->q)) {
        /* the push-side wait queue is also stale after fork */
        list_head_init(szqueue_pushq(sq));
        sq->num_waiting_push = 0;
    }

    return sq;
}
static int
queue_closed_p(VALUE self)
{
    return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
}

NORETURN(static void raise_closed_queue_error(VALUE self));

static void
raise_closed_queue_error(VALUE self)
{
    rb_raise(rb_eClosedQueueError, "queue closed");
}

static VALUE
queue_closed_result(VALUE self, struct rb_queue *q)
{
    assert(queue_length(self, q) == 0);
    return Qnil;
}
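/*
 * Illustrative sketch (not part of thread_sync.c): QUEUE_CLOSED is a per-
 * object FL_USER flag, so "closed" costs no extra struct field.  The same
 * pattern works for any hidden boolean on a wrapped object; "MY_FLAG" and
 * "flag_demo" are hypothetical names:
 */
#if 0
#define MY_FLAG FL_USER5

static void
flag_demo(VALUE obj)
{
    FL_SET(obj, MY_FLAG);               /* set: cf. rb_queue_close() below */
    if (FL_TEST_RAW(obj, MY_FLAG)) {    /* query: cf. queue_closed_p() */
        FL_UNSET_RAW(obj, MY_FLAG);     /* clear */
    }
}
#endif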
static VALUE
rb_queue_initialize(int argc, VALUE *argv, VALUE self)
{
    VALUE initial;
    struct rb_queue *q = queue_ptr(self);
    if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
        initial = rb_to_array(initial);
    }
    RB_OBJ_WRITE(self, &q->que, ary_buf_new());
    list_head_init(queue_waitq(q));
    if (argc == 1) {
        rb_ary_concat(q->que, initial);
    }
    return self;
}
static VALUE
queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
{
    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }
    rb_ary_push(check_array(self, q->que), obj);
    wakeup_one(queue_waitq(q));
    return self;
}
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    if (!queue_closed_p(self)) {
        FL_SET(self, QUEUE_CLOSED);

        wakeup_all(queue_waitq(q));
    }

    return self;
}
static VALUE
rb_queue_closed_p(VALUE self)
{
    return RBOOL(queue_closed_p(self));
}
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    return queue_do_push(self, queue_ptr(self), obj);
}
static VALUE
queue_sleep(VALUE self)
{
    rb_thread_sleep_deadly_allow_spurious_wakeup(self);
    return Qnil;
}
struct queue_waiter {
    struct sync_waiter w;
    union {
        struct rb_queue *q;
        struct rb_szqueue *sq;
    } as;
};

static VALUE
queue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    list_del(&qw->w.node);
    qw->as.q->num_waiting--;

    return Qfalse;
}

static VALUE
szqueue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    list_del(&qw->w.node);
    qw->as.sq->num_waiting_push--;

    return Qfalse;
}
static VALUE
queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
{
    check_array(self, q->que);

    while (RARRAY_LEN(q->que) == 0) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue empty");
        }
        else if (queue_closed_p(self)) {
            return queue_closed_result(self, q);
        }
        else {
            rb_execution_context_t *ec = GET_EC();

            assert(RARRAY_LEN(q->que) == 0);
            assert(queue_closed_p(self) == 0);

            struct queue_waiter queue_waiter = {
                .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
                .as = {.q = q}
            };

            struct list_head *waitq = queue_waitq(q);

            list_add_tail(waitq, &queue_waiter.w.node);
            queue_waiter.as.q->num_waiting++;

            rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&queue_waiter);
        }
    }

    return rb_ary_shift(q->que);
}
static int
queue_pop_should_block(int argc, const VALUE *argv)
{
    int should_block = 1;
    rb_check_arity(argc, 0, 1);
    if (argc > 0) {
        /* pop(non_block = false): argv[0] is the non_block flag */
        should_block = !RTEST(argv[0]);
    }
    return should_block;
}
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
    int should_block = queue_pop_should_block(argc, argv);
    return queue_do_pop(self, queue_ptr(self), should_block);
}
static VALUE
rb_queue_empty_p(VALUE self)
{
    return RBOOL(queue_length(self, queue_ptr(self)) == 0);
}
static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    rb_ary_clear(check_array(self, q->que));
    return self;
}
static VALUE
rb_queue_length(VALUE self)
{
    return LONG2NUM(queue_length(self, queue_ptr(self)));
}
static VALUE
rb_queue_num_waiting(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    return INT2NUM(q->num_waiting);
}
static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    long max;
    struct rb_szqueue *sq = szqueue_ptr(self);

    max = NUM2LONG(vmax);
    if (max <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
    list_head_init(szqueue_waitq(sq));
    list_head_init(szqueue_pushq(sq));
    sq->max = max;

    return self;
}
static VALUE
rb_szqueue_close(VALUE self)
{
    if (!queue_closed_p(self)) {
        struct rb_szqueue *sq = szqueue_ptr(self);

        FL_SET(self, QUEUE_CLOSED);
        wakeup_all(szqueue_waitq(sq));
        wakeup_all(szqueue_pushq(sq));
    }
    return self;
}
static VALUE
rb_szqueue_max_get(VALUE self)
{
    return LONG2NUM(szqueue_ptr(self)->max);
}
/* SizedQueue#max= — in rb_szqueue_max_set(): raising the limit frees
 * (max - sq->max) slots, so wake up to that many blocked pushers */
    if (max > sq->max) {
        diff = max - sq->max;
    }
    sq->max = max;
    sync_wakeup(szqueue_pushq(sq), diff);
static int
szqueue_push_should_block(int argc, const VALUE *argv)
{
    int should_block = 1;
    rb_check_arity(argc, 1, 2);
    if (argc > 1) {
        /* push(object, non_block = false): argv[1] is the non_block flag */
        should_block = !RTEST(argv[1]);
    }
    return should_block;
}
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    int should_block = szqueue_push_should_block(argc, argv);

    while (queue_length(self, &sq->q) >= sq->max) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue full");
        }
        else if (queue_closed_p(self)) {
            break;
        }
        else {
            rb_execution_context_t *ec = GET_EC();
            struct queue_waiter queue_waiter = {
                .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
                .as = {.sq = sq}
            };

            struct list_head *pushq = szqueue_pushq(sq);

            list_add_tail(pushq, &queue_waiter.w.node);
            sq->num_waiting_push++;

            rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&queue_waiter);
        }
    }

    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }

    return queue_do_push(self, &sq->q, argv[0]);
}
static VALUE
szqueue_do_pop(VALUE self, int should_block)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    VALUE retval = queue_do_pop(self, &sq->q, should_block);

    if (queue_length(self, &sq->q) < sq->max) {
        wakeup_one(szqueue_pushq(sq));
    }

    return retval;
}
static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
    int should_block = queue_pop_should_block(argc, argv);
    return szqueue_do_pop(self, should_block);
}
static VALUE
rb_szqueue_clear(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    rb_ary_clear(check_array(self, sq->q.que));
    wakeup_all(szqueue_pushq(sq));
    return self;
}
static VALUE
rb_szqueue_length(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return LONG2NUM(queue_length(self, &sq->q));
}
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
}
static VALUE
rb_szqueue_empty_p(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return RBOOL(queue_length(self, &sq->q) == 0);
}
/* ConditionVariable */
struct rb_condvar {
    struct list_head waitq;
    rb_serial_t fork_gen;
};
static size_t
condvar_memsize(const void *ptr)
{
    return sizeof(struct rb_condvar);
}

static const rb_data_type_t cv_data_type = {
    "condvar",
    {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};
static struct rb_condvar *
condvar_ptr(VALUE self)
{
    struct rb_condvar *cv;
    rb_serial_t fork_gen = GET_VM()->fork_gen;

    TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);

    /* forked children can't reach into parent thread stacks */
    if (cv->fork_gen != fork_gen) {
        cv->fork_gen = fork_gen;
        list_head_init(&cv->waitq);
    }

    return cv;
}
static VALUE
condvar_alloc(VALUE klass)
{
    struct rb_condvar *cv;
    VALUE obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);

    list_head_init(&cv->waitq);
    return obj;
}
static VALUE
rb_condvar_initialize(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    list_head_init(&cv->waitq);
    return self;
}
struct sleep_call {
    VALUE mutex;
    VALUE timeout;
};

static ID id_sleep;

static VALUE
do_sleep(VALUE args)
{
    struct sleep_call *p = (struct sleep_call *)args;
    return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
}
static VALUE
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
{
    rb_execution_context_t *ec = GET_EC();
    struct rb_condvar *cv = condvar_ptr(self);
    struct sleep_call args;

    rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);

    struct sync_waiter sync_waiter = {
        .self = args.mutex,
        .th = ec->thread_ptr,
        .fiber = ec->fiber_ptr
    };

    list_add_tail(&cv->waitq, &sync_waiter.node);
    return rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter);
}
static VALUE
rb_condvar_signal(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    wakeup_one(&cv->waitq);
    return self;
}
static VALUE
rb_condvar_broadcast(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    wakeup_all(&cv->waitq);
    return self;
}
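/*
 * Illustrative sketch (not part of thread_sync.c): the wait side pairs with
 * signal/broadcast through Mutex#sleep (see do_sleep() above), so the lock is
 * released and reacquired around the wait.  Driving the protocol from C with
 * rb_funcall; "wait" and "signal" are the Ruby-level methods defined below,
 * and "condvar_demo"/"ready_p" are hypothetical names:
 */
#if 0
static void
condvar_demo(VALUE cv, VALUE mutex, VALUE (*ready_p)(void))
{
    rb_mutex_lock(mutex);
    while (!RTEST(ready_p())) {
        /* releases mutex, parks on cv->waitq, reacquires mutex on wakeup;
         * the loop guards against spurious wakeups */
        rb_funcall(cv, rb_intern("wait"), 1, mutex);
    }
    /* ... consume the now-true condition ... */
    rb_mutex_unlock(mutex);
}
#endif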
NORETURN(static VALUE undumpable(VALUE obj));
/* :nodoc: */
static VALUE
undumpable(VALUE obj)
{
    rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
    UNREACHABLE_RETURN(Qnil);
}
static VALUE
define_thread_class(VALUE outer, const ID name, VALUE super)
{
    VALUE klass = rb_define_class_id_under(outer, name, super);
    rb_const_set(rb_cObject, name, klass);
    return klass;
}
static void
Init_thread_sync(void)
{
#if defined(TEACH_RDOC) && TEACH_RDOC == 42
    /* rdoc-only class registrations, elided here */
#endif

#define DEFINE_CLASS(name, super) \
    rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)

    /* Mutex */
    DEFINE_CLASS(Mutex, Object);
    /* ... method definitions elided ... */

    /* Queue */
    DEFINE_CLASS(Queue, Object);
    /* ... */

    /* SizedQueue */
    DEFINE_CLASS(SizedQueue, Queue);
    /* ... */
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);

    /* CondVar */
    DEFINE_CLASS(ConditionVariable, Object);
    /* ... */
    rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
}