Blender V3.3
task_iterator.c
/* SPDX-License-Identifier: GPL-2.0-or-later */

#include <stdlib.h>

#include "MEM_guardedalloc.h"

#include "DNA_listBase.h"

#include "BLI_listbase.h"
#include "BLI_math.h"
#include "BLI_mempool.h"
#include "BLI_mempool_private.h"
#include "BLI_task.h"
#include "BLI_threads.h"

#include "atomic_ops.h"

/* -------------------------------------------------------------------- */

/* Allows to avoid using malloc for userdata_chunk in tasks, when small enough. */
#define MALLOCA(_size) ((_size) <= 8192) ? alloca((_size)) : MEM_mallocN((_size), __func__)
#define MALLOCA_FREE(_mem, _size) \
  if (((_mem) != NULL) && ((_size) > 8192)) { \
    MEM_freeN((_mem)); \
  } \
  ((void)0)

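/* Illustrative note (not part of the original sources): MALLOCA() serves requests of up to 8192
 * bytes from the stack via alloca() and falls back to MEM_mallocN() above that, so every
 * MALLOCA() must be paired with a MALLOCA_FREE() that receives the same size, e.g.:
 *
 *   const size_t size = sizeof(float) * (size_t)count;
 *   float *buffer = MALLOCA(size);
 *   ...
 *   MALLOCA_FREE(buffer, size);
 */
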
/* -------------------------------------------------------------------- */

BLI_INLINE void task_parallel_calc_chunk_size(const TaskParallelSettings *settings,
                                              const int items_num,
                                              int tasks_num,
                                              int *r_chunk_size)
{
  int chunk_size = 0;

  if (!settings->use_threading) {
    /* Some users of this helper will still need a valid chunk size in case processing is not
     * threaded. We can use a bigger one than in default threaded case then. */
    chunk_size = 1024;
    tasks_num = 1;
  }
  else if (settings->min_iter_per_thread > 0) {
    /* Already set by user, no need to do anything here. */
    chunk_size = settings->min_iter_per_thread;
  }
  else {
    /* Multiplier used in heuristics below to define "optimal" chunk size.
     * The idea here is to increase the chunk size to compensate for a rather measurable threading
     * overhead caused by fetching tasks. With too many CPU threads we are starting
     * to spend too much time in those overheads.
     * First values are: 1 if tasks_num < 16;
     *              else 2 if tasks_num < 32;
     *              else 3 if tasks_num < 48;
     *              else 4 if tasks_num < 64;
     *                   etc.
     * NOTE: If we wanted to keep the 'power of two' multiplier, we'd need something like:
     *       1 << max_ii(0, (int)(sizeof(int) * 8) - 1 - bitscan_reverse_i(tasks_num) - 3)
     */
    const int tasks_num_factor = max_ii(1, tasks_num >> 3);

    /* We could make that 'base' 32 number configurable in TaskParallelSettings too, or maybe just
     * always use that heuristic using TaskParallelSettings.min_iter_per_thread as basis? */
    chunk_size = 32 * tasks_num_factor;

    /* Basic heuristic to avoid threading on low amount of items.
     * We could make that limit configurable in settings too. */
    if (items_num > 0 && items_num < max_ii(256, chunk_size * 2)) {
      chunk_size = items_num;
    }
  }

  BLI_assert(chunk_size > 0);
  *r_chunk_size = chunk_size;
}

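/* Worked example (illustrative, not part of the original sources): with 8 worker threads the
 * factor `tasks_num >> 3` is 1 and the default chunk size is 32; with 32 threads the factor is 4
 * and the chunk size grows to 128. For a small workload such as items_num = 100 on 8 threads,
 * 100 < max_ii(256, 64), so the chunk size collapses to 100 and the whole range ends up in a
 * single task. */
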
typedef struct TaskParallelIteratorState {
  void *userdata;
  TaskParallelIteratorIterFunc iter_func;
  TaskParallelIteratorFunc func;

  /* *** Data used to 'acquire' chunks of items from the iterator. *** */
  /* Common data also passed to the generator callback. */
  TaskParallelIteratorStateShared iter_shared;
  /* Total number of items. If unknown, set it to a negative number. */
  int items_num;
} TaskParallelIteratorState;

static void parallel_iterator_func_do(TaskParallelIteratorState *__restrict state,
                                      void *userdata_chunk)
{
  TaskParallelTLS tls = {
      .userdata_chunk = userdata_chunk,
  };

  void **current_chunk_items;
  int *current_chunk_indices;
  int current_chunk_size;

  const size_t items_size = sizeof(*current_chunk_items) * (size_t)state->iter_shared.chunk_size;
  const size_t indices_size = sizeof(*current_chunk_indices) *
                              (size_t)state->iter_shared.chunk_size;

  current_chunk_items = MALLOCA(items_size);
  current_chunk_indices = MALLOCA(indices_size);
  current_chunk_size = 0;

  for (bool do_abort = false; !do_abort;) {
    if (state->iter_shared.spin_lock != NULL) {
      BLI_spin_lock(state->iter_shared.spin_lock);
    }

    /* Get current status. */
    int index = state->iter_shared.next_index;
    void *item = state->iter_shared.next_item;
    int i;

    /* 'Acquire' a chunk of items from the iterator function. */
    for (i = 0; i < state->iter_shared.chunk_size && !state->iter_shared.is_finished; i++) {
      current_chunk_indices[i] = index;
      current_chunk_items[i] = item;
      state->iter_func(state->userdata, &tls, &item, &index, &state->iter_shared.is_finished);
    }

    /* Update current status. */
    state->iter_shared.next_index = index;
    state->iter_shared.next_item = item;
    current_chunk_size = i;

    do_abort = state->iter_shared.is_finished;

    if (state->iter_shared.spin_lock != NULL) {
      BLI_spin_unlock(state->iter_shared.spin_lock);
    }

    for (i = 0; i < current_chunk_size; ++i) {
      state->func(state->userdata, current_chunk_items[i], current_chunk_indices[i], &tls);
    }
  }

  MALLOCA_FREE(current_chunk_items, items_size);
  MALLOCA_FREE(current_chunk_indices, indices_size);
}

static void parallel_iterator_func(TaskPool *__restrict pool, void *userdata_chunk)
{
  TaskParallelIteratorState *__restrict state = BLI_task_pool_user_data(pool);

  parallel_iterator_func_do(state, userdata_chunk);
}

static void task_parallel_iterator_no_threads(const TaskParallelSettings *settings,
                                              TaskParallelIteratorState *state)
{
  /* Prepare user's TLS data. */
  void *userdata_chunk = settings->userdata_chunk;
  if (userdata_chunk) {
    if (settings->func_init != NULL) {
      settings->func_init(state->userdata, userdata_chunk);
    }
  }

  /* Also marking it as non-threaded for the iterator callback. */
  state->iter_shared.spin_lock = NULL;

  parallel_iterator_func_do(state, userdata_chunk);

  if (userdata_chunk) {
    if (settings->func_free != NULL) {
      /* `func_free` should only free data that was created during execution of `func`. */
      settings->func_free(state->userdata, userdata_chunk);
    }
  }
}

static void task_parallel_iterator_do(const TaskParallelSettings *settings,
                                      TaskParallelIteratorState *state)
{
  const int threads_num = BLI_task_scheduler_num_threads();

  task_parallel_calc_chunk_size(
      settings, state->items_num, threads_num, &state->iter_shared.chunk_size);

  if (!settings->use_threading) {
    task_parallel_iterator_no_threads(settings, state);
    return;
  }

  const int chunk_size = state->iter_shared.chunk_size;
  const int items_num = state->items_num;
  const size_t tasks_num = items_num >= 0 ?
                               (size_t)min_ii(threads_num, state->items_num / chunk_size) :
                               (size_t)threads_num;

  BLI_assert(tasks_num > 0);
  if (tasks_num == 1) {
    task_parallel_iterator_no_threads(settings, state);
    return;
  }

  SpinLock spin_lock;
  BLI_spin_init(&spin_lock);
  state->iter_shared.spin_lock = &spin_lock;

  void *userdata_chunk = settings->userdata_chunk;
  const size_t userdata_chunk_size = settings->userdata_chunk_size;
  void *userdata_chunk_local = NULL;
  void *userdata_chunk_array = NULL;
  const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);

  TaskPool *task_pool = BLI_task_pool_create(state, TASK_PRIORITY_HIGH);

  if (use_userdata_chunk) {
    userdata_chunk_array = MALLOCA(userdata_chunk_size * tasks_num);
  }

  for (size_t i = 0; i < tasks_num; i++) {
    if (use_userdata_chunk) {
      userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
      memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
      if (settings->func_init != NULL) {
        settings->func_init(state->userdata, userdata_chunk_local);
      }
    }
    /* Use this pool's pre-allocated tasks. */
    BLI_task_pool_push(task_pool, parallel_iterator_func, userdata_chunk_local, false, NULL);
  }

  BLI_task_pool_work_and_wait(task_pool);
  BLI_task_pool_free(task_pool);

  if (use_userdata_chunk) {
    if (settings->func_reduce != NULL || settings->func_free != NULL) {
      for (size_t i = 0; i < tasks_num; i++) {
        userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
        if (settings->func_reduce != NULL) {
          settings->func_reduce(state->userdata, userdata_chunk, userdata_chunk_local);
        }
        if (settings->func_free != NULL) {
          settings->func_free(state->userdata, userdata_chunk_local);
        }
      }
    }
    MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * tasks_num);
  }

  BLI_spin_end(&spin_lock);
  state->iter_shared.spin_lock = NULL;
}

void BLI_task_parallel_iterator(void *userdata,
                                TaskParallelIteratorIterFunc iter_func,
                                void *init_item,
                                const int init_index,
                                const int items_num,
                                TaskParallelIteratorFunc func,
                                const TaskParallelSettings *settings)
{
  TaskParallelIteratorState state = {0};

  state.items_num = items_num;
  state.iter_shared.next_index = init_index;
  state.iter_shared.next_item = init_item;
  state.iter_shared.is_finished = false;
  state.userdata = userdata;
  state.iter_func = iter_func;
  state.func = func;

  task_parallel_iterator_do(settings, &state);
}
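
/* Usage sketch (illustrative only, not part of the original sources; `StringsData`,
 * `strings_iter_cb`, `strings_item_cb` and `string_lengths_parallel` are hypothetical, and
 * `BLI_parallel_range_settings_defaults()` is the settings helper from BLI_task.h). The
 * generator callback is invoked under the shared spin lock to hand out the next item, while the
 * per-item callback runs on the acquired chunk without any lock held. */
#if 0
typedef struct StringsData {
  char **strings;  /* NULL-terminated array. */
  size_t *lengths; /* One slot per string, filled in parallel. */
} StringsData;

static void strings_iter_cb(void *__restrict userdata,
                            const TaskParallelTLS *__restrict UNUSED(tls),
                            void **r_next_item,
                            int *r_next_index,
                            bool *r_do_abort)
{
  StringsData *data = userdata;
  (*r_next_index)++;
  *r_next_item = data->strings[*r_next_index];
  if (*r_next_item == NULL) {
    *r_do_abort = true;
  }
}

static void strings_item_cb(void *__restrict userdata,
                            void *item,
                            int index,
                            const TaskParallelTLS *__restrict UNUSED(tls))
{
  StringsData *data = userdata;
  data->lengths[index] = strlen((const char *)item);
}

static void string_lengths_parallel(StringsData *data, const int strings_num)
{
  if (data->strings[0] == NULL) {
    return;
  }

  TaskParallelSettings settings;
  BLI_parallel_range_settings_defaults(&settings);

  BLI_task_parallel_iterator(
      data, strings_iter_cb, data->strings[0], 0, strings_num, strings_item_cb, &settings);
}
#endif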

/* -------------------------------------------------------------------- */

static void task_parallel_listbase_get(void *__restrict UNUSED(userdata),
                                       const TaskParallelTLS *__restrict UNUSED(tls),
                                       void **r_next_item,
                                       int *r_next_index,
                                       bool *r_do_abort)
{
  /* Get current status. */
  Link *link = *r_next_item;

  if (link->next == NULL) {
    *r_do_abort = true;
  }
  *r_next_item = link->next;
  (*r_next_index)++;
}

void BLI_task_parallel_listbase(ListBase *listbase,
                                void *userdata,
                                TaskParallelIteratorFunc func,
                                const TaskParallelSettings *settings)
{
  if (BLI_listbase_is_empty(listbase)) {
    return;
  }

  TaskParallelIteratorState state = {0};

  state.items_num = BLI_listbase_count(listbase);
  state.iter_shared.next_index = 0;
  state.iter_shared.next_item = listbase->first;
  state.iter_shared.is_finished = false;
  state.userdata = userdata;
  state.iter_func = task_parallel_listbase_get;
  state.func = func;

  task_parallel_iterator_do(settings, &state);
}

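/* Usage sketch (illustrative only, not part of the original sources; `SumTLS`, `count_item_cb`,
 * `count_reduce_cb` and `count_listbase_parallel` are hypothetical, and
 * `BLI_parallel_range_settings_defaults()` is the settings helper from BLI_task.h). Each task
 * gets its own copy of the TLS chunk, and the copies are merged back into the caller's data in
 * the reduce callback. */
#if 0
typedef struct SumTLS {
  int count;
} SumTLS;

static void count_item_cb(void *__restrict UNUSED(userdata),
                          void *UNUSED(item),
                          int UNUSED(index),
                          const TaskParallelTLS *__restrict tls)
{
  SumTLS *sum = tls->userdata_chunk;
  sum->count++;
}

static void count_reduce_cb(const void *__restrict UNUSED(userdata),
                            void *__restrict chunk_join,
                            void *__restrict chunk)
{
  ((SumTLS *)chunk_join)->count += ((SumTLS *)chunk)->count;
}

static int count_listbase_parallel(ListBase *lb)
{
  SumTLS sum = {0};

  TaskParallelSettings settings;
  BLI_parallel_range_settings_defaults(&settings);
  settings.userdata_chunk = &sum;
  settings.userdata_chunk_size = sizeof(sum);
  settings.func_reduce = count_reduce_cb;

  BLI_task_parallel_listbase(lb, NULL, count_item_cb, &settings);
  return sum.count;
}
#endif
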
/* -------------------------------------------------------------------- */

typedef struct ParallelMempoolState {
  void *userdata;
  TaskParallelMempoolFunc func;
} ParallelMempoolState;

static void parallel_mempool_func(TaskPool *__restrict pool, void *taskdata)
{
  ParallelMempoolState *__restrict state = BLI_task_pool_user_data(pool);
  BLI_mempool_threadsafe_iter *iter = &((ParallelMempoolTaskData *)taskdata)->ts_iter;
  TaskParallelTLS *tls = &((ParallelMempoolTaskData *)taskdata)->tls;

  MempoolIterData *item;
  while ((item = mempool_iter_threadsafe_step(iter)) != NULL) {
    state->func(state->userdata, item, tls);
  }
}

void BLI_task_parallel_mempool(BLI_mempool *mempool,
                               void *userdata,
                               TaskParallelMempoolFunc func,
                               const TaskParallelSettings *settings)
{
  if (UNLIKELY(BLI_mempool_len(mempool) == 0)) {
    return;
  }

  void *userdata_chunk = settings->userdata_chunk;
  const size_t userdata_chunk_size = settings->userdata_chunk_size;
  void *userdata_chunk_array = NULL;
  const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);

  if (!settings->use_threading) {
    TaskParallelTLS tls = {NULL};
    if (use_userdata_chunk) {
      if (settings->func_init != NULL) {
        settings->func_init(userdata, userdata_chunk);
      }
      tls.userdata_chunk = userdata_chunk;
    }

    BLI_mempool_iter iter;
    BLI_mempool_iternew(mempool, &iter);

    void *item;
    while ((item = BLI_mempool_iterstep(&iter))) {
      func(userdata, item, &tls);
    }

    if (use_userdata_chunk) {
      if (settings->func_free != NULL) {
        /* `func_free` should only free data that was created during execution of `func`. */
        settings->func_free(userdata, userdata_chunk);
      }
    }

    return;
  }

  ParallelMempoolState state;
  TaskPool *task_pool = BLI_task_pool_create(&state, TASK_PRIORITY_HIGH);
  const int threads_num = BLI_task_scheduler_num_threads();

  /* The idea here is to prevent creating task for each of the loop iterations
   * and instead have tasks which are evenly distributed across CPU cores and
   * pull next item to be crunched using the threaded-aware BLI_mempool_iter.
   */
  const int tasks_num = threads_num + 2;

  state.userdata = userdata;
  state.func = func;

  if (use_userdata_chunk) {
    userdata_chunk_array = MALLOCA(userdata_chunk_size * tasks_num);
  }

  ParallelMempoolTaskData *mempool_iterator_data = mempool_iter_threadsafe_create(
      mempool, (size_t)tasks_num);

  for (int i = 0; i < tasks_num; i++) {
    void *userdata_chunk_local = NULL;
    if (use_userdata_chunk) {
      userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
      memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
      if (settings->func_init != NULL) {
        settings->func_init(userdata, userdata_chunk_local);
      }
    }
    mempool_iterator_data[i].tls.userdata_chunk = userdata_chunk_local;

    /* Use this pool's pre-allocated tasks. */
    BLI_task_pool_push(task_pool, parallel_mempool_func, &mempool_iterator_data[i], false, NULL);
  }

  BLI_task_pool_work_and_wait(task_pool);
  BLI_task_pool_free(task_pool);

  if (use_userdata_chunk) {
    if ((settings->func_free != NULL) || (settings->func_reduce != NULL)) {
      for (int i = 0; i < tasks_num; i++) {
        if (settings->func_reduce) {
          settings->func_reduce(
              userdata, userdata_chunk, mempool_iterator_data[i].tls.userdata_chunk);
        }
        if (settings->func_free) {
          settings->func_free(userdata, mempool_iterator_data[i].tls.userdata_chunk);
        }
      }
    }
    MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * tasks_num);
  }

  mempool_iter_threadsafe_destroy(mempool_iterator_data);
}
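
/* Usage sketch (illustrative only, not part of the original sources; `MyElem`, `tag_elem_cb` and
 * `tag_all_elems` are hypothetical, and `BLI_parallel_mempool_settings_defaults()` is the
 * settings helper from BLI_task.h). Each task pulls items from its own thread-safe iterator, so
 * a per-item callback that touches only the item itself needs no extra locking. */
#if 0
typedef struct MyElem {
  int flag;
} MyElem;

static void tag_elem_cb(void *UNUSED(userdata),
                        MempoolIterData *item,
                        const TaskParallelTLS *__restrict UNUSED(tls))
{
  MyElem *elem = (MyElem *)item;
  elem->flag |= 1;
}

static void tag_all_elems(BLI_mempool *mempool)
{
  TaskParallelSettings settings;
  BLI_parallel_mempool_settings_defaults(&settings);

  BLI_task_parallel_mempool(mempool, NULL, tag_elem_cb, &settings);
}
#endif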

#undef MALLOCA
#undef MALLOCA_FREE