thread_task.c
/*
 * Copyright © 2018, VideoLAN and dav1d authors
 * Copyright © 2018, Two Orioles, LLC
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *    list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"

#include "common/frame.h"

#include "src/thread_task.h"
#include "src/fg_apply.h"

// This function resets the cur pointer to the first frame theoretically
// executable after a task completed (i.e. each time we update some progress or
// insert some tasks in the queue).
// When frame_idx is set, it can be either from a completed task, or from tasks
// inserted in the queue, in which case we have to make sure the cur pointer
// isn't past this insert.
// The special case where frame_idx is UINT_MAX is to handle the reset after
// completing a task and locklessly signaling progress. In this case we don't
// enter a critical section, which is needed for this function, so we set an
// atomic for delayed handling, which happens here. This means we can call this
// function without any actual update other than what's in the atomic, hence
// this special case.
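// Illustrative note (not upstream dav1d): frame contexts form a ring. With
// n_fc = 8 and first = 6, scheduling visits fc[6], fc[7], fc[0], ..., fc[5];
// ttd->cur is the offset into that rotated order, so the frame currently
// considered is fc[(first + ttd->cur) % n_fc], and resetting cur simply
// rewinds the scan to an earlier frame.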
static inline int reset_task_cur(const Dav1dContext *const c,
                                 struct TaskThreadData *const ttd,
                                 unsigned frame_idx)
{
    const unsigned first = atomic_load(&ttd->first);
    unsigned reset_frame_idx = atomic_exchange(&ttd->reset_task_cur, UINT_MAX);
    if (reset_frame_idx < first) {
        if (frame_idx == UINT_MAX) return 0;
        reset_frame_idx = UINT_MAX;
    }
    if (!ttd->cur && c->fc[first].task_thread.task_cur_prev == NULL)
        return 0;
    if (reset_frame_idx != UINT_MAX) {
        if (frame_idx == UINT_MAX) {
            if (reset_frame_idx > first + ttd->cur)
                return 0;
            ttd->cur = reset_frame_idx - first;
            goto cur_found;
        }
    } else if (frame_idx == UINT_MAX)
        return 0;
    if (frame_idx < first) frame_idx += c->n_fc;
    const unsigned min_frame_idx = umin(reset_frame_idx, frame_idx);
    const unsigned cur_frame_idx = first + ttd->cur;
    if (ttd->cur < c->n_fc && cur_frame_idx < min_frame_idx)
        return 0;
    for (ttd->cur = min_frame_idx - first; ttd->cur < c->n_fc; ttd->cur++)
        if (c->fc[(first + ttd->cur) % c->n_fc].task_thread.task_head)
            break;
cur_found:
    for (unsigned i = ttd->cur; i < c->n_fc; i++)
        c->fc[(first + i) % c->n_fc].task_thread.task_cur_prev = NULL;
    return 1;
}

static inline void reset_task_cur_async(struct TaskThreadData *const ttd,
                                        unsigned frame_idx, unsigned n_frames)
{
    const unsigned first = atomic_load(&ttd->first);
    if (frame_idx < first) frame_idx += n_frames;
    unsigned last_idx = frame_idx;
    do {
        frame_idx = last_idx;
        last_idx = atomic_exchange(&ttd->reset_task_cur, frame_idx);
    } while (last_idx < frame_idx);
    if (frame_idx == first && atomic_load(&ttd->first) != first) {
        unsigned expected = frame_idx;
        atomic_compare_exchange_strong(&ttd->reset_task_cur, &expected, UINT_MAX);
    }
}

static void insert_tasks_between(Dav1dFrameContext *const f,
                                 Dav1dTask *const first, Dav1dTask *const last,
                                 Dav1dTask *const a, Dav1dTask *const b,
                                 const int cond_signal)
{
    struct TaskThreadData *const ttd = f->task_thread.ttd;
    if (atomic_load(f->c->flush)) return;
    assert(!a || a->next == b);
    if (!a) f->task_thread.task_head = first;
    else a->next = first;
    if (!b) f->task_thread.task_tail = last;
    last->next = b;
    reset_task_cur(f->c, ttd, first->frame_idx);
    if (cond_signal && !atomic_fetch_or(&ttd->cond_signaled, 1))
        pthread_cond_signal(&ttd->cond);
}

static void insert_tasks(Dav1dFrameContext *const f,
                         Dav1dTask *const first, Dav1dTask *const last,
                         const int cond_signal)
{
    // insert task back into task queue
    Dav1dTask *t_ptr, *prev_t = NULL;
    for (t_ptr = f->task_thread.task_head;
         t_ptr; prev_t = t_ptr, t_ptr = t_ptr->next)
    {
        // entropy coding precedes other steps
        if (t_ptr->type == DAV1D_TASK_TYPE_TILE_ENTROPY) {
            if (first->type > DAV1D_TASK_TYPE_TILE_ENTROPY) continue;
            // both are entropy
            if (first->sby > t_ptr->sby) continue;
            if (first->sby < t_ptr->sby) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            // same sby
        } else {
            if (first->type == DAV1D_TASK_TYPE_TILE_ENTROPY) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            if (first->sby > t_ptr->sby) continue;
            if (first->sby < t_ptr->sby) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            // same sby
            if (first->type > t_ptr->type) continue;
            if (first->type < t_ptr->type) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            // same task type
        }

        // sort by tile-id
        assert(first->type == DAV1D_TASK_TYPE_TILE_RECONSTRUCTION ||
               first->type == DAV1D_TASK_TYPE_TILE_ENTROPY);
        assert(first->type == t_ptr->type);
        assert(t_ptr->sby == first->sby);
        const int p = first->type == DAV1D_TASK_TYPE_TILE_ENTROPY;
        const int t_tile_idx = (int) (first - f->task_thread.tile_tasks[p]);
        const int p_tile_idx = (int) (t_ptr - f->task_thread.tile_tasks[p]);
        assert(t_tile_idx != p_tile_idx);
        if (t_tile_idx > p_tile_idx) continue;
        insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
        return;
    }
    // append at the end
    insert_tasks_between(f, first, last, prev_t, NULL, cond_signal);
}

static inline void insert_task(Dav1dFrameContext *const f,
                               Dav1dTask *const t, const int cond_signal)
{
    insert_tasks(f, t, t, cond_signal);
}
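// Illustrative note (not upstream dav1d): taken together, the comparisons in
// insert_tasks() keep each frame's queue sorted by (entropy before
// reconstruction, sby, task type, tile index), so the head-most task is
// always the most urgent candidate for the scan in dav1d_worker_task().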
static inline void add_pending(Dav1dFrameContext *const f, Dav1dTask *const t) {
    pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
    t->next = NULL;
    if (!f->task_thread.pending_tasks.head)
        f->task_thread.pending_tasks.head = t;
    else
        f->task_thread.pending_tasks.tail->next = t;
    f->task_thread.pending_tasks.tail = t;
    atomic_store(&f->task_thread.pending_tasks.merge, 1);
    pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
}

static inline int merge_pending_frame(Dav1dFrameContext *const f) {
    int const merge = atomic_load(&f->task_thread.pending_tasks.merge);
    if (merge) {
        pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
        Dav1dTask *t = f->task_thread.pending_tasks.head;
        f->task_thread.pending_tasks.head = NULL;
        f->task_thread.pending_tasks.tail = NULL;
        atomic_store(&f->task_thread.pending_tasks.merge, 0);
        pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
        while (t) {
            Dav1dTask *const tmp = t->next;
            insert_task(f, t, 0);
            t = tmp;
        }
    }
    return merge;
}

static inline int merge_pending(const Dav1dContext *const c) {
    int res = 0;
    for (unsigned i = 0; i < c->n_fc; i++)
        res |= merge_pending_frame(&c->fc[i]);
    return res;
}

static int create_filter_sbrow(Dav1dFrameContext *const f,
                               const int pass, Dav1dTask **res_t)
{
    const int has_deblock = f->frame_hdr->loopfilter.level_y[0] ||
                            f->frame_hdr->loopfilter.level_y[1];
    const int has_cdef = f->seq_hdr->cdef;
    const int has_resize = f->frame_hdr->width[0] != f->frame_hdr->width[1];
    const int has_lr = f->lf.restore_planes;

    Dav1dTask *tasks = f->task_thread.tasks;
    const int uses_2pass = f->c->n_fc > 1;
    int num_tasks = f->sbh * (1 + uses_2pass);
    if (num_tasks > f->task_thread.num_tasks) {
        const size_t size = sizeof(Dav1dTask) * num_tasks;
        tasks = dav1d_realloc(ALLOC_COMMON_CTX, f->task_thread.tasks, size);
        if (!tasks) return -1;
        memset(tasks, 0, size);
        f->task_thread.tasks = tasks;
        f->task_thread.num_tasks = num_tasks;
    }
    tasks += f->sbh * (pass & 1);

    if (pass & 1) {
        f->frame_thread.entropy_progress = 0;
    } else {
        const int prog_sz = ((f->sbh + 31) & ~31) >> 5;
        if (prog_sz > f->frame_thread.prog_sz) {
            atomic_uint *const prog = dav1d_realloc(ALLOC_COMMON_CTX, f->frame_thread.frame_progress,
                                                    2 * prog_sz * sizeof(*prog));
            if (!prog) return -1;
            f->frame_thread.frame_progress = prog;
            f->frame_thread.copy_lpf_progress = prog + prog_sz;
        }
        f->frame_thread.prog_sz = prog_sz;
        memset(f->frame_thread.frame_progress, 0, prog_sz * sizeof(atomic_uint));
        memset(f->frame_thread.copy_lpf_progress, 0, prog_sz * sizeof(atomic_uint));
        atomic_store(&f->frame_thread.deblock_progress, 0);
    }
    f->frame_thread.next_tile_row[pass & 1] = 0;

    Dav1dTask *t = &tasks[0];
    t->sby = 0;
    t->recon_progress = 1;
    t->deblock_progress = 0;
    t->type = pass == 1 ? DAV1D_TASK_TYPE_ENTROPY_PROGRESS :
              has_deblock ? DAV1D_TASK_TYPE_DEBLOCK_COLS :
              has_cdef || has_lr /* i.e. LR backup */ ? DAV1D_TASK_TYPE_DEBLOCK_ROWS :
              has_resize ? DAV1D_TASK_TYPE_SUPER_RESOLUTION :
              DAV1D_TASK_TYPE_RECONSTRUCTION_PROGRESS;
    t->frame_idx = (int)(f - f->c->fc);

    *res_t = t;
    return 0;
}
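// Illustrative note (not upstream dav1d): a worked example of the bitmap
// sizing above: for f->sbh = 70 sbrows, prog_sz = ((70 + 31) & ~31) >> 5 = 3
// 32-bit words per bitmap, and a single allocation of 2 * prog_sz words backs
// both frame_progress and copy_lpf_progress.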
int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass,
                                 const int cond_signal)
{
    Dav1dTask *tasks = f->task_thread.tile_tasks[0];
    const int uses_2pass = f->c->n_fc > 1;
    const int num_tasks = f->frame_hdr->tiling.cols * f->frame_hdr->tiling.rows;
    if (pass < 2) {
        int alloc_num_tasks = num_tasks * (1 + uses_2pass);
        if (alloc_num_tasks > f->task_thread.num_tile_tasks) {
            const size_t size = sizeof(Dav1dTask) * alloc_num_tasks;
            tasks = dav1d_realloc(ALLOC_COMMON_CTX, f->task_thread.tile_tasks[0], size);
            if (!tasks) return -1;
            memset(tasks, 0, size);
            f->task_thread.tile_tasks[0] = tasks;
            f->task_thread.num_tile_tasks = alloc_num_tasks;
        }
        f->task_thread.tile_tasks[1] = tasks + num_tasks;
    }
    tasks += num_tasks * (pass & 1);

    Dav1dTask *pf_t;
    if (create_filter_sbrow(f, pass, &pf_t))
        return -1;

    Dav1dTask *prev_t = NULL;
    for (int tile_idx = 0; tile_idx < num_tasks; tile_idx++) {
        Dav1dTileState *const ts = &f->ts[tile_idx];
        Dav1dTask *t = &tasks[tile_idx];
        t->sby = ts->tiling.row_start >> f->sb_shift;
        if (pf_t && t->sby) {
            prev_t->next = pf_t;
            prev_t = pf_t;
            pf_t = NULL;
        }
        t->recon_progress = 0;
        t->deblock_progress = 0;
        t->deps_skip = 0;
        t->type = pass != 1 ? DAV1D_TASK_TYPE_TILE_RECONSTRUCTION :
                              DAV1D_TASK_TYPE_TILE_ENTROPY;
        t->frame_idx = (int)(f - f->c->fc);
        if (prev_t) prev_t->next = t;
        prev_t = t;
    }
    if (pf_t) {
        prev_t->next = pf_t;
        prev_t = pf_t;
    }
    prev_t->next = NULL;

    atomic_store(&f->task_thread.done[pass & 1], 0);

    // XXX in theory this could be done locklessly; at this point there are no
    // tasks in the frameQ, so no other runner should be using this lock, but
    // we must add both passes at once
    pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
    assert(f->task_thread.pending_tasks.head == NULL || pass == 2);
    if (!f->task_thread.pending_tasks.head)
        f->task_thread.pending_tasks.head = &tasks[0];
    else
        f->task_thread.pending_tasks.tail->next = &tasks[0];
    f->task_thread.pending_tasks.tail = prev_t;
    atomic_store(&f->task_thread.pending_tasks.merge, 1);
    atomic_store(&f->task_thread.init_done, 1);
    pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);

    return 0;
}
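// Illustrative note (not upstream dav1d): with frame threading (n_fc > 1),
// each frame is decoded in two passes over the same tile grid: pass 1 does
// entropy decoding (tasks in tile_tasks[1]) and pass 2 does reconstruction
// (tasks in tile_tasks[0]); the per-sbrow filter task created above is
// chained in right after the first tile row's tile tasks.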
void dav1d_task_frame_init(Dav1dFrameContext *const f) {
    const Dav1dContext *const c = f->c;

    atomic_store(&f->task_thread.init_done, 0);
    // schedule init task, which will schedule the remaining tasks
    Dav1dTask *const t = &f->task_thread.init_task;
    t->type = DAV1D_TASK_TYPE_INIT;
    t->frame_idx = (int)(f - c->fc);
    t->sby = 0;
    t->recon_progress = t->deblock_progress = 0;
    insert_task(f, t, 1);
}

void dav1d_task_delayed_fg(Dav1dContext *const c, Dav1dPicture *const out,
                           const Dav1dPicture *const in)
{
    struct TaskThreadData *const ttd = &c->task_thread;
    ttd->delayed_fg.in = in;
    ttd->delayed_fg.out = out;
    ttd->delayed_fg.type = DAV1D_TASK_TYPE_FG_PREP;
    atomic_init(&ttd->delayed_fg.progress[0], 0);
    atomic_init(&ttd->delayed_fg.progress[1], 0);
    pthread_mutex_lock(&ttd->lock);
    ttd->delayed_fg.exec = 1;
    ttd->delayed_fg.finished = 0;
    pthread_cond_signal(&ttd->cond);
    do {
        pthread_cond_wait(&ttd->delayed_fg.cond, &ttd->lock);
    } while (!ttd->delayed_fg.finished);
    pthread_mutex_unlock(&ttd->lock);
}

static inline int ensure_progress(struct TaskThreadData *const ttd,
                                  Dav1dFrameContext *const f,
                                  Dav1dTask *const t, const enum TaskType type,
                                  atomic_int *const state, int *const target)
{
    // deblock_rows (non-LR portion) depends on deblock of the previous sbrow,
    // so ensure that that completed; if not, re-add to the task queue,
    // else fall through
    int p1 = atomic_load(state);
    if (p1 < t->sby) {
        t->type = type;
        t->recon_progress = t->deblock_progress = 0;
        *target = t->sby;
        add_pending(f, t);
        pthread_mutex_lock(&ttd->lock);
        return 1;
    }
    return 0;
}
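// Illustrative note (not upstream dav1d): dav1d_task_delayed_fg() runs on the
// caller's thread; it publishes the film-grain job under ttd->lock, wakes one
// worker, and blocks on delayed_fg.cond until a worker sets
// delayed_fg.finished in delayed_fg_task(). The loop around pthread_cond_wait
// guards against spurious wakeups.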
static inline int check_tile(Dav1dTask *const t, Dav1dFrameContext *const f,
                             const int frame_mt)
{
    const int tp = t->type == DAV1D_TASK_TYPE_TILE_ENTROPY;
    const int tile_idx = (int)(t - f->task_thread.tile_tasks[tp]);
    Dav1dTileState *const ts = &f->ts[tile_idx];
    const int p1 = atomic_load(&ts->progress[tp]);
    if (p1 < t->sby) return 1;
    int error = p1 == TILE_ERROR;
    error |= atomic_fetch_or(&f->task_thread.error, error);
    if (!error && frame_mt && !tp) {
        const int p2 = atomic_load(&ts->progress[1]);
        if (p2 <= t->sby) return 1;
        error = p2 == TILE_ERROR;
        error |= atomic_fetch_or(&f->task_thread.error, error);
    }
    if (!error && frame_mt && !IS_KEY_OR_INTRA(f->frame_hdr)) {
        // check reference state
        const Dav1dThreadPicture *p = &f->sr_cur;
        const int ss_ver = p->p.p.layout == DAV1D_PIXEL_LAYOUT_I420;
        const unsigned p_b = (t->sby + 1) << (f->sb_shift + 2);
        const int tile_sby = t->sby - (ts->tiling.row_start >> f->sb_shift);
        const int (*const lowest_px)[2] = ts->lowest_pixel[tile_sby];
        for (int n = t->deps_skip; n < 7; n++, t->deps_skip++) {
            unsigned lowest;
            if (tp) {
                // if temporal mv refs are disabled, we only need this
                // for the primary ref; if segmentation is disabled, we
                // don't even need that
                lowest = p_b;
            } else {
                // +8 is postfilter-induced delay
                const int y = lowest_px[n][0] == INT_MIN ? INT_MIN :
                              lowest_px[n][0] + 8;
                const int uv = lowest_px[n][1] == INT_MIN ? INT_MIN :
                               lowest_px[n][1] * (1 << ss_ver) + 8;
                const int max = imax(y, uv);
                if (max == INT_MIN) continue;
                lowest = iclip(max, 1, f->refp[n].p.p.h);
            }
            const unsigned p3 = atomic_load(&f->refp[n].progress[!tp]);
            if (p3 < lowest) return 1;
            atomic_fetch_or(&f->task_thread.error, p3 == FRAME_ERROR);
        }
    }
    return 0;
}

static inline int get_frame_progress(const Dav1dContext *const c,
                                     const Dav1dFrameContext *const f)
{
    unsigned frame_prog = c->n_fc > 1 ? atomic_load(&f->sr_cur.progress[1]) : 0;
    if (frame_prog >= FRAME_ERROR)
        return f->sbh - 1;
    int idx = frame_prog >> (f->sb_shift + 7);
    int prog;
    do {
        atomic_uint *state = &f->frame_thread.frame_progress[idx];
        const unsigned val = ~atomic_load(state);
        prog = val ? ctz(val) : 32;
        if (prog != 32) break;
        prog = 0;
    } while (++idx < f->frame_thread.prog_sz);
    return ((idx << 5) | prog) - 1;
}

static inline void abort_frame(Dav1dFrameContext *const f, const int error) {
    atomic_store(&f->task_thread.error, error == DAV1D_ERR(EINVAL) ? 1 : -1);
    atomic_store(&f->task_thread.task_counter, 0);
    atomic_store(&f->task_thread.done[0], 1);
    atomic_store(&f->task_thread.done[1], 1);
    atomic_store(&f->sr_cur.progress[0], FRAME_ERROR);
    atomic_store(&f->sr_cur.progress[1], FRAME_ERROR);
    dav1d_decode_frame_exit(f, error);
    f->n_tile_data = 0;
    pthread_cond_signal(&f->task_thread.cond);
}
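// Illustrative note (not upstream dav1d): frame_progress is a bitmap with one
// bit per finished sbrow. In get_frame_progress(), if word 0 holds 0x1f,
// ~val = 0xffffffe0 and ctz(~val) = 5, so sbrows 0..4 are contiguously done
// and ((0 << 5) | 5) - 1 = 4 is returned as the last fully completed sbrow.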
static inline void delayed_fg_task(const Dav1dContext *const c,
                                   struct TaskThreadData *const ttd)
{
    const Dav1dPicture *const in = ttd->delayed_fg.in;
    Dav1dPicture *const out = ttd->delayed_fg.out;
#if CONFIG_16BPC
    int off;
    if (out->p.bpc != 8)
        off = (out->p.bpc >> 1) - 4;
#endif
    switch (ttd->delayed_fg.type) {
    case DAV1D_TASK_TYPE_FG_PREP:
        ttd->delayed_fg.exec = 0;
        if (atomic_load(&ttd->cond_signaled))
            pthread_cond_signal(&ttd->cond);
        pthread_mutex_unlock(&ttd->lock);
        switch (out->p.bpc) {
#if CONFIG_8BPC
        case 8:
            dav1d_prep_grain_8bpc(&c->dsp[0].fg, out, in,
                                  ttd->delayed_fg.scaling_8bpc,
                                  ttd->delayed_fg.grain_lut_8bpc);
            break;
#endif
#if CONFIG_16BPC
        case 10:
        case 12:
            dav1d_prep_grain_16bpc(&c->dsp[off].fg, out, in,
                                   ttd->delayed_fg.scaling_16bpc,
                                   ttd->delayed_fg.grain_lut_16bpc);
            break;
#endif
        default: abort();
        }
        ttd->delayed_fg.type = DAV1D_TASK_TYPE_FG_APPLY;
        pthread_mutex_lock(&ttd->lock);
        ttd->delayed_fg.exec = 1;
        // fall-through
    case DAV1D_TASK_TYPE_FG_APPLY:;
        int row = atomic_fetch_add(&ttd->delayed_fg.progress[0], 1);
        pthread_mutex_unlock(&ttd->lock);
        int progmax = (out->p.h + FG_BLOCK_SIZE - 1) / FG_BLOCK_SIZE;
        while (row < progmax) {
            if (row + 1 < progmax)
                pthread_cond_signal(&ttd->cond);
            else {
                pthread_mutex_lock(&ttd->lock);
                ttd->delayed_fg.exec = 0;
                pthread_mutex_unlock(&ttd->lock);
            }
            switch (out->p.bpc) {
#if CONFIG_8BPC
            case 8:
                dav1d_apply_grain_row_8bpc(&c->dsp[0].fg, out, in,
                                           ttd->delayed_fg.scaling_8bpc,
                                           ttd->delayed_fg.grain_lut_8bpc, row);
                break;
#endif
#if CONFIG_16BPC
            case 10:
            case 12:
                dav1d_apply_grain_row_16bpc(&c->dsp[off].fg, out, in,
                                            ttd->delayed_fg.scaling_16bpc,
                                            ttd->delayed_fg.grain_lut_16bpc, row);
                break;
#endif
            default: abort();
            }
            row = atomic_fetch_add(&ttd->delayed_fg.progress[0], 1);
            atomic_fetch_add(&ttd->delayed_fg.progress[1], 1);
        }
        pthread_mutex_lock(&ttd->lock);
        ttd->delayed_fg.exec = 0;
        int done = atomic_fetch_add(&ttd->delayed_fg.progress[1], 1) + 1;
        progmax = atomic_load(&ttd->delayed_fg.progress[0]);
        // signal for completion only once the last runner reaches this
        if (done >= progmax) {
            ttd->delayed_fg.finished = 1;
            pthread_cond_signal(&ttd->delayed_fg.cond);
        }
        break;
    default: abort();
    }
}
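// Illustrative note (not upstream dav1d): in delayed_fg_task(),
// delayed_fg.progress[0] is a row dispenser (each atomic_fetch_add claims the
// next FG_BLOCK_SIZE-high strip) while progress[1] counts completed strips
// plus one final increment per exiting worker; the last worker to bring
// progress[1] up to the dispensed count sets finished and wakes the waiter
// in dav1d_task_delayed_fg().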
void *dav1d_worker_task(void *data) {
    Dav1dTaskContext *const tc = data;
    const Dav1dContext *const c = tc->c;
    struct TaskThreadData *const ttd = tc->task_thread.ttd;

    dav1d_set_thread_name("dav1d-worker");

    pthread_mutex_lock(&ttd->lock);
    for (;;) {
        if (tc->task_thread.die) break;
        if (atomic_load(c->flush)) goto park;

        merge_pending(c);
        if (ttd->delayed_fg.exec) { // run delayed film grain first
            delayed_fg_task(c, ttd);
            continue;
        }
        Dav1dFrameContext *f;
        Dav1dTask *t, *prev_t = NULL;
        if (c->n_fc > 1) { // run init tasks second
            for (unsigned i = 0; i < c->n_fc; i++) {
                const unsigned first = atomic_load(&ttd->first);
                f = &c->fc[(first + i) % c->n_fc];
                if (atomic_load(&f->task_thread.init_done)) continue;
                t = f->task_thread.task_head;
                if (!t) continue;
                if (t->type == DAV1D_TASK_TYPE_INIT) goto found;
                if (t->type == DAV1D_TASK_TYPE_INIT_CDF) {
                    // XXX This can be a simple else, if adding tasks of both
                    // passes at once (in dav1d_task_create_tile_sbrow).
                    // Adding the tasks to the pending Q can result in a
                    // thread merging them before setting init_done.
                    // We will need to set init_done before adding to the
                    // pending Q, so maybe return the tasks, set init_done,
                    // and add to pending Q only then.
                    const int p1 = f->in_cdf.progress ?
                        atomic_load(f->in_cdf.progress) : 1;
                    if (p1) {
                        atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR);
                        goto found;
                    }
                }
            }
        }
        while (ttd->cur < c->n_fc) { // run decoding tasks last
            const unsigned first = atomic_load(&ttd->first);
            f = &c->fc[(first + ttd->cur) % c->n_fc];
            merge_pending_frame(f);
            prev_t = f->task_thread.task_cur_prev;
            t = prev_t ? prev_t->next : f->task_thread.task_head;
            while (t) {
                if (t->type == DAV1D_TASK_TYPE_INIT_CDF) goto next;
                else if (t->type == DAV1D_TASK_TYPE_TILE_ENTROPY ||
                         t->type == DAV1D_TASK_TYPE_TILE_RECONSTRUCTION)
                {
                    // if not bottom sbrow of tile, this task will be re-added
                    // after it's finished
                    if (!check_tile(t, f, c->n_fc > 1))
                        goto found;
                } else if (t->recon_progress) {
                    const int p = t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS;
                    int error = atomic_load(&f->task_thread.error);
                    assert(!atomic_load(&f->task_thread.done[p]) || error);
                    const int tile_row_base = f->frame_hdr->tiling.cols *
                                              f->frame_thread.next_tile_row[p];
                    if (p) {
                        atomic_int *const prog = &f->frame_thread.entropy_progress;
                        const int p1 = atomic_load(prog);
                        if (p1 < t->sby) goto next;
                        atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR);
                    }
                    for (int tc = 0; tc < f->frame_hdr->tiling.cols; tc++) {
                        Dav1dTileState *const ts = &f->ts[tile_row_base + tc];
                        const int p2 = atomic_load(&ts->progress[p]);
                        if (p2 < t->recon_progress) goto next;
                        atomic_fetch_or(&f->task_thread.error, p2 == TILE_ERROR);
                    }
                    if (t->sby + 1 < f->sbh) {
                        // add sby+1 to list to replace this one
                        Dav1dTask *next_t = &t[1];
                        *next_t = *t;
                        next_t->sby++;
                        const int ntr = f->frame_thread.next_tile_row[p] + 1;
                        const int start = f->frame_hdr->tiling.row_start_sb[ntr];
                        if (next_t->sby == start)
                            f->frame_thread.next_tile_row[p] = ntr;
                        next_t->recon_progress = next_t->sby + 1;
                        insert_task(f, next_t, 0);
                    }
                    goto found;
                } else if (t->type == DAV1D_TASK_TYPE_CDEF) {
                    atomic_uint *prog = f->frame_thread.copy_lpf_progress;
                    const int p1 = atomic_load(&prog[(t->sby - 1) >> 5]);
                    if (p1 & (1U << ((t->sby - 1) & 31)))
                        goto found;
                } else {
                    assert(t->deblock_progress);
                    const int p1 = atomic_load(&f->frame_thread.deblock_progress);
                    if (p1 >= t->deblock_progress) {
                        atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR);
                        goto found;
                    }
                }
            next:
                prev_t = t;
                t = t->next;
                f->task_thread.task_cur_prev = prev_t;
            }
            ttd->cur++;
        }
        if (reset_task_cur(c, ttd, UINT_MAX)) continue;
        if (merge_pending(c)) continue;
    park:
        tc->task_thread.flushed = 1;
        pthread_cond_signal(&tc->task_thread.td.cond);
        // we want to be woken up next time progress is signaled
        atomic_store(&ttd->cond_signaled, 0);
        pthread_cond_wait(&ttd->cond, &ttd->lock);
        tc->task_thread.flushed = 0;
        reset_task_cur(c, ttd, UINT_MAX);
        continue;
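        // Illustrative note (not upstream dav1d): a task reaching "found"
        // below has all of its dependencies satisfied; it is unlinked from
        // the frame queue while ttd->lock is still held, then executed with
        // the lock dropped so other workers can keep scheduling.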
    found:
        // remove t from list
        if (prev_t) prev_t->next = t->next;
        else f->task_thread.task_head = t->next;
        if (!t->next) f->task_thread.task_tail = prev_t;
        if (t->type > DAV1D_TASK_TYPE_INIT_CDF && !f->task_thread.task_head)
            ttd->cur++;
        t->next = NULL;
        // we don't need to check cond_signaled here, since we found a task
        // after the last signal so we want to re-signal the next waiting thread
        // and again won't need to signal after that
        atomic_store(&ttd->cond_signaled, 1);
        pthread_cond_signal(&ttd->cond);
        pthread_mutex_unlock(&ttd->lock);
    found_unlocked:;
        const int flush = atomic_load(c->flush);
        int error = atomic_fetch_or(&f->task_thread.error, flush) | flush;

        // run it
        tc->f = f;
        int sby = t->sby;
        switch (t->type) {
        case DAV1D_TASK_TYPE_INIT: {
            assert(c->n_fc > 1);
            int res = dav1d_decode_frame_init(f);
            int p1 = f->in_cdf.progress ? atomic_load(f->in_cdf.progress) : 1;
            if (res || p1 == TILE_ERROR) {
                pthread_mutex_lock(&ttd->lock);
                abort_frame(f, res ? res : DAV1D_ERR(EINVAL));
                reset_task_cur(c, ttd, t->frame_idx);
            } else {
                t->type = DAV1D_TASK_TYPE_INIT_CDF;
                if (p1) goto found_unlocked;
                add_pending(f, t);
                pthread_mutex_lock(&ttd->lock);
            }
            continue;
        }
        case DAV1D_TASK_TYPE_INIT_CDF: {
            assert(c->n_fc > 1);
            int res = DAV1D_ERR(EINVAL);
            if (!atomic_load(&f->task_thread.error))
                res = dav1d_decode_frame_init_cdf(f);
            if (f->frame_hdr->refresh_context && !f->task_thread.update_set) {
                atomic_store(f->out_cdf.progress, res < 0 ? TILE_ERROR : 1);
            }
            if (!res) {
                assert(c->n_fc > 1);
                for (int p = 1; p <= 2; p++) {
                    const int res = dav1d_task_create_tile_sbrow(f, p, 0);
                    if (res) {
                        pthread_mutex_lock(&ttd->lock);
                        // memory allocation failed
                        atomic_store(&f->task_thread.done[2 - p], 1);
                        atomic_store(&f->task_thread.error, -1);
                        atomic_fetch_sub(&f->task_thread.task_counter,
                                         f->frame_hdr->tiling.cols *
                                         f->frame_hdr->tiling.rows + f->sbh);
                        atomic_store(&f->sr_cur.progress[p - 1], FRAME_ERROR);
                        if (p == 2 && atomic_load(&f->task_thread.done[1])) {
                            assert(!atomic_load(&f->task_thread.task_counter));
                            dav1d_decode_frame_exit(f, DAV1D_ERR(ENOMEM));
                            f->n_tile_data = 0;
                            pthread_cond_signal(&f->task_thread.cond);
                        } else {
                            pthread_mutex_unlock(&ttd->lock);
                        }
                    }
                }
                pthread_mutex_lock(&ttd->lock);
            } else {
                pthread_mutex_lock(&ttd->lock);
                abort_frame(f, res);
                reset_task_cur(c, ttd, t->frame_idx);
                atomic_store(&f->task_thread.init_done, 1);
            }
            continue;
        }
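        // Illustrative note (not upstream dav1d): the tile cases below run
        // with ttd->lock released; per-tile sbrow completion is published
        // through the ts->progress[] atomics, which check_tile() polls within
        // this frame, while cross-frame dependencies are tracked via the
        // frame-level refp[n].progress atomics instead.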
        case DAV1D_TASK_TYPE_TILE_ENTROPY:
        case DAV1D_TASK_TYPE_TILE_RECONSTRUCTION: {
            const int p = t->type == DAV1D_TASK_TYPE_TILE_ENTROPY;
            const int tile_idx = (int)(t - f->task_thread.tile_tasks[p]);
            Dav1dTileState *const ts = &f->ts[tile_idx];

            tc->ts = ts;
            tc->by = sby << f->sb_shift;
            const int uses_2pass = c->n_fc > 1;
            tc->frame_thread.pass = !uses_2pass ? 0 :
                1 + (t->type == DAV1D_TASK_TYPE_TILE_RECONSTRUCTION);
            if (!error) error = dav1d_decode_tile_sbrow(tc);
            const int progress = error ? TILE_ERROR : 1 + sby;

            // signal progress
            atomic_fetch_or(&f->task_thread.error, error);
            if (((sby + 1) << f->sb_shift) < ts->tiling.row_end) {
                t->sby++;
                t->deps_skip = 0;
                if (!check_tile(t, f, uses_2pass)) {
                    atomic_store(&ts->progress[p], progress);
                    reset_task_cur_async(ttd, t->frame_idx, c->n_fc);
                    if (!atomic_fetch_or(&ttd->cond_signaled, 1))
                        pthread_cond_signal(&ttd->cond);
                    goto found_unlocked;
                }
                atomic_store(&ts->progress[p], progress);
                add_pending(f, t);
                pthread_mutex_lock(&ttd->lock);
            } else {
                pthread_mutex_lock(&ttd->lock);
                atomic_store(&ts->progress[p], progress);
                reset_task_cur(c, ttd, t->frame_idx);
                error = atomic_load(&f->task_thread.error);
                if (f->frame_hdr->refresh_context &&
                    tc->frame_thread.pass <= 1 && f->task_thread.update_set &&
                    f->frame_hdr->tiling.update == tile_idx)
                {
                    if (!error)
                        dav1d_cdf_thread_update(f->frame_hdr, f->out_cdf.data.cdf,
                                                &f->ts[f->frame_hdr->tiling.update].cdf);
                    if (c->n_fc > 1)
                        atomic_store(f->out_cdf.progress, error ? TILE_ERROR : 1);
                }
                if (atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1 == 0 &&
                    atomic_load(&f->task_thread.done[0]) &&
                    (!uses_2pass || atomic_load(&f->task_thread.done[1])))
                {
                    error = atomic_load(&f->task_thread.error);
                    dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
                                            error ? DAV1D_ERR(ENOMEM) : 0);
                    f->n_tile_data = 0;
                    pthread_cond_signal(&f->task_thread.cond);
                }
                assert(atomic_load(&f->task_thread.task_counter) >= 0);
                if (!atomic_fetch_or(&ttd->cond_signaled, 1))
                    pthread_cond_signal(&ttd->cond);
            }
            continue;
        }
        case DAV1D_TASK_TYPE_DEBLOCK_COLS:
            if (!atomic_load(&f->task_thread.error))
                f->bd_fn.filter_sbrow_deblock_cols(f, sby);
            if (ensure_progress(ttd, f, t, DAV1D_TASK_TYPE_DEBLOCK_ROWS,
                                &f->frame_thread.deblock_progress,
                                &t->deblock_progress)) continue;
            // fall-through
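        // Illustrative note (not upstream dav1d): the cases below form the
        // per-sbrow post-filter pipeline (deblock rows -> CDEF ->
        // super-resolution -> loop restoration); each stage either runs and
        // falls through to the next, or re-queues the task when a cross-sbrow
        // dependency is not yet satisfied.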
        case DAV1D_TASK_TYPE_DEBLOCK_ROWS:
            if (!atomic_load(&f->task_thread.error))
                f->bd_fn.filter_sbrow_deblock_rows(f, sby);
            // signal deblock progress
            if (f->frame_hdr->loopfilter.level_y[0] ||
                f->frame_hdr->loopfilter.level_y[1])
            {
                error = atomic_load(&f->task_thread.error);
                atomic_store(&f->frame_thread.deblock_progress,
                             error ? TILE_ERROR : sby + 1);
                reset_task_cur_async(ttd, t->frame_idx, c->n_fc);
                if (!atomic_fetch_or(&ttd->cond_signaled, 1))
                    pthread_cond_signal(&ttd->cond);
            } else if (f->seq_hdr->cdef || f->lf.restore_planes) {
                atomic_fetch_or(&f->frame_thread.copy_lpf_progress[sby >> 5],
                                1U << (sby & 31));
                // CDEF needs the top buffer to be saved by lr_copy_lpf of the
                // previous sbrow
                if (sby) {
                    int prog = atomic_load(&f->frame_thread.copy_lpf_progress[(sby - 1) >> 5]);
                    if (~prog & (1U << ((sby - 1) & 31))) {
                        t->type = DAV1D_TASK_TYPE_CDEF;
                        t->recon_progress = t->deblock_progress = 0;
                        add_pending(f, t);
                        pthread_mutex_lock(&ttd->lock);
                        continue;
                    }
                }
            }
            // fall-through
        case DAV1D_TASK_TYPE_CDEF:
            if (f->seq_hdr->cdef) {
                if (!atomic_load(&f->task_thread.error))
                    f->bd_fn.filter_sbrow_cdef(tc, sby);
                reset_task_cur_async(ttd, t->frame_idx, c->n_fc);
                if (!atomic_fetch_or(&ttd->cond_signaled, 1))
                    pthread_cond_signal(&ttd->cond);
            }
            // fall-through
        case DAV1D_TASK_TYPE_SUPER_RESOLUTION:
            if (f->frame_hdr->width[0] != f->frame_hdr->width[1])
                if (!atomic_load(&f->task_thread.error))
                    f->bd_fn.filter_sbrow_resize(f, sby);
            // fall-through
        case DAV1D_TASK_TYPE_LOOP_RESTORATION:
            if (!atomic_load(&f->task_thread.error) && f->lf.restore_planes)
                f->bd_fn.filter_sbrow_lr(f, sby);
            // fall-through
        case DAV1D_TASK_TYPE_RECONSTRUCTION_PROGRESS:
            // dummy to cover for no post-filters
        case DAV1D_TASK_TYPE_ENTROPY_PROGRESS:
            // dummy to convert tile progress to frame
            break;
        default: abort();
        }
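        // Illustrative note (not upstream dav1d): sr_cur.progress[] is in
        // pixel rows, so a completed sbrow sby publishes (sby + 1) * sb_step
        // * 4 rows, with UINT_MAX marking a fully decoded frame; other frame
        // contexts compare against these atomics in check_tile().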
        // if task completed [typically LR], signal picture progress as per below
        const int uses_2pass = c->n_fc > 1;
        const int sbh = f->sbh;
        const int sbsz = f->sb_step * 4;
        if (t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS) {
            error = atomic_load(&f->task_thread.error);
            const unsigned y = sby + 1 == sbh ? UINT_MAX : (unsigned)(sby + 1) * sbsz;
            assert(c->n_fc > 1);
            if (f->sr_cur.p.data[0] /* upon flush, this can be freed already */)
                atomic_store(&f->sr_cur.progress[0], error ? FRAME_ERROR : y);
            atomic_store(&f->frame_thread.entropy_progress,
                         error ? TILE_ERROR : sby + 1);
            if (sby + 1 == sbh)
                atomic_store(&f->task_thread.done[1], 1);
            pthread_mutex_lock(&ttd->lock);
            const int num_tasks = atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1;
            if (sby + 1 < sbh && num_tasks) {
                reset_task_cur(c, ttd, t->frame_idx);
                continue;
            }
            if (!num_tasks && atomic_load(&f->task_thread.done[0]) &&
                atomic_load(&f->task_thread.done[1]))
            {
                error = atomic_load(&f->task_thread.error);
                dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
                                        error ? DAV1D_ERR(ENOMEM) : 0);
                f->n_tile_data = 0;
                pthread_cond_signal(&f->task_thread.cond);
            }
            reset_task_cur(c, ttd, t->frame_idx);
            continue;
        }
        // t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS
        atomic_fetch_or(&f->frame_thread.frame_progress[sby >> 5],
                        1U << (sby & 31));
        pthread_mutex_lock(&f->task_thread.lock);
        sby = get_frame_progress(c, f);
        error = atomic_load(&f->task_thread.error);
        const unsigned y = sby + 1 == sbh ? UINT_MAX : (unsigned)(sby + 1) * sbsz;
        if (c->n_fc > 1 && f->sr_cur.p.data[0] /* upon flush, this can be freed already */)
            atomic_store(&f->sr_cur.progress[1], error ? FRAME_ERROR : y);
        pthread_mutex_unlock(&f->task_thread.lock);
        if (sby + 1 == sbh)
            atomic_store(&f->task_thread.done[0], 1);
        pthread_mutex_lock(&ttd->lock);
        const int num_tasks = atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1;
        if (sby + 1 < sbh && num_tasks) {
            reset_task_cur(c, ttd, t->frame_idx);
            continue;
        }
        if (!num_tasks && atomic_load(&f->task_thread.done[0]) &&
            (!uses_2pass || atomic_load(&f->task_thread.done[1])))
        {
            error = atomic_load(&f->task_thread.error);
            dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
                                    error ? DAV1D_ERR(ENOMEM) : 0);
            f->n_tile_data = 0;
            pthread_cond_signal(&f->task_thread.cond);
        }
        reset_task_cur(c, ttd, t->frame_idx);
    }
    pthread_mutex_unlock(&ttd->lock);

    return NULL;
}