Blender V3.3
COM_WorkScheduler.cc
/* SPDX-License-Identifier: GPL-2.0-or-later
 * Copyright 2011 Blender Foundation. */

#include "COM_WorkScheduler.h"

#include "COM_CPUDevice.h"
#include "COM_CompositorContext.h"
#include "COM_ExecutionGroup.h"
#include "COM_OpenCLDevice.h"
#include "COM_OpenCLKernels.cl.h"
#include "COM_WriteBufferOperation.h"

#include "clew.h"

#include "MEM_guardedalloc.h"

#include "BLI_task.h"
#include "BLI_threads.h"
#include "BLI_vector.hh"

#include "BKE_global.h"

namespace blender::compositor {

enum class ThreadingModel {
  /** Everything is executed in the caller thread. Easy for debugging. */
  SingleThreaded,
  /** Multi-threaded model which uses the BLI_thread_queue pattern. This is the default. */
  Queue,
  /** Uses BLI_task as the threading backend. */
  Task
};

/**
 * Returns the active threading model.
 *
 * Default is `ThreadingModel::Queue`.
 */
constexpr ThreadingModel COM_threading_model()
{
  return ThreadingModel::Queue;
}

/**
 * Does the active threading model support OpenCL?
 */
constexpr bool COM_is_opencl_enabled()
{
  return COM_threading_model() != ThreadingModel::Task;
}

static ThreadLocal(CPUDevice *) g_thread_device;
static struct {
  struct {
    /** \brief List of all CPUDevices. For every hardware thread an instance of CPUDevice is
     * created. */
    Vector<CPUDevice> devices;

    /** \brief List of threads: for every CPUDevice in `devices` a thread exists. */
    ListBase threads;
    bool initialized = false;
    /** \brief All scheduled work for the CPU. */
    ThreadQueue *queue;
  } queue;

  struct {
    TaskPool *pool;
  } task;

  struct {
    /** \brief All scheduled work for the GPU. */
    ThreadQueue *queue;
    cl_context context;
    cl_program program;
    /** \brief List of all OpenCLDevices. For every OpenCL GPU device an instance of OpenCLDevice
     * is created. */
    Vector<OpenCLDevice> devices;
    /** \brief List of threads: for every OpenCLDevice in `devices` a thread exists. */
    ListBase threads;
    bool active = false;
    bool initialized = false;
  } opencl;

  int num_cpu_threads;
} g_work_scheduler;

/* -------------------------------------------------------------------- */
/** \name OpenCL Scheduling
 * \{ */

static void CL_CALLBACK cl_context_error(const char *errinfo,
                                         const void * /*private_info*/,
                                         size_t /*cb*/,
                                         void * /*user_data*/)
{
  printf("OPENCL error: %s\n", errinfo);
}

static void *thread_execute_gpu(void *data)
{
  Device *device = (Device *)data;
  WorkPackage *work;

  while ((work = (WorkPackage *)BLI_thread_queue_pop(g_work_scheduler.opencl.queue))) {
    device->execute(work);
  }

  return nullptr;
}

static void opencl_start(const CompositorContext &context)
{
  if (context.get_has_active_opencl_devices()) {
    g_work_scheduler.opencl.queue = BLI_thread_queue_init();
    BLI_threadpool_init(&g_work_scheduler.opencl.threads,
                        thread_execute_gpu,
                        g_work_scheduler.opencl.devices.size());
    for (Device &device : g_work_scheduler.opencl.devices) {
      BLI_threadpool_insert(&g_work_scheduler.opencl.threads, &device);
    }
    g_work_scheduler.opencl.active = true;
  }
  else {
    g_work_scheduler.opencl.active = false;
  }
}

static bool opencl_schedule(WorkPackage *package)
{
  if (package->type == eWorkPackageType::Tile && package->execution_group->get_flags().open_cl &&
      g_work_scheduler.opencl.active) {
    BLI_thread_queue_push(g_work_scheduler.opencl.queue, package);
    return true;
  }
  return false;
}

static void opencl_finish()
{
  if (g_work_scheduler.opencl.active) {
    BLI_thread_queue_wait_finish(g_work_scheduler.opencl.queue);
  }
}

static void opencl_stop()
{
  if (g_work_scheduler.opencl.active) {
    BLI_thread_queue_nowait(g_work_scheduler.opencl.queue);
    BLI_threadpool_end(&g_work_scheduler.opencl.threads);
    BLI_thread_queue_free(g_work_scheduler.opencl.queue);
    g_work_scheduler.opencl.queue = nullptr;
  }
}

static bool opencl_has_gpu_devices()
{
  return !g_work_scheduler.opencl.devices.is_empty();
}

static void opencl_initialize(const bool use_opencl)
{
  /* Initialize OpenCL GPUs (only when requested and not yet initialized). */
  if (use_opencl && !g_work_scheduler.opencl.initialized) {
    g_work_scheduler.opencl.context = nullptr;
    g_work_scheduler.opencl.program = nullptr;

    /* This will check for errors and skip if already initialized. */
    if (clewInit() != CLEW_SUCCESS) {
      return;
    }

    if (clCreateContextFromType) {
      cl_uint number_of_platforms = 0;
      cl_int error;
      error = clGetPlatformIDs(0, nullptr, &number_of_platforms);
      if (error == -1001) {
      } /* GPU not supported */
      else if (error != CL_SUCCESS) {
        printf("CLERROR[%d]: %s\n", error, clewErrorString(error));
      }
      if (G.f & G_DEBUG) {
        printf("%u number of platforms\n", number_of_platforms);
      }
      cl_platform_id *platforms = (cl_platform_id *)MEM_mallocN(
          sizeof(cl_platform_id) * number_of_platforms, __func__);
      error = clGetPlatformIDs(number_of_platforms, platforms, nullptr);
      unsigned int index_platform;
      for (index_platform = 0; index_platform < number_of_platforms; index_platform++) {
        cl_platform_id platform = platforms[index_platform];
        cl_uint number_of_devices = 0;
        clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, 0, nullptr, &number_of_devices);
        if (number_of_devices <= 0) {
          continue;
        }

        cl_device_id *cldevices = (cl_device_id *)MEM_mallocN(
            sizeof(cl_device_id) * number_of_devices, __func__);
        clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, number_of_devices, cldevices, nullptr);

        g_work_scheduler.opencl.context = clCreateContext(
            nullptr, number_of_devices, cldevices, cl_context_error, nullptr, &error);
        if (error != CL_SUCCESS) {
          printf("CLERROR[%d]: %s\n", error, clewErrorString(error));
        }
        const char *cl_str[2] = {datatoc_COM_OpenCLKernels_cl, nullptr};
        g_work_scheduler.opencl.program = clCreateProgramWithSource(
            g_work_scheduler.opencl.context, 1, cl_str, nullptr, &error);
        error = clBuildProgram(g_work_scheduler.opencl.program,
                               number_of_devices,
                               cldevices,
                               nullptr,
                               nullptr,
                               nullptr);
        if (error != CL_SUCCESS) {
          cl_int error2;
          size_t ret_val_size = 0;
          printf("CLERROR[%d]: %s\n", error, clewErrorString(error));
          error2 = clGetProgramBuildInfo(g_work_scheduler.opencl.program,
                                         cldevices[0],
                                         CL_PROGRAM_BUILD_LOG,
                                         0,
                                         nullptr,
                                         &ret_val_size);
          if (error2 != CL_SUCCESS) {
            printf("CLERROR[%d]: %s\n", error, clewErrorString(error));
          }
          char *build_log = (char *)MEM_mallocN(sizeof(char) * ret_val_size + 1, __func__);
          error2 = clGetProgramBuildInfo(g_work_scheduler.opencl.program,
                                         cldevices[0],
                                         CL_PROGRAM_BUILD_LOG,
                                         ret_val_size,
                                         build_log,
                                         nullptr);
          if (error2 != CL_SUCCESS) {
            printf("CLERROR[%d]: %s\n", error, clewErrorString(error));
          }
          build_log[ret_val_size] = '\0';
          printf("%s", build_log);
          MEM_freeN(build_log);
        }
        else {
          unsigned int index_devices;
          for (index_devices = 0; index_devices < number_of_devices; index_devices++) {
            cl_device_id device = cldevices[index_devices];
            cl_int vendorID = 0;
            cl_int error2 = clGetDeviceInfo(
                device, CL_DEVICE_VENDOR_ID, sizeof(cl_int), &vendorID, nullptr);
            if (error2 != CL_SUCCESS) {
              printf("CLERROR[%d]: %s\n", error2, clewErrorString(error2));
            }
            g_work_scheduler.opencl.devices.append_as(g_work_scheduler.opencl.context,
                                                      device,
                                                      g_work_scheduler.opencl.program,
                                                      vendorID);
          }
        }
        MEM_freeN(cldevices);
      }
      MEM_freeN(platforms);
    }

    g_work_scheduler.opencl.initialized = true;
  }
}

static void opencl_deinitialize()
{
  g_work_scheduler.opencl.devices.clear_and_make_inline();

  if (g_work_scheduler.opencl.program) {
    clReleaseProgram(g_work_scheduler.opencl.program);
    g_work_scheduler.opencl.program = nullptr;
  }

  if (g_work_scheduler.opencl.context) {
    clReleaseContext(g_work_scheduler.opencl.context);
    g_work_scheduler.opencl.context = nullptr;
  }

  g_work_scheduler.opencl.initialized = false;
}

/** \} */

/* -------------------------------------------------------------------- */
/** \name Single threaded Scheduling
 * \{ */

static void threading_model_single_thread_execute(WorkPackage *package)
{
  CPUDevice device(0);
  device.execute(package);
}

/** \} */

/* -------------------------------------------------------------------- */
/** \name Queue Scheduling
 * \{ */

static void *threading_model_queue_execute(void *data)
{
  CPUDevice *device = (CPUDevice *)data;
  WorkPackage *work;
  BLI_thread_local_set(g_thread_device, device);
  while ((work = (WorkPackage *)BLI_thread_queue_pop(g_work_scheduler.queue.queue))) {
    device->execute(work);
  }

  return nullptr;
}

static void threading_model_queue_schedule(WorkPackage *package)
{
  BLI_thread_queue_push(g_work_scheduler.queue.queue, package);
}

static void threading_model_queue_start()
{
  g_work_scheduler.queue.queue = BLI_thread_queue_init();
  BLI_threadpool_init(&g_work_scheduler.queue.threads,
                      threading_model_queue_execute,
                      g_work_scheduler.queue.devices.size());
  for (Device &device : g_work_scheduler.queue.devices) {
    BLI_threadpool_insert(&g_work_scheduler.queue.threads, &device);
  }
}

static void threading_model_queue_finish()
{
  BLI_thread_queue_wait_finish(g_work_scheduler.queue.queue);
}

static void threading_model_queue_stop()
{
  BLI_thread_queue_nowait(g_work_scheduler.queue.queue);
  BLI_threadpool_end(&g_work_scheduler.queue.threads);
  BLI_thread_queue_free(g_work_scheduler.queue.queue);
  g_work_scheduler.queue.queue = nullptr;
}

static void threading_model_queue_initialize(const int num_cpu_threads)
{
  /* Reinitialize if the number of threads doesn't match. */
  if (g_work_scheduler.queue.devices.size() != num_cpu_threads) {
    g_work_scheduler.queue.devices.clear();
    if (g_work_scheduler.queue.initialized) {
      BLI_thread_local_delete(g_thread_device);
      g_work_scheduler.queue.initialized = false;
    }
  }

  /* Initialize CPU threads. */
  if (!g_work_scheduler.queue.initialized) {
    for (int index = 0; index < num_cpu_threads; index++) {
      g_work_scheduler.queue.devices.append_as(index);
    }
    BLI_thread_local_create(g_thread_device);
    g_work_scheduler.queue.initialized = true;
  }
}

static void threading_model_queue_deinitialize()
{
  /* Deinitialize CPU threads. */
  if (g_work_scheduler.queue.initialized) {
    g_work_scheduler.queue.devices.clear_and_make_inline();

    BLI_thread_local_delete(g_thread_device);
    g_work_scheduler.queue.initialized = false;
  }
}

/** \} */

/* -------------------------------------------------------------------- */
/** \name Task Scheduling
 * \{ */

static void threading_model_task_execute(TaskPool *__restrict UNUSED(pool), void *task_data)
{
  WorkPackage *package = static_cast<WorkPackage *>(task_data);
  CPUDevice device(BLI_task_parallel_thread_id(nullptr));
  BLI_thread_local_set(g_thread_device, &device);
  device.execute(package);
}

static void threading_model_task_schedule(WorkPackage *package)
{
  BLI_task_pool_push(
      g_work_scheduler.task.pool, threading_model_task_execute, package, false, nullptr);
}

static void threading_model_task_start()
{
  BLI_thread_local_create(g_thread_device);
  g_work_scheduler.task.pool = BLI_task_pool_create(nullptr, TASK_PRIORITY_HIGH);
}

static void threading_model_task_finish()
{
  BLI_task_pool_work_and_wait(g_work_scheduler.task.pool);
}

static void threading_model_task_stop()
{
  BLI_task_pool_free(g_work_scheduler.task.pool);
  g_work_scheduler.task.pool = nullptr;
  BLI_thread_local_delete(g_thread_device);
}

/** \} */

/* -------------------------------------------------------------------- */
/** \name Public API
 * \{ */

void WorkScheduler::schedule(WorkPackage *package)
{
  if (COM_is_opencl_enabled()) {
    if (opencl_schedule(package)) {
      return;
    }
  }

  switch (COM_threading_model()) {
    case ThreadingModel::SingleThreaded: {
      threading_model_single_thread_execute(package);
      break;
    }

    case ThreadingModel::Queue: {
      threading_model_queue_schedule(package);
      break;
    }

    case ThreadingModel::Task: {
      threading_model_task_schedule(package);
      break;
    }
  }
}

void WorkScheduler::start(const CompositorContext &context)
{
  if (COM_is_opencl_enabled()) {
    opencl_start(context);
  }

  switch (COM_threading_model()) {
    case ThreadingModel::SingleThreaded:
      /* Nothing to do. */
      break;

    case ThreadingModel::Queue:
      threading_model_queue_start();
      break;

    case ThreadingModel::Task:
      threading_model_task_start();
      break;
  }
}

void WorkScheduler::finish()
{
  if (COM_is_opencl_enabled()) {
    opencl_finish();
  }

  switch (COM_threading_model()) {
    case ThreadingModel::SingleThreaded:
      /* Nothing to do. */
      break;

    case ThreadingModel::Queue:
      threading_model_queue_finish();
      break;

    case ThreadingModel::Task:
      threading_model_task_finish();
      break;
  }
}

void WorkScheduler::stop()
{
  if (COM_is_opencl_enabled()) {
    opencl_stop();
  }

  switch (COM_threading_model()) {
    case ThreadingModel::SingleThreaded:
      /* Nothing to do. */
      break;

    case ThreadingModel::Queue:
      threading_model_queue_stop();
      break;

    case ThreadingModel::Task:
      threading_model_task_stop();
      break;
  }
}

bool WorkScheduler::has_gpu_devices()
{
  if (COM_is_opencl_enabled()) {
    return opencl_has_gpu_devices();
  }
  return false;
}

void WorkScheduler::initialize(bool use_opencl, int num_cpu_threads)
{
  if (COM_is_opencl_enabled()) {
    opencl_initialize(use_opencl);
  }

  g_work_scheduler.num_cpu_threads = num_cpu_threads;
  switch (COM_threading_model()) {
    case ThreadingModel::SingleThreaded:
      g_work_scheduler.num_cpu_threads = 1;
      /* Nothing to do. */
      break;

    case ThreadingModel::Queue:
      threading_model_queue_initialize(num_cpu_threads);
      break;

    case ThreadingModel::Task:
      /* Nothing to do. */
      break;
  }
}

void WorkScheduler::deinitialize()
{
  if (COM_is_opencl_enabled()) {
    opencl_deinitialize();
  }

  switch (COM_threading_model()) {
    case ThreadingModel::SingleThreaded:
      /* Nothing to do. */
      break;

    case ThreadingModel::Queue:
      threading_model_queue_deinitialize();
      break;

    case ThreadingModel::Task:
      /* Nothing to do. */
      break;
  }
}

int WorkScheduler::get_num_cpu_threads()
{
  return g_work_scheduler.num_cpu_threads;
}

int WorkScheduler::current_thread_id()
{
  if (COM_threading_model() == ThreadingModel::SingleThreaded) {
    return 0;
  }

  CPUDevice *device = (CPUDevice *)BLI_thread_local_get(g_thread_device);
  return device->thread_id();
}

/** \} */

}  // namespace blender::compositor
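For orientation, the public WorkScheduler entry points above are driven by the compositor in a fixed lifecycle: initialize the devices, start the workers, schedule one or more work packages, finish (wait for completion), stop the workers, and finally deinitialize. The fragment below is only an illustrative sketch of that call order, not code from this file; the caller function, its parameters, and the use of BLI_system_thread_count() to pick a thread count are assumptions made for the example.

#include "BLI_threads.h"
#include "BLI_vector.hh"

#include "COM_CompositorContext.h"
#include "COM_WorkPackage.h"
#include "COM_WorkScheduler.h"

namespace blender::compositor {

/* Hypothetical caller, shown only to illustrate the call order of the API above. */
static void run_packages_sketch(const CompositorContext &context,
                                const bool use_opencl,
                                Vector<WorkPackage *> &packages)
{
  /* Create the CPU devices (and the OpenCL context/devices when requested). */
  WorkScheduler::initialize(use_opencl, BLI_system_thread_count());

  WorkScheduler::start(context); /* Spawn the worker threads or the task pool. */
  for (WorkPackage *package : packages) {
    WorkScheduler::schedule(package); /* Hand work to a CPU or GPU device. */
  }
  WorkScheduler::finish(); /* Block until every scheduled package has executed. */
  WorkScheduler::stop();   /* Join the workers and free the queues. */

  WorkScheduler::deinitialize(); /* Release devices and thread-local storage. */
}

}  // namespace blender::compositor

The grouping into a single function is purely illustrative; in Blender these calls are spread across the compositor's setup, execution, and teardown code, but their relative order matches the sketch.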