Ruby 3.1.4p223 (2023-03-30 revision HEAD)
thread_sync.c
1 /* included by thread.c */
2 #include "ccan/list/list.h"
3 
4 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
5 static VALUE rb_eClosedQueueError;
6 
7 /* Mutex */
8 typedef struct rb_mutex_struct {
9  rb_fiber_t *fiber;
10  struct rb_mutex_struct *next_mutex;
11  struct list_head waitq; /* protected by GVL */
12 } rb_mutex_t;
13 
14 /* sync_waiter is always on-stack */
15 struct sync_waiter {
16  VALUE self;
17  rb_thread_t *th;
18  rb_fiber_t *fiber;
19  struct list_node node;
20 };
21 
22 #define MUTEX_ALLOW_TRAP FL_USER1
23 
24 static void
25 sync_wakeup(struct list_head *head, long max)
26 {
27  struct sync_waiter *cur = 0, *next;
28 
29  list_for_each_safe(head, cur, next, node) {
30  list_del_init(&cur->node);
31 
32  if (cur->th->status != THREAD_KILLED) {
33 
34  if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
35  rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
36  }
37  else {
38  rb_threadptr_interrupt(cur->th);
39  cur->th->status = THREAD_RUNNABLE;
40  }
41 
42  if (--max == 0) return;
43  }
44  }
45 }
46 
47 static void
48 wakeup_one(struct list_head *head)
49 {
50  sync_wakeup(head, 1);
51 }
52 
53 static void
54 wakeup_all(struct list_head *head)
55 {
56  sync_wakeup(head, LONG_MAX);
57 }
58 
59 #if defined(HAVE_WORKING_FORK)
60 static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
61 static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
62 static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
63 #endif
64 static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);
65 
66 /*
67  * Document-class: Thread::Mutex
68  *
69  * Thread::Mutex implements a simple semaphore that can be used to
70  * coordinate access to shared data from multiple concurrent threads.
71  *
72  * Example:
73  *
74  *   semaphore = Thread::Mutex.new
75  *
76  *   a = Thread.new {
77  *     semaphore.synchronize {
78  *       # access shared resource
79  *     }
80  *   }
81  *
82  *   b = Thread.new {
83  *     semaphore.synchronize {
84  *       # access shared resource
85  *     }
86  *   }
87  *
88  */
89 
90 #define mutex_mark ((void(*)(void*))0) /* rb_mutex_t holds no VALUEs, so no mark function */
91 
92 static size_t
93 rb_mutex_num_waiting(rb_mutex_t *mutex)
94 {
95  struct sync_waiter *w = 0;
96  size_t n = 0;
97 
98  list_for_each(&mutex->waitq, w, node) {
99  n++;
100  }
101 
102  return n;
103 }
104 
105 rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
106 
107 static void
108 mutex_free(void *ptr)
109 {
110  rb_mutex_t *mutex = ptr;
111  if (mutex->fiber) {
112  /* rb_warn("free locked mutex"); */
113  const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber);
114  if (err) rb_bug("%s", err);
115  }
116  ruby_xfree(ptr);
117 }
118 
119 static size_t
120 mutex_memsize(const void *ptr)
121 {
122  return sizeof(rb_mutex_t);
123 }
124 
125 static const rb_data_type_t mutex_data_type = {
126  "mutex",
127  {mutex_mark, mutex_free, mutex_memsize,},
128  0, 0, RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_FREE_IMMEDIATELY
129 };
130 
131 static rb_mutex_t *
132 mutex_ptr(VALUE obj)
133 {
134  rb_mutex_t *mutex;
135 
136  TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex);
137 
138  return mutex;
139 }
140 
141 VALUE
142 rb_obj_is_mutex(VALUE obj)
143 {
144  return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type));
145 }
146 
147 static VALUE
148 mutex_alloc(VALUE klass)
149 {
150  VALUE obj;
151  rb_mutex_t *mutex;
152 
153  obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
154 
155  list_head_init(&mutex->waitq);
156  return obj;
157 }
158 
159 /*
160  * call-seq:
161  * Thread::Mutex.new -> mutex
162  *
163  * Creates a new Mutex
164  */
165 static VALUE
166 mutex_initialize(VALUE self)
167 {
168  return self;
169 }
170 
171 VALUE
172 rb_mutex_new(void)
173 {
174  return mutex_alloc(rb_cMutex);
175 }
176 
177 /*
178  * call-seq:
179  * mutex.locked? -> true or false
180  *
181  * Returns +true+ if this lock is currently held by some thread.
182  */
183 VALUE
184 rb_mutex_locked_p(VALUE self)
185 {
186  rb_mutex_t *mutex = mutex_ptr(self);
187 
188  return RBOOL(mutex->fiber);
189 }
190 
191 static void
192 thread_mutex_insert(rb_thread_t *thread, rb_mutex_t *mutex)
193 {
194  if (thread->keeping_mutexes) {
195  mutex->next_mutex = thread->keeping_mutexes;
196  }
197 
198  thread->keeping_mutexes = mutex;
199 }
200 
201 static void
202 thread_mutex_remove(rb_thread_t *thread, rb_mutex_t *mutex)
203 {
204  rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes;
205 
206  while (*keeping_mutexes && *keeping_mutexes != mutex) {
207  // Move to the next mutex in the list:
208  keeping_mutexes = &(*keeping_mutexes)->next_mutex;
209  }
210 
211  if (*keeping_mutexes) {
212  *keeping_mutexes = mutex->next_mutex;
213  mutex->next_mutex = NULL;
214  }
215 }
216 
217 static void
218 mutex_locked(rb_thread_t *th, VALUE self)
219 {
220  rb_mutex_t *mutex = mutex_ptr(self);
221 
222  thread_mutex_insert(th, mutex);
223 }
224 
225 /*
226  * call-seq:
227  * mutex.try_lock -> true or false
228  *
229  * Attempts to obtain the lock and returns immediately. Returns +true+ if the
230  * lock was granted.
231  */
232 VALUE
233 rb_mutex_trylock(VALUE self)
234 {
235  rb_mutex_t *mutex = mutex_ptr(self);
236 
237  if (mutex->fiber == 0) {
238  rb_fiber_t *fiber = GET_EC()->fiber_ptr;
239  rb_thread_t *th = GET_THREAD();
240  mutex->fiber = fiber;
241 
242  mutex_locked(th, self);
243  return Qtrue;
244  }
245 
246  return Qfalse;
247 }
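
/*
 * A minimal C-level sketch of the try-lock pattern using the exported API
 * above (rb_mutex_new, rb_mutex_trylock, rb_mutex_unlock); illustrative
 * only, and the helper name try_critical is hypothetical:
 *
 *   #include "ruby/ruby.h"
 *
 *   static VALUE
 *   try_critical(VALUE lock) // lock was created with rb_mutex_new()
 *   {
 *       if (rb_mutex_trylock(lock) == Qtrue) {
 *           // acquired without blocking; do the work, then release
 *           rb_mutex_unlock(lock);
 *           return Qtrue;
 *       }
 *       return Qfalse; // held by another thread/fiber; retry or back off
 *   }
 */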
248 
249 /*
250  * At most one thread may use cond_timedwait and watch for deadlock
251  * periodically. Multiple polling threads (i.e. concurrent deadlock checks)
252  * introduce new race conditions. [Bug #6278] [ruby-core:44275]
253  */
254 static const rb_thread_t *patrol_thread = NULL;
255 
256 static VALUE
257 mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
258 {
259  return RBOOL(mutex->fiber == fiber);
260 }
261 
262 static VALUE
263 call_rb_fiber_scheduler_block(VALUE mutex)
264 {
265  return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil);
266 }
267 
268 static VALUE
269 delete_from_waitq(VALUE value)
270 {
271  struct sync_waiter *sync_waiter = (void *)value;
272  list_del(&sync_waiter->node);
273 
274  return Qnil;
275 }
276 
277 static VALUE
278 do_mutex_lock(VALUE self, int interruptible_p)
279 {
280  rb_execution_context_t *ec = GET_EC();
281  rb_thread_t *th = ec->thread_ptr;
282  rb_fiber_t *fiber = ec->fiber_ptr;
283  rb_mutex_t *mutex = mutex_ptr(self);
284 
285  /* When running trap handler */
286  if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
287  th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
288  rb_raise(rb_eThreadError, "can't be called from trap context");
289  }
290 
291  if (rb_mutex_trylock(self) == Qfalse) {
292  if (mutex->fiber == fiber) {
293  rb_raise(rb_eThreadError, "deadlock; recursive locking");
294  }
295 
296  while (mutex->fiber != fiber) {
297  VALUE scheduler = rb_fiber_scheduler_current();
298  if (scheduler != Qnil) {
299  struct sync_waiter sync_waiter = {
300  .self = self,
301  .th = th,
302  .fiber = fiber
303  };
304 
305  list_add_tail(&mutex->waitq, &sync_waiter.node);
306 
307  rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter);
308 
309  if (!mutex->fiber) {
310  mutex->fiber = fiber;
311  }
312  }
313  else {
314  if (!th->vm->thread_ignore_deadlock && rb_fiber_threadptr(mutex->fiber) == th) {
315  rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
316  }
317 
318  enum rb_thread_status prev_status = th->status;
319  rb_hrtime_t *timeout = 0;
320  rb_hrtime_t rel = rb_msec2hrtime(100);
321 
322  th->status = THREAD_STOPPED_FOREVER;
323  th->locking_mutex = self;
324  rb_ractor_sleeper_threads_inc(th->ractor);
325  /*
326  * Careful! While some contended threads are in native_sleep(),
327  * ractor->sleeper is an unstable value. We have to avoid both
328  * deadlock and busy loop.
329  */
330  if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) &&
331  !patrol_thread) {
332  timeout = &rel;
333  patrol_thread = th;
334  }
335 
336  struct sync_waiter sync_waiter = {
337  .self = self,
338  .th = th,
339  .fiber = fiber
340  };
341 
342  list_add_tail(&mutex->waitq, &sync_waiter.node);
343 
344  native_sleep(th, timeout); /* release GVL */
345 
346  list_del(&sync_waiter.node);
347 
348  if (!mutex->fiber) {
349  mutex->fiber = fiber;
350  }
351 
352  if (patrol_thread == th)
353  patrol_thread = NULL;
354 
355  th->locking_mutex = Qfalse;
356  if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
357  rb_check_deadlock(th->ractor);
358  }
359  if (th->status == THREAD_STOPPED_FOREVER) {
360  th->status = prev_status;
361  }
362  rb_ractor_sleeper_threads_dec(th->ractor);
363  }
364 
365  if (interruptible_p) {
366  /* release mutex before checking for interrupts...as interrupt checking
367  * code might call rb_raise() */
368  if (mutex->fiber == fiber) mutex->fiber = 0;
369  RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
370  if (!mutex->fiber) {
371  mutex->fiber = fiber;
372  }
373  }
374  }
375 
376  if (mutex->fiber == fiber) mutex_locked(th, self);
377  }
378 
379  // assertion
380  if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
381 
382  return self;
383 }
384 
385 static VALUE
386 mutex_lock_uninterruptible(VALUE self)
387 {
388  return do_mutex_lock(self, 0);
389 }
390 
391 /*
392  * call-seq:
393  * mutex.lock -> self
394  *
395  * Attempts to grab the lock and waits if it isn't available.
396  * Raises +ThreadError+ if +mutex+ was locked by the current thread.
397  */
398 VALUE
399 rb_mutex_lock(VALUE self)
400 {
401  return do_mutex_lock(self, 1);
402 }
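
/*
 * For contrast with try_lock, a hedged sketch of the blocking form: since
 * rb_mutex_lock() can raise (e.g. on recursive locking), C callers that
 * need exception safety should prefer rb_mutex_synchronize() below:
 *
 *   VALUE lock = rb_mutex_new();
 *   rb_mutex_lock(lock);   // blocks until the lock is acquired
 *   // ... touch the shared state ...
 *   rb_mutex_unlock(lock); // raises ThreadError if we do not own the lock
 */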
403 
404 /*
405  * call-seq:
406  * mutex.owned? -> true or false
407  *
408  * Returns +true+ if this lock is currently held by the current thread.
409  */
410 VALUE
411 rb_mutex_owned_p(VALUE self)
412 {
413  rb_fiber_t *fiber = GET_EC()->fiber_ptr;
414  rb_mutex_t *mutex = mutex_ptr(self);
415 
416  return mutex_owned_p(fiber, mutex);
417 }
418 
419 static const char *
420 rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
421 {
422  const char *err = NULL;
423 
424  if (mutex->fiber == 0) {
425  err = "Attempt to unlock a mutex which is not locked";
426  }
427  else if (mutex->fiber != fiber) {
428  err = "Attempt to unlock a mutex which is locked by another thread/fiber";
429  }
430  else {
431  struct sync_waiter *cur = 0, *next;
432 
433  mutex->fiber = 0;
434  list_for_each_safe(&mutex->waitq, cur, next, node) {
435  list_del_init(&cur->node);
436 
437  if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
438  rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
439  goto found;
440  }
441  else {
442  switch (cur->th->status) {
443  case THREAD_RUNNABLE: /* from someone else calling Thread#run */
444  case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
445  rb_threadptr_interrupt(cur->th);
446  goto found;
447  case THREAD_STOPPED: /* probably impossible */
448  rb_bug("unexpected THREAD_STOPPED");
449  case THREAD_KILLED:
450  /* not sure about this, possible in exit GC? */
451  rb_bug("unexpected THREAD_KILLED");
452  continue;
453  }
454  }
455  }
456 
457  found:
458  thread_mutex_remove(th, mutex);
459  }
460 
461  return err;
462 }
463 
464 /*
465  * call-seq:
466  * mutex.unlock -> self
467  *
468  * Releases the lock.
469  * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
470  */
471 VALUE
472 rb_mutex_unlock(VALUE self)
473 {
474  const char *err;
475  rb_mutex_t *mutex = mutex_ptr(self);
476  rb_thread_t *th = GET_THREAD();
477 
478  err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
479  if (err) rb_raise(rb_eThreadError, "%s", err);
480 
481  return self;
482 }
483 
484 #if defined(HAVE_WORKING_FORK)
485 static void
486 rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
487 {
488  rb_mutex_abandon_all(th->keeping_mutexes);
489  th->keeping_mutexes = NULL;
490 }
491 
492 static void
493 rb_mutex_abandon_locking_mutex(rb_thread_t *th)
494 {
495  if (th->locking_mutex) {
496  rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
497 
498  list_head_init(&mutex->waitq);
499  th->locking_mutex = Qfalse;
500  }
501 }
502 
503 static void
504 rb_mutex_abandon_all(rb_mutex_t *mutexes)
505 {
506  rb_mutex_t *mutex;
507 
508  while (mutexes) {
509  mutex = mutexes;
510  mutexes = mutex->next_mutex;
511  mutex->fiber = 0;
512  mutex->next_mutex = 0;
513  list_head_init(&mutex->waitq);
514  }
515 }
516 #endif
517 
518 static VALUE
519 rb_mutex_sleep_forever(VALUE self)
520 {
521  rb_thread_sleep_deadly_allow_spurious_wakeup(self);
522  return Qnil;
523 }
524 
525 static VALUE
526 rb_mutex_wait_for(VALUE time)
527 {
528  rb_hrtime_t *rel = (rb_hrtime_t *)time;
529  /* permit spurious check */
530  return RBOOL(sleep_hrtime(GET_THREAD(), *rel, 0));
531 }
532 
533 VALUE
534 rb_mutex_sleep(VALUE self, VALUE timeout)
535 {
536  struct timeval t;
537  VALUE woken = Qtrue;
538 
539  if (!NIL_P(timeout)) {
540  t = rb_time_interval(timeout);
541  }
542 
543  rb_mutex_unlock(self);
544  time_t beg = time(0);
545 
546  VALUE scheduler = rb_fiber_scheduler_current();
547  if (scheduler != Qnil) {
548  rb_fiber_scheduler_kernel_sleep(scheduler, timeout);
549  mutex_lock_uninterruptible(self);
550  }
551  else {
552  if (NIL_P(timeout)) {
553  rb_ensure(rb_mutex_sleep_forever, self, mutex_lock_uninterruptible, self);
554  }
555  else {
556  rb_hrtime_t rel = rb_timeval2hrtime(&t);
557  woken = rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self);
558  }
559  }
560 
561  RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
562  if (!woken) return Qnil;
563  time_t end = time(0) - beg;
564  return TIMET2NUM(end);
565 }
566 
567 /*
568  * call-seq:
569  * mutex.sleep(timeout = nil) -> number or nil
570  *
571  * Releases the lock and sleeps for +timeout+ seconds if it is given and
572  * non-nil, or forever otherwise. Raises +ThreadError+ if +mutex+ wasn't locked by
573  * the current thread.
574  *
575  * When the thread is next woken up, it will attempt to reacquire
576  * the lock.
577  *
578  * Note that this method can wake up without an explicit Thread#wakeup
579  * call; for example, on receiving a signal.
580  *
581  * Returns the slept time in seconds if woken up, or +nil+ if timed out.
582  */
583 static VALUE
584 mutex_sleep(int argc, VALUE *argv, VALUE self)
585 {
586  VALUE timeout;
587 
588  timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil;
589  return rb_mutex_sleep(self, timeout);
590 }
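
/*
 * A hedged sketch of the same operation through the exported
 * rb_mutex_sleep(); the mutex must already be locked by the caller, and
 * the lock is reacquired before the call returns:
 *
 *   rb_mutex_lock(lock);
 *   VALUE slept = rb_mutex_sleep(lock, DBL2NUM(0.5)); // ~0.5s or until woken
 *   // slept is the whole seconds slept, or Qnil if the timeout elapsed
 *   rb_mutex_unlock(lock);
 */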
591 
592 /*
593  * call-seq:
594  * mutex.synchronize { ... } -> result of the block
595  *
596  * Obtains a lock, runs the block, and releases the lock when the block
597  * completes. See the example under Thread::Mutex.
598  */
599 
600 VALUE
601 rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
602 {
603  rb_mutex_lock(mutex);
604  return rb_ensure(func, arg, rb_mutex_unlock, mutex);
605 }
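
/*
 * A sketch of how a C extension might use the callback form; the names
 * update_shared, lock and ary are hypothetical:
 *
 *   static VALUE
 *   update_shared(VALUE ary)
 *   {
 *       // runs while the lock is held; the rb_ensure() above guarantees
 *       // the lock is released even if this raises
 *       return rb_ary_push(ary, INT2FIX(1));
 *   }
 *
 *   // later, with lock and ary in scope:
 *   rb_mutex_synchronize(lock, update_shared, ary);
 */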
606 
607 /*
608  * call-seq:
609  * mutex.synchronize { ... } -> result of the block
610  *
611  * Obtains a lock, runs the block, and releases the lock when the block
612  * completes. See the example under Thread::Mutex.
613  */
614 static VALUE
615 rb_mutex_synchronize_m(VALUE self)
616 {
617  if (!rb_block_given_p()) {
618  rb_raise(rb_eThreadError, "must be called with a block");
619  }
620 
621  return rb_mutex_synchronize(self, rb_yield, Qundef);
622 }
623 
624 void rb_mutex_allow_trap(VALUE self, int val)
625 {
626  Check_TypedStruct(self, &mutex_data_type);
627 
628  if (val)
629  FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
630  else
631  FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
632 }
633 
634 /* Queue */
635 
636 #define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
637 PACKED_STRUCT_UNALIGNED(struct rb_queue {
638  struct list_head waitq;
639  rb_serial_t fork_gen;
640  const VALUE que;
641  int num_waiting;
642 });
643 
644 #define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
645 #define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
646 PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
647  struct rb_queue q;
648  int num_waiting_push;
649  struct list_head pushq;
650  long max;
651 });
652 
653 static void
654 queue_mark(void *ptr)
655 {
656  struct rb_queue *q = ptr;
657 
658  /* no need to mark threads in waitq, they are on stack */
659  rb_gc_mark(q->que);
660 }
661 
662 static size_t
663 queue_memsize(const void *ptr)
664 {
665  return sizeof(struct rb_queue);
666 }
667 
668 static const rb_data_type_t queue_data_type = {
669  "queue",
670  {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
671  0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
672 };
673 
674 static VALUE
675 queue_alloc(VALUE klass)
676 {
677  VALUE obj;
678  struct rb_queue *q;
679 
680  obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
681  list_head_init(queue_waitq(q));
682  return obj;
683 }
684 
685 static int
686 queue_fork_check(struct rb_queue *q)
687 {
688  rb_serial_t fork_gen = GET_VM()->fork_gen;
689 
690  if (q->fork_gen == fork_gen) {
691  return 0;
692  }
693  /* forked children can't reach into parent thread stacks */
694  q->fork_gen = fork_gen;
695  list_head_init(queue_waitq(q));
696  q->num_waiting = 0;
697  return 1;
698 }
699 
700 static struct rb_queue *
701 queue_ptr(VALUE obj)
702 {
703  struct rb_queue *q;
704 
705  TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
706  queue_fork_check(q);
707 
708  return q;
709 }
710 
711 #define QUEUE_CLOSED FL_USER5
712 
713 static void
714 szqueue_mark(void *ptr)
715 {
716  struct rb_szqueue *sq = ptr;
717 
718  queue_mark(&sq->q);
719 }
720 
721 static size_t
722 szqueue_memsize(const void *ptr)
723 {
724  return sizeof(struct rb_szqueue);
725 }
726 
727 static const rb_data_type_t szqueue_data_type = {
728  "sized_queue",
729  {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
730  0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
731 };
732 
733 static VALUE
734 szqueue_alloc(VALUE klass)
735 {
736  struct rb_szqueue *sq;
737  VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
738  &szqueue_data_type, sq);
739  list_head_init(szqueue_waitq(sq));
740  list_head_init(szqueue_pushq(sq));
741  return obj;
742 }
743 
744 static struct rb_szqueue *
745 szqueue_ptr(VALUE obj)
746 {
747  struct rb_szqueue *sq;
748 
749  TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
750  if (queue_fork_check(&sq->q)) {
751  list_head_init(szqueue_pushq(sq));
752  sq->num_waiting_push = 0;
753  }
754 
755  return sq;
756 }
757 
758 static VALUE
759 ary_buf_new(void)
760 {
761  return rb_ary_tmp_new(1);
762 }
763 
764 static VALUE
765 check_array(VALUE obj, VALUE ary)
766 {
767  if (!RB_TYPE_P(ary, T_ARRAY)) {
768  rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
769  }
770  return ary;
771 }
772 
773 static long
774 queue_length(VALUE self, struct rb_queue *q)
775 {
776  return RARRAY_LEN(check_array(self, q->que));
777 }
778 
779 static int
780 queue_closed_p(VALUE self)
781 {
782  return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
783 }
784 
785 /*
786  * Document-class: ClosedQueueError
787  *
788  * The exception class which will be raised when pushing into a closed
789  * Queue. See Thread::Queue#close and Thread::SizedQueue#close.
790  */
791 
792 NORETURN(static void raise_closed_queue_error(VALUE self));
793 
794 static void
795 raise_closed_queue_error(VALUE self)
796 {
797  rb_raise(rb_eClosedQueueError, "queue closed");
798 }
799 
800 static VALUE
801 queue_closed_result(VALUE self, struct rb_queue *q)
802 {
803  assert(queue_length(self, q) == 0);
804  return Qnil;
805 }
806 
807 /*
808  * Document-class: Thread::Queue
809  *
810  * The Thread::Queue class implements multi-producer, multi-consumer
811  * queues. It is especially useful in threaded programming when
812  * information must be exchanged safely between multiple threads. The
813  * Thread::Queue class implements all the required locking semantics.
814  *
815  * The class implements a FIFO (first in, first out) type of queue: the
816  * first tasks added are the first retrieved.
817  *
818  * Example:
819  *
820  *   queue = Thread::Queue.new
821  *
822  *   producer = Thread.new do
823  *     5.times do |i|
824  *       sleep rand(i) # simulate expense
825  *       queue << i
826  *       puts "#{i} produced"
827  *     end
828  *   end
829  *
830  *   consumer = Thread.new do
831  *     5.times do |i|
832  *       value = queue.pop
833  *       sleep rand(i/2) # simulate expense
834  *       puts "consumed #{value}"
835  *     end
836  *   end
837  *
838  *   consumer.join
839  *
840  */
841 
842 /*
843  * Document-method: Queue::new
844  *
845  * call-seq:
846  * Thread::Queue.new -> empty_queue
847  * Thread::Queue.new(enumerable) -> queue
848  *
849  * Creates a new queue instance, optionally using the contents of an +enumerable+
850  * for its initial state.
851  *
852  * Example:
853  *
854  *   q = Thread::Queue.new
855  *   #=> #<Thread::Queue:0x00007ff7501110d0>
856  *   q.empty?
857  *   #=> true
858  *
859  *   q = Thread::Queue.new([1, 2, 3])
860  *   #=> #<Thread::Queue:0x00007ff7500ec500>
861  *   q.empty?
862  *   #=> false
863  *   q.pop
864  *   #=> 1
865  */
866 
867 static VALUE
868 rb_queue_initialize(int argc, VALUE *argv, VALUE self)
869 {
870  VALUE initial;
871  struct rb_queue *q = queue_ptr(self);
872  if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
873  initial = rb_to_array(initial);
874  }
875  RB_OBJ_WRITE(self, &q->que, ary_buf_new());
876  list_head_init(queue_waitq(q));
877  if (argc == 1) {
878  rb_ary_concat(q->que, initial);
879  }
880  return self;
881 }
882 
883 static VALUE
884 queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
885 {
886  if (queue_closed_p(self)) {
887  raise_closed_queue_error(self);
888  }
889  rb_ary_push(check_array(self, q->que), obj);
890  wakeup_one(queue_waitq(q));
891  return self;
892 }
893 
894 /*
895  * Document-method: Thread::Queue#close
896  * call-seq:
897  * close
898  *
899  * Closes the queue. A closed queue cannot be re-opened.
900  *
901  * After the call to close completes, the following are true:
902  *
903  * - +closed?+ will return true
904  *
905  * - +close+ will be ignored.
906  *
907  * - calling enq/push/<< will raise a +ClosedQueueError+.
908  *
909  * - when +empty?+ is false, calling deq/pop/shift will return an object
910  *   from the queue as usual.
911  * - when +empty?+ is true, deq(false) will not suspend the thread and will
912  *   return nil. deq(true) will raise a +ThreadError+.
913  *
914  * ClosedQueueError inherits from StopIteration, so you can break out of a loop block.
915  *
916  * Example:
917  *
918  *   q = Thread::Queue.new
919  *   Thread.new{
920  *     while e = q.deq # wait for nil to break loop
921  *       # ...
922  *     end
923  *   }
924  *   q.close
925  */
926 
927 static VALUE
928 rb_queue_close(VALUE self)
929 {
930  struct rb_queue *q = queue_ptr(self);
931 
932  if (!queue_closed_p(self)) {
933  FL_SET(self, QUEUE_CLOSED);
934 
935  wakeup_all(queue_waitq(q));
936  }
937 
938  return self;
939 }
940 
941 /*
942  * Document-method: Thread::Queue#closed?
943  * call-seq: closed?
944  *
945  * Returns +true+ if the queue is closed.
946  */
947 
948 static VALUE
949 rb_queue_closed_p(VALUE self)
950 {
951  return RBOOL(queue_closed_p(self));
952 }
953 
954 /*
955  * Document-method: Thread::Queue#push
956  * call-seq:
957  * push(object)
958  * enq(object)
959  * <<(object)
960  *
961  * Pushes the given +object+ to the queue.
962  */
963 
964 static VALUE
965 rb_queue_push(VALUE self, VALUE obj)
966 {
967  return queue_do_push(self, queue_ptr(self), obj);
968 }
969 
970 static VALUE
971 queue_sleep(VALUE self)
972 {
973  rb_thread_sleep_deadly_allow_spurious_wakeup(self);
974  return Qnil;
975 }
976 
977 struct queue_waiter {
978  struct sync_waiter w;
979  union {
980  struct rb_queue *q;
981  struct rb_szqueue *sq;
982  } as;
983 };
984 
985 static VALUE
986 queue_sleep_done(VALUE p)
987 {
988  struct queue_waiter *qw = (struct queue_waiter *)p;
989 
990  list_del(&qw->w.node);
991  qw->as.q->num_waiting--;
992 
993  return Qfalse;
994 }
995 
996 static VALUE
997 szqueue_sleep_done(VALUE p)
998 {
999  struct queue_waiter *qw = (struct queue_waiter *)p;
1000 
1001  list_del(&qw->w.node);
1002  qw->as.sq->num_waiting_push--;
1003 
1004  return Qfalse;
1005 }
1006 
1007 static VALUE
1008 queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
1009 {
1010  check_array(self, q->que);
1011 
1012  while (RARRAY_LEN(q->que) == 0) {
1013  if (!should_block) {
1014  rb_raise(rb_eThreadError, "queue empty");
1015  }
1016  else if (queue_closed_p(self)) {
1017  return queue_closed_result(self, q);
1018  }
1019  else {
1020  rb_execution_context_t *ec = GET_EC();
1021 
1022  assert(RARRAY_LEN(q->que) == 0);
1023  assert(queue_closed_p(self) == 0);
1024 
1025  struct queue_waiter queue_waiter = {
1026  .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
1027  .as = {.q = q}
1028  };
1029 
1030  struct list_head *waitq = queue_waitq(q);
1031 
1032  list_add_tail(waitq, &queue_waiter.w.node);
1033  queue_waiter.as.q->num_waiting++;
1034 
1035  rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&queue_waiter);
1036  }
1037  }
1038 
1039  return rb_ary_shift(q->que);
1040 }
1041 
1042 static int
1043 queue_pop_should_block(int argc, const VALUE *argv)
1044 {
1045  int should_block = 1;
1046  rb_check_arity(argc, 0, 1);
1047  if (argc > 0) {
1048  should_block = !RTEST(argv[0]); /* the optional argument is non_block */
1049  }
1050  return should_block;
1051 }
1052 
1053 /*
1054  * Document-method: Thread::Queue#pop
1055  * call-seq:
1056  * pop(non_block=false)
1057  * deq(non_block=false)
1058  * shift(non_block=false)
1059  *
1060  * Retrieves data from the queue.
1061  *
1062  * If the queue is empty, the calling thread is suspended until data is pushed
1063  * onto the queue. If +non_block+ is true, the thread isn't suspended, and
1064  * +ThreadError+ is raised.
1065  */
1066 
1067 static VALUE
1068 rb_queue_pop(int argc, VALUE *argv, VALUE self)
1069 {
1070  int should_block = queue_pop_should_block(argc, argv);
1071  return queue_do_pop(self, queue_ptr(self), should_block);
1072 }
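
/*
 * This file exports no C API for Thread::Queue, so C callers go through
 * the generic method interface; a hedged sketch:
 *
 *   VALUE q = rb_funcall(rb_path2class("Thread::Queue"), rb_intern("new"), 0);
 *   rb_funcall(q, rb_intern("push"), 1, INT2FIX(42));
 *   VALUE v = rb_funcall(q, rb_intern("pop"), 0); // blocks while empty
 *   // pop(true) is the non-blocking form; on an empty queue it raises
 *   // ThreadError instead of suspending:
 *   // rb_funcall(q, rb_intern("pop"), 1, Qtrue);
 */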
1073 
1074 /*
1075  * Document-method: Thread::Queue#empty?
1076  * call-seq: empty?
1077  *
1078  * Returns +true+ if the queue is empty.
1079  */
1080 
1081 static VALUE
1082 rb_queue_empty_p(VALUE self)
1083 {
1084  return RBOOL(queue_length(self, queue_ptr(self)) == 0);
1085 }
1086 
1087 /*
1088  * Document-method: Thread::Queue#clear
1089  *
1090  * Removes all objects from the queue.
1091  */
1092 
1093 static VALUE
1094 rb_queue_clear(VALUE self)
1095 {
1096  struct rb_queue *q = queue_ptr(self);
1097 
1098  rb_ary_clear(check_array(self, q->que));
1099  return self;
1100 }
1101 
1102 /*
1103  * Document-method: Thread::Queue#length
1104  * call-seq:
1105  * length
1106  * size
1107  *
1108  * Returns the length of the queue.
1109  */
1110 
1111 static VALUE
1112 rb_queue_length(VALUE self)
1113 {
1114  return LONG2NUM(queue_length(self, queue_ptr(self)));
1115 }
1116 
1117 /*
1118  * Document-method: Thread::Queue#num_waiting
1119  *
1120  * Returns the number of threads waiting on the queue.
1121  */
1122 
1123 static VALUE
1124 rb_queue_num_waiting(VALUE self)
1125 {
1126  struct rb_queue *q = queue_ptr(self);
1127 
1128  return INT2NUM(q->num_waiting);
1129 }
1130 
1131 /*
1132  * Document-class: Thread::SizedQueue
1133  *
1134  * This class represents queues of a specified size capacity. The push
1135  * operation may block if the capacity is full.
1136  *
1137  * See Thread::Queue for an example of how a Thread::SizedQueue works.
1138  */
1139 
1140 /*
1141  * Document-method: SizedQueue::new
1142  * call-seq: new(max)
1143  *
1144  * Creates a fixed-length queue with a maximum size of +max+.
1145  */
1146 
1147 static VALUE
1148 rb_szqueue_initialize(VALUE self, VALUE vmax)
1149 {
1150  long max;
1151  struct rb_szqueue *sq = szqueue_ptr(self);
1152 
1153  max = NUM2LONG(vmax);
1154  if (max <= 0) {
1155  rb_raise(rb_eArgError, "queue size must be positive");
1156  }
1157 
1158  RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
1159  list_head_init(szqueue_waitq(sq));
1160  list_head_init(szqueue_pushq(sq));
1161  sq->max = max;
1162 
1163  return self;
1164 }
1165 
1166 /*
1167  * Document-method: Thread::SizedQueue#close
1168  * call-seq:
1169  * close
1170  *
1171  * Similar to Thread::Queue#close.
1172  *
1173  * The difference is the behavior toward waiting enqueuing threads.
1174  *
1175  * If there are waiting enqueuing threads, they are interrupted by
1176  * raising ClosedQueueError('queue closed').
1177  */
1178 static VALUE
1179 rb_szqueue_close(VALUE self)
1180 {
1181  if (!queue_closed_p(self)) {
1182  struct rb_szqueue *sq = szqueue_ptr(self);
1183 
1184  FL_SET(self, QUEUE_CLOSED);
1185  wakeup_all(szqueue_waitq(sq));
1186  wakeup_all(szqueue_pushq(sq));
1187  }
1188  return self;
1189 }
1190 
1191 /*
1192  * Document-method: Thread::SizedQueue#max
1193  *
1194  * Returns the maximum size of the queue.
1195  */
1196 
1197 static VALUE
1198 rb_szqueue_max_get(VALUE self)
1199 {
1200  return LONG2NUM(szqueue_ptr(self)->max);
1201 }
1202 
1203 /*
1204  * Document-method: Thread::SizedQueue#max=
1205  * call-seq: max=(number)
1206  *
1207  * Sets the maximum size of the queue to the given +number+.
1208  */
1209 
1210 static VALUE
1211 rb_szqueue_max_set(VALUE self, VALUE vmax)
1212 {
1213  long max = NUM2LONG(vmax);
1214  long diff = 0;
1215  struct rb_szqueue *sq = szqueue_ptr(self);
1216 
1217  if (max <= 0) {
1218  rb_raise(rb_eArgError, "queue size must be positive");
1219  }
1220  if (max > sq->max) {
1221  diff = max - sq->max;
1222  }
1223  sq->max = max;
1224  sync_wakeup(szqueue_pushq(sq), diff);
1225  return vmax;
1226 }
1227 
1228 static int
1229 szqueue_push_should_block(int argc, const VALUE *argv)
1230 {
1231  int should_block = 1;
1232  rb_check_arity(argc, 1, 2);
1233  if (argc > 1) {
1234  should_block = !RTEST(argv[1]); /* the optional second argument is non_block */
1235  }
1236  return should_block;
1237 }
1238 
1239 /*
1240  * Document-method: Thread::SizedQueue#push
1241  * call-seq:
1242  * push(object, non_block=false)
1243  * enq(object, non_block=false)
1244  * <<(object)
1245  *
1246  * Pushes +object+ to the queue.
1247  *
1248  * If there is no space left in the queue, waits until space becomes
1249  * available, unless +non_block+ is true. If +non_block+ is true, the
1250  * thread isn't suspended, and +ThreadError+ is raised.
1251  */
1252 
1253 static VALUE
1254 rb_szqueue_push(int argc, VALUE *argv, VALUE self)
1255 {
1256  struct rb_szqueue *sq = szqueue_ptr(self);
1257  int should_block = szqueue_push_should_block(argc, argv);
1258 
1259  while (queue_length(self, &sq->q) >= sq->max) {
1260  if (!should_block) {
1261  rb_raise(rb_eThreadError, "queue full");
1262  }
1263  else if (queue_closed_p(self)) {
1264  break;
1265  }
1266  else {
1267  rb_execution_context_t *ec = GET_EC();
1268  struct queue_waiter queue_waiter = {
1269  .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
1270  .as = {.sq = sq}
1271  };
1272 
1273  struct list_head *pushq = szqueue_pushq(sq);
1274 
1275  list_add_tail(pushq, &queue_waiter.w.node);
1276  sq->num_waiting_push++;
1277 
1278  rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&queue_waiter);
1279  }
1280  }
1281 
1282  if (queue_closed_p(self)) {
1283  raise_closed_queue_error(self);
1284  }
1285 
1286  return queue_do_push(self, &sq->q, argv[0]);
1287 }
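
/*
 * A hedged C-level sketch of the bounded push, again via the generic
 * method interface:
 *
 *   VALUE sq = rb_funcall(rb_path2class("Thread::SizedQueue"),
 *                         rb_intern("new"), 1, INT2FIX(1));
 *   rb_funcall(sq, rb_intern("push"), 1, INT2FIX(1)); // fits, returns sq
 *   // With the queue now full, push(obj, true) raises ThreadError instead
 *   // of blocking:
 *   // rb_funcall(sq, rb_intern("push"), 2, INT2FIX(2), Qtrue);
 */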
1288 
1289 static VALUE
1290 szqueue_do_pop(VALUE self, int should_block)
1291 {
1292  struct rb_szqueue *sq = szqueue_ptr(self);
1293  VALUE retval = queue_do_pop(self, &sq->q, should_block);
1294 
1295  if (queue_length(self, &sq->q) < sq->max) {
1296  wakeup_one(szqueue_pushq(sq));
1297  }
1298 
1299  return retval;
1300 }
1301 
1302 /*
1303  * Document-method: Thread::SizedQueue#pop
1304  * call-seq:
1305  * pop(non_block=false)
1306  * deq(non_block=false)
1307  * shift(non_block=false)
1308  *
1309  * Retrieves data from the queue.
1310  *
1311  * If the queue is empty, the calling thread is suspended until data is pushed
1312  * onto the queue. If +non_block+ is true, the thread isn't suspended, and
1313  * +ThreadError+ is raised.
1314  */
1315 
1316 static VALUE
1317 rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
1318 {
1319  int should_block = queue_pop_should_block(argc, argv);
1320  return szqueue_do_pop(self, should_block);
1321 }
1322 
1323 /*
1324  * Document-method: Thread::SizedQueue#clear
1325  *
1326  * Removes all objects from the queue.
1327  */
1328 
1329 static VALUE
1330 rb_szqueue_clear(VALUE self)
1331 {
1332  struct rb_szqueue *sq = szqueue_ptr(self);
1333 
1334  rb_ary_clear(check_array(self, sq->q.que));
1335  wakeup_all(szqueue_pushq(sq));
1336  return self;
1337 }
1338 
1339 /*
1340  * Document-method: Thread::SizedQueue#length
1341  * call-seq:
1342  * length
1343  * size
1344  *
1345  * Returns the length of the queue.
1346  */
1347 
1348 static VALUE
1349 rb_szqueue_length(VALUE self)
1350 {
1351  struct rb_szqueue *sq = szqueue_ptr(self);
1352 
1353  return LONG2NUM(queue_length(self, &sq->q));
1354 }
1355 
1356 /*
1357  * Document-method: Thread::SizedQueue#num_waiting
1358  *
1359  * Returns the number of threads waiting on the queue.
1360  */
1361 
1362 static VALUE
1363 rb_szqueue_num_waiting(VALUE self)
1364 {
1365  struct rb_szqueue *sq = szqueue_ptr(self);
1366 
1367  return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
1368 }
1369 
1370 /*
1371  * Document-method: Thread::SizedQueue#empty?
1372  * call-seq: empty?
1373  *
1374  * Returns +true+ if the queue is empty.
1375  */
1376 
1377 static VALUE
1378 rb_szqueue_empty_p(VALUE self)
1379 {
1380  struct rb_szqueue *sq = szqueue_ptr(self);
1381 
1382  return RBOOL(queue_length(self, &sq->q) == 0);
1383 }
1384 
1385 
1386 /* ConditionVariable */
1387 struct rb_condvar {
1388  struct list_head waitq;
1389  rb_serial_t fork_gen;
1390 };
1391 
1392 /*
1393  * Document-class: Thread::ConditionVariable
1394  *
1395  * ConditionVariable objects augment class Mutex. Using condition variables,
1396  * it is possible to suspend while in the middle of a critical section until a
1397  * resource becomes available.
1398  *
1399  * Example:
1400  *
1401  *   mutex = Thread::Mutex.new
1402  *   resource = Thread::ConditionVariable.new
1403  *
1404  *   a = Thread.new {
1405  *     mutex.synchronize {
1406  *       # Thread 'a' now needs the resource
1407  *       resource.wait(mutex)
1408  *       # 'a' can now have the resource
1409  *     }
1410  *   }
1411  *
1412  *   b = Thread.new {
1413  *     mutex.synchronize {
1414  *       # Thread 'b' has finished using the resource
1415  *       resource.signal
1416  *     }
1417  *   }
1418  */
1419 
1420 static size_t
1421 condvar_memsize(const void *ptr)
1422 {
1423  return sizeof(struct rb_condvar);
1424 }
1425 
1426 static const rb_data_type_t cv_data_type = {
1427  "condvar",
1428  {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
1429  0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
1430 };
1431 
1432 static struct rb_condvar *
1433 condvar_ptr(VALUE self)
1434 {
1435  struct rb_condvar *cv;
1436  rb_serial_t fork_gen = GET_VM()->fork_gen;
1437 
1438  TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
1439 
1440  /* forked children can't reach into parent thread stacks */
1441  if (cv->fork_gen != fork_gen) {
1442  cv->fork_gen = fork_gen;
1443  list_head_init(&cv->waitq);
1444  }
1445 
1446  return cv;
1447 }
1448 
1449 static VALUE
1450 condvar_alloc(VALUE klass)
1451 {
1452  struct rb_condvar *cv;
1453  VALUE obj;
1454 
1455  obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
1456  list_head_init(&cv->waitq);
1457 
1458  return obj;
1459 }
1460 
1461 /*
1462  * Document-method: ConditionVariable::new
1463  *
1464  * Creates a new condition variable instance.
1465  */
1466 
1467 static VALUE
1468 rb_condvar_initialize(VALUE self)
1469 {
1470  struct rb_condvar *cv = condvar_ptr(self);
1471  list_head_init(&cv->waitq);
1472  return self;
1473 }
1474 
1475 struct sleep_call {
1476  VALUE mutex;
1477  VALUE timeout;
1478 };
1479 
1480 static ID id_sleep;
1481 
1482 static VALUE
1483 do_sleep(VALUE args)
1484 {
1485  struct sleep_call *p = (struct sleep_call *)args;
1486  return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
1487 }
1488 
1489 /*
1490  * Document-method: Thread::ConditionVariable#wait
1491  * call-seq: wait(mutex, timeout=nil)
1492  *
1493  * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
1494  *
1495  * If +timeout+ is given, this method returns after +timeout+ seconds have
1496  * passed, even if no other thread signals.
1497  *
1498  * Returns the slept result on +mutex+.
1499  */
1500 
1501 static VALUE
1502 rb_condvar_wait(int argc, VALUE *argv, VALUE self)
1503 {
1504  rb_execution_context_t *ec = GET_EC();
1505 
1506  struct rb_condvar *cv = condvar_ptr(self);
1507  struct sleep_call args;
1508 
1509  rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);
1510 
1511  struct sync_waiter sync_waiter = {
1512  .self = args.mutex,
1513  .th = ec->thread_ptr,
1514  .fiber = ec->fiber_ptr
1515  };
1516 
1517  list_add_tail(&cv->waitq, &sync_waiter.node);
1518  return rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter);
1519 }
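
/*
 * A hedged sketch of the canonical wait loop from C; cond, mutex and the
 * ready predicate are hypothetical, and the predicate is re-checked in a
 * loop because wakeups can be spurious:
 *
 *   rb_mutex_lock(mutex);
 *   while (!ready) {
 *       // ConditionVariable#wait releases mutex, sleeps, then relocks it
 *       rb_funcall(cond, rb_intern("wait"), 1, mutex);
 *   }
 *   rb_mutex_unlock(mutex);
 */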
1520 
1521 /*
1522  * Document-method: Thread::ConditionVariable#signal
1523  *
1524  * Wakes up the first thread in line waiting for this lock.
1525  */
1526 
1527 static VALUE
1528 rb_condvar_signal(VALUE self)
1529 {
1530  struct rb_condvar *cv = condvar_ptr(self);
1531  wakeup_one(&cv->waitq);
1532  return self;
1533 }
1534 
1535 /*
1536  * Document-method: Thread::ConditionVariable#broadcast
1537  *
1538  * Wakes up all threads waiting for this lock.
1539  */
1540 
1541 static VALUE
1542 rb_condvar_broadcast(VALUE self)
1543 {
1544  struct rb_condvar *cv = condvar_ptr(self);
1545  wakeup_all(&cv->waitq);
1546  return self;
1547 }
1548 
1549 NORETURN(static VALUE undumpable(VALUE obj));
1550 /* :nodoc: */
1551 static VALUE
1552 undumpable(VALUE obj)
1553 {
1554  rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
1555  UNREACHABLE_RETURN(Qnil);
1556 }
1557 
1558 static VALUE
1559 define_thread_class(VALUE outer, const ID name, VALUE super)
1560 {
1561  VALUE klass = rb_define_class_id_under(outer, name, super);
1562  rb_const_set(rb_cObject, name, klass);
1563  return klass;
1564 }
1565 
1566 static void
1567 Init_thread_sync(void)
1568 {
1569 #undef rb_intern
1570 #if defined(TEACH_RDOC) && TEACH_RDOC == 42
1571  rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject);
1572  rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject);
1573  rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
1574  rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject);
1575 #endif
1576 
1577 #define DEFINE_CLASS(name, super) \
1578  rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)
1579 
1580  /* Mutex */
1581  DEFINE_CLASS(Mutex, Object);
1582  rb_define_alloc_func(rb_cMutex, mutex_alloc);
1583  rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
1584  rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
1585  rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
1586  rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
1587  rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
1588  rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
1589  rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
1590  rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
1591 
1592  /* Queue */
1593  DEFINE_CLASS(Queue, Object);
1594  rb_define_alloc_func(rb_cQueue, queue_alloc);
1595 
1596  rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
1597 
1598  rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1);
1599  rb_undef_method(rb_cQueue, "initialize_copy");
1600  rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
1601  rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
1602  rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
1603  rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
1604  rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
1605  rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
1606  rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
1607  rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
1608  rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
1609 
1610  rb_define_alias(rb_cQueue, "enq", "push");
1611  rb_define_alias(rb_cQueue, "<<", "push");
1612  rb_define_alias(rb_cQueue, "deq", "pop");
1613  rb_define_alias(rb_cQueue, "shift", "pop");
1614  rb_define_alias(rb_cQueue, "size", "length");
1615 
1616  DEFINE_CLASS(SizedQueue, Queue);
1617  rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
1618 
1619  rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
1620  rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
1621  rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
1622  rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
1623  rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
1624  rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
1625  rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
1626  rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
1627  rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
1628  rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
1629 
1630  rb_define_alias(rb_cSizedQueue, "enq", "push");
1631  rb_define_alias(rb_cSizedQueue, "<<", "push");
1632  rb_define_alias(rb_cSizedQueue, "deq", "pop");
1633  rb_define_alias(rb_cSizedQueue, "shift", "pop");
1634  rb_define_alias(rb_cSizedQueue, "size", "length");
1635 
1636  /* CVar */
1637  DEFINE_CLASS(ConditionVariable, Object);
1638  rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
1639 
1640  id_sleep = rb_intern("sleep");
1641 
1642  rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
1643  rb_undef_method(rb_cConditionVariable, "initialize_copy");
1644  rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
1645  rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
1646  rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
1647  rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
1648 
1649  rb_provide("thread.rb");
1650 }