Ruby  3.1.4p223 (2023-03-30 revision HEAD)
scheduler.c
1 /**********************************************************************
2 
3  scheduler.c
4 
5  $Author$
6 
7  Copyright (C) 2020 Samuel Grant Dawson Williams
8 
9 **********************************************************************/
10 
11 #include "vm_core.h"
12 #include "ruby/fiber/scheduler.h"
13 #include "ruby/io.h"
14 #include "ruby/io/buffer.h"
15 
16 #include "internal/thread.h"
17 
18 static ID id_close;
19 static ID id_scheduler_close;
20 
21 static ID id_block;
22 static ID id_unblock;
23 
24 static ID id_timeout_after;
25 static ID id_kernel_sleep;
26 static ID id_process_wait;
27 
28 static ID id_io_read, id_io_pread;
29 static ID id_io_write, id_io_pwrite;
30 static ID id_io_wait;
31 static ID id_io_close;
32 
33 static ID id_address_resolve;
34 
35 void
36 Init_Fiber_Scheduler(void)
37 {
38  id_close = rb_intern_const("close");
39  id_scheduler_close = rb_intern_const("scheduler_close");
40 
41  id_block = rb_intern_const("block");
42  id_unblock = rb_intern_const("unblock");
43 
44  id_timeout_after = rb_intern_const("timeout_after");
45  id_kernel_sleep = rb_intern_const("kernel_sleep");
46  id_process_wait = rb_intern_const("process_wait");
47 
48  id_io_read = rb_intern_const("io_read");
49  id_io_pread = rb_intern_const("io_pread");
50  id_io_write = rb_intern_const("io_write");
51  id_io_pwrite = rb_intern_const("io_pwrite");
52 
53  id_io_wait = rb_intern_const("io_wait");
54  id_io_close = rb_intern_const("io_close");
55 
56  id_address_resolve = rb_intern_const("address_resolve");
57 }
58 
59 VALUE
61 {
62  VM_ASSERT(ruby_thread_has_gvl_p());
63 
64  rb_thread_t *thread = GET_THREAD();
65  VM_ASSERT(thread);
66 
67  return thread->scheduler;
68 }
69 
70 static void
71 verify_interface(VALUE scheduler)
72 {
73  if (!rb_respond_to(scheduler, id_block)) {
74  rb_raise(rb_eArgError, "Scheduler must implement #block");
75  }
76 
77  if (!rb_respond_to(scheduler, id_unblock)) {
78  rb_raise(rb_eArgError, "Scheduler must implement #unblock");
79  }
80 
81  if (!rb_respond_to(scheduler, id_kernel_sleep)) {
82  rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep");
83  }
84 
85  if (!rb_respond_to(scheduler, id_io_wait)) {
86  rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
87  }
88 }
89 
90 VALUE
92 {
93  VM_ASSERT(ruby_thread_has_gvl_p());
94 
95  rb_thread_t *thread = GET_THREAD();
96  VM_ASSERT(thread);
97 
98  if (scheduler != Qnil) {
99  verify_interface(scheduler);
100  }
101 
102  // We invoke Scheduler#close when setting it to something else, to ensure the previous scheduler runs to completion before changing the scheduler. That way, we do not need to consider interactions, e.g., of a Fiber from the previous scheduler with the new scheduler.
103  if (thread->scheduler != Qnil) {
104  rb_fiber_scheduler_close(thread->scheduler);
105  }
106 
107  thread->scheduler = scheduler;
108 
109  return thread->scheduler;
110 }
111 
112 static VALUE
113 rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
114 {
115  VM_ASSERT(thread);
116 
117  if (thread->blocking == 0) {
118  return thread->scheduler;
119  }
120  else {
121  return Qnil;
122  }
123 }
124 
125 VALUE
127 {
128  return rb_fiber_scheduler_current_for_threadptr(GET_THREAD());
129 }
130 
132 {
133  return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
134 }
135 
136 VALUE
138 {
139  VM_ASSERT(ruby_thread_has_gvl_p());
140 
141  VALUE result;
142 
143  result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
144  if (result != Qundef) return result;
145 
146  result = rb_check_funcall(scheduler, id_close, 0, NULL);
147  if (result != Qundef) return result;
148 
149  return Qnil;
150 }
151 
152 VALUE
154 {
155  if (timeout) {
156  return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec));
157  }
158 
159  return Qnil;
160 }
161 
162 VALUE
164 {
165  return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
166 }
167 
168 VALUE
169 rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
170 {
171  return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
172 }
173 
174 #if 0
175 VALUE
176 rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
177 {
178  VALUE arguments[] = {
179  timeout, exception, message
180  };
181 
182  return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
183 }
184 
185 VALUE
186 rb_fiber_scheduler_timeout_afterv(VALUE scheduler, int argc, VALUE * argv)
187 {
188  return rb_check_funcall(scheduler, id_timeout_after, argc, argv);
189 }
190 #endif
191 
192 VALUE
193 rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
194 {
195  VALUE arguments[] = {
196  PIDT2NUM(pid), RB_INT2NUM(flags)
197  };
198 
199  return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
200 }
201 
202 VALUE
203 rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
204 {
205  return rb_funcall(scheduler, id_block, 2, blocker, timeout);
206 }
207 
208 VALUE
209 rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
210 {
211  VM_ASSERT(rb_obj_is_fiber(fiber));
212 
213  return rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
214 }
215 
216 VALUE
217 rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
218 {
219  return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
220 }
221 
222 VALUE
224 {
226 }
227 
228 VALUE
230 {
232 }
233 
234 VALUE
235 rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length)
236 {
237  VALUE arguments[] = {
238  io, buffer, SIZET2NUM(length)
239  };
240 
241  return rb_check_funcall(scheduler, id_io_read, 3, arguments);
242 }
243 
244 VALUE
245 rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset)
246 {
247  VALUE arguments[] = {
248  io, buffer, SIZET2NUM(length), OFFT2NUM(offset)
249  };
250 
251  return rb_check_funcall(scheduler, id_io_pread, 4, arguments);
252 }
253 
254 VALUE
255 rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length)
256 {
257  VALUE arguments[] = {
258  io, buffer, SIZET2NUM(length)
259  };
260 
261  return rb_check_funcall(scheduler, id_io_write, 3, arguments);
262 }
263 
264 VALUE
265 rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset)
266 {
267  VALUE arguments[] = {
268  io, buffer, SIZET2NUM(length), OFFT2NUM(offset)
269  };
270 
271  return rb_check_funcall(scheduler, id_io_pwrite, 4, arguments);
272 }
273 
274 VALUE
275 rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
276 {
277  VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
278 
279  VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length);
280 
281  rb_io_buffer_unlock(buffer);
282  rb_io_buffer_free(buffer);
283 
284  return result;
285 }
286 
287 VALUE
288 rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
289 {
290  VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
291 
292  VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length);
293 
294  rb_io_buffer_unlock(buffer);
295  rb_io_buffer_free(buffer);
296 
297  return result;
298 }
299 
300 VALUE
302 {
303  VALUE arguments[] = {io};
304 
305  return rb_check_funcall(scheduler, id_io_close, 1, arguments);
306 }
307 
308 VALUE
310 {
311  VALUE arguments[] = {
312  hostname
313  };
314 
315  return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
316 }
VALUE rb_float_new(double d)
Converts a C's double into an instance of rb_cFloat.
Definition: numeric.c:6431
#define Qundef
Old name of RUBY_Qundef.
#define SIZET2NUM
Old name of RB_SIZE2NUM.
Definition: size_t.h:62
#define Qnil
Old name of RUBY_Qnil.
void rb_raise(VALUE exc, const char *fmt,...)
Exception entry point.
Definition: error.c:3025
VALUE rb_eArgError
ArgumentError exception.
Definition: error.c:1100
VALUE rb_funcall(VALUE recv, ID mid, int n,...)
Calls a method.
Definition: vm_eval.c:1102
VALUE rb_funcallv(VALUE recv, ID mid, int argc, const VALUE *argv)
Identical to rb_funcall(), except it takes the method arguments as a C array.
Definition: vm_eval.c:1061
VALUE rb_obj_is_fiber(VALUE obj)
Queries if an object is a fiber.
Definition: cont.c:1106
int rb_respond_to(VALUE obj, ID mid)
Queries if the object responds to the method.
Definition: vm_method.c:2765
VALUE rb_check_funcall(VALUE recv, ID mid, int argc, const VALUE *argv)
Identical to rb_funcallv(), except it returns RUBY_Qundef instead of raising rb_eNoMethodError.
Definition: vm_eval.c:664
static ID rb_intern_const(const char *str)
This is a "tiny optimisation" over rb_intern().
Definition: symbol.h:276
@ RUBY_IO_READABLE
IO::READABLE
Definition: io.h:67
@ RUBY_IO_WRITABLE
IO::WRITABLE
Definition: io.h:68
#define RB_UINT2NUM
Just another name of rb_uint2num_inline.
Definition: int.h:39
#define RB_INT2NUM
Just another name of rb_int2num_inline.
Definition: int.h:37
#define OFFT2NUM
Converts a C's off_t into an instance of rb_cInteger.
Definition: off_t.h:33
#define PIDT2NUM
Converts a C's pid_t into an instance of rb_cInteger.
Definition: pid_t.h:28
Scheduler APIs.
VALUE rb_fiber_scheduler_current(void)
Identical to rb_fiber_scheduler_get(), except it also returns RUBY_Qnil in case of a blocking fiber.
Definition: scheduler.c:126
VALUE rb_fiber_scheduler_make_timeout(struct timeval *timeout)
Converts the passed timeout to an expression that rb_fiber_scheduler_block() etc.
Definition: scheduler.c:153
VALUE rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
Nonblocking wait until the passed IO is ready for reading.
Definition: scheduler.c:223
VALUE rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE *argv)
Identical to rb_fiber_scheduler_kernel_sleep(), except it can pass multiple arguments.
Definition: scheduler.c:169
VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
Nonblocking version of rb_io_wait().
Definition: scheduler.c:217
VALUE rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
Nonblocking waitpid.
Definition: scheduler.c:193
VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
Nonblocking wait for the passed "blocker", which is for instance Thread.join or Mutex....
Definition: scheduler.c:203
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length)
Nonblocking read from the passed IO.
Definition: scheduler.c:235
VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset)
Nonblocking read from the passed IO at the specified offset.
Definition: scheduler.c:245
VALUE rb_fiber_scheduler_close(VALUE scheduler)
Closes the passed scheduler object.
Definition: scheduler.c:137
VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
Identical to rb_fiber_scheduler_current(), except it queries for that of the passed thread instead of...
Definition: scheduler.c:131
VALUE rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE duration)
Nonblocking sleep.
Definition: scheduler.c:163
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
Nonblocking DNS lookup.
Definition: scheduler.c:309
VALUE rb_fiber_scheduler_set(VALUE scheduler)
Destructively assigns the passed scheduler to that of the current thread that is calling this functio...
Definition: scheduler.c:91
VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
Nonblocking wait until the passed IO is ready for writing.
Definition: scheduler.c:229
VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *buffer, size_t size, size_t length)
Nonblocking write to the passed IO using a native buffer.
Definition: scheduler.c:288
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length)
Nonblocking write to the passed IO.
Definition: scheduler.c:255
VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *buffer, size_t size, size_t length)
Nonblocking read from the passed IO using a native buffer.
Definition: scheduler.c:275
VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
Nonblocking close the given IO.
Definition: scheduler.c:301
VALUE rb_fiber_scheduler_get(void)
Queries the current scheduler of the current thread that is calling this function.
Definition: scheduler.c:60
VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
Wakes up a fiber previously blocked using rb_fiber_scheduler_block().
Definition: scheduler.c:209
VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset)
Nonblocking write to the passed IO at the specified offset.
Definition: scheduler.c:265
uintptr_t ID
Type that represents a Ruby identifier such as a variable name.
Definition: value.h:52
uintptr_t VALUE
Type that represents a Ruby object.
Definition: value.h:40