pthread_frame.c (35890B)
1 /* 2 * This file is part of FFmpeg. 3 * 4 * FFmpeg is free software; you can redistribute it and/or 5 * modify it under the terms of the GNU Lesser General Public 6 * License as published by the Free Software Foundation; either 7 * version 2.1 of the License, or (at your option) any later version. 8 * 9 * FFmpeg is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 * Lesser General Public License for more details. 13 * 14 * You should have received a copy of the GNU Lesser General Public 15 * License along with FFmpeg; if not, write to the Free Software 16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 17 */ 18 19 /** 20 * @file 21 * Frame multithreading support functions 22 * @see doc/multithreading.txt 23 */ 24 25 #include <stdatomic.h> 26 27 #include "avcodec.h" 28 #include "avcodec_internal.h" 29 #include "codec_desc.h" 30 #include "codec_internal.h" 31 #include "decode.h" 32 #include "hwaccel_internal.h" 33 #include "hwconfig.h" 34 #include "internal.h" 35 #include "packet_internal.h" 36 #include "pthread_internal.h" 37 #include "libavutil/refstruct.h" 38 #include "thread.h" 39 #include "threadframe.h" 40 #include "version_major.h" 41 42 #include "libavutil/avassert.h" 43 #include "libavutil/buffer.h" 44 #include "libavutil/common.h" 45 #include "libavutil/cpu.h" 46 #include "libavutil/frame.h" 47 #include "libavutil/internal.h" 48 #include "libavutil/log.h" 49 #include "libavutil/mem.h" 50 #include "libavutil/opt.h" 51 #include "libavutil/thread.h" 52 53 enum { 54 /// Set when the thread is awaiting a packet. 55 STATE_INPUT_READY, 56 /// Set before the codec has called ff_thread_finish_setup(). 57 STATE_SETTING_UP, 58 /// Set after the codec has called ff_thread_finish_setup(). 59 STATE_SETUP_FINISHED, 60 }; 61 62 enum { 63 UNINITIALIZED, ///< Thread has not been created, AVCodec->close mustn't be called 64 NEEDS_CLOSE, ///< FFCodec->close needs to be called 65 INITIALIZED, ///< Thread has been properly set up 66 }; 67 68 typedef struct DecodedFrames { 69 AVFrame **f; 70 size_t nb_f; 71 size_t nb_f_allocated; 72 } DecodedFrames; 73 74 typedef struct ThreadFrameProgress { 75 atomic_int progress[2]; 76 } ThreadFrameProgress; 77 78 /** 79 * Context used by codec threads and stored in their AVCodecInternal thread_ctx. 80 */ 81 typedef struct PerThreadContext { 82 struct FrameThreadContext *parent; 83 84 pthread_t thread; 85 int thread_init; 86 unsigned pthread_init_cnt;///< Number of successfully initialized mutexes/conditions 87 pthread_cond_t input_cond; ///< Used to wait for a new packet from the main thread. 88 pthread_cond_t progress_cond; ///< Used by child threads to wait for progress to change. 89 pthread_cond_t output_cond; ///< Used by the main thread to wait for frames to finish. 90 91 pthread_mutex_t mutex; ///< Mutex used to protect the contents of the PerThreadContext. 92 pthread_mutex_t progress_mutex; ///< Mutex used to protect frame progress values and progress_cond. 93 94 AVCodecContext *avctx; ///< Context used to decode packets passed to this thread. 95 96 AVPacket *avpkt; ///< Input packet (for decoding) or output (for encoding). 97 98 /** 99 * Decoded frames from a single decode iteration. 100 */ 101 DecodedFrames df; 102 int result; ///< The result of the last codec decode/encode() call. 103 104 atomic_int state; 105 106 int die; ///< Set when the thread should exit. 107 108 int hwaccel_serializing; 109 int async_serializing; 110 111 // set to 1 in ff_thread_finish_setup() when a threadsafe hwaccel is used; 112 // cannot check hwaccel caps directly, because 113 // worked threads clear hwaccel state for thread-unsafe hwaccels 114 // after each decode call 115 int hwaccel_threadsafe; 116 117 atomic_int debug_threads; ///< Set if the FF_DEBUG_THREADS option is set. 118 119 /// The following two fields have the same semantics as the DecodeContext field 120 int intra_only_flag; 121 enum AVPictureType initial_pict_type; 122 } PerThreadContext; 123 124 /** 125 * Context stored in the client AVCodecInternal thread_ctx. 126 */ 127 typedef struct FrameThreadContext { 128 PerThreadContext *threads; ///< The contexts for each thread. 129 PerThreadContext *prev_thread; ///< The last thread submit_packet() was called on. 130 131 unsigned pthread_init_cnt; ///< Number of successfully initialized mutexes/conditions 132 pthread_mutex_t buffer_mutex; ///< Mutex used to protect get/release_buffer(). 133 /** 134 * This lock is used for ensuring threads run in serial when thread-unsafe 135 * hwaccel is used. 136 */ 137 pthread_mutex_t hwaccel_mutex; 138 pthread_mutex_t async_mutex; 139 pthread_cond_t async_cond; 140 int async_lock; 141 142 DecodedFrames df; 143 int result; 144 145 /** 146 * Packet to be submitted to the next thread for decoding. 147 */ 148 AVPacket *next_pkt; 149 150 int next_decoding; ///< The next context to submit a packet to. 151 int next_finished; ///< The next context to return output from. 152 153 /* hwaccel state for thread-unsafe hwaccels is temporarily stored here in 154 * order to transfer its ownership to the next decoding thread without the 155 * need for extra synchronization */ 156 const AVHWAccel *stash_hwaccel; 157 void *stash_hwaccel_context; 158 void *stash_hwaccel_priv; 159 } FrameThreadContext; 160 161 static int hwaccel_serial(const AVCodecContext *avctx) 162 { 163 return avctx->hwaccel && !(ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE); 164 } 165 166 static void async_lock(FrameThreadContext *fctx) 167 { 168 pthread_mutex_lock(&fctx->async_mutex); 169 while (fctx->async_lock) 170 pthread_cond_wait(&fctx->async_cond, &fctx->async_mutex); 171 fctx->async_lock = 1; 172 pthread_mutex_unlock(&fctx->async_mutex); 173 } 174 175 static void async_unlock(FrameThreadContext *fctx) 176 { 177 pthread_mutex_lock(&fctx->async_mutex); 178 av_assert0(fctx->async_lock); 179 fctx->async_lock = 0; 180 pthread_cond_broadcast(&fctx->async_cond); 181 pthread_mutex_unlock(&fctx->async_mutex); 182 } 183 184 static void thread_set_name(PerThreadContext *p) 185 { 186 AVCodecContext *avctx = p->avctx; 187 int idx = p - p->parent->threads; 188 char name[16]; 189 190 snprintf(name, sizeof(name), "av:%.7s:df%d", avctx->codec->name, idx); 191 192 ff_thread_setname(name); 193 } 194 195 // get a free frame to decode into 196 static AVFrame *decoded_frames_get_free(DecodedFrames *df) 197 { 198 if (df->nb_f == df->nb_f_allocated) { 199 AVFrame **tmp = av_realloc_array(df->f, df->nb_f + 1, 200 sizeof(*df->f)); 201 if (!tmp) 202 return NULL; 203 df->f = tmp; 204 205 df->f[df->nb_f] = av_frame_alloc(); 206 if (!df->f[df->nb_f]) 207 return NULL; 208 209 df->nb_f_allocated++; 210 } 211 212 av_assert0(!df->f[df->nb_f]->buf[0]); 213 214 return df->f[df->nb_f]; 215 } 216 217 static void decoded_frames_pop(DecodedFrames *df, AVFrame *dst) 218 { 219 AVFrame *tmp_frame = df->f[0]; 220 av_frame_move_ref(dst, tmp_frame); 221 memmove(df->f, df->f + 1, (df->nb_f - 1) * sizeof(*df->f)); 222 df->f[--df->nb_f] = tmp_frame; 223 } 224 225 static void decoded_frames_flush(DecodedFrames *df) 226 { 227 for (size_t i = 0; i < df->nb_f; i++) 228 av_frame_unref(df->f[i]); 229 df->nb_f = 0; 230 } 231 232 static void decoded_frames_free(DecodedFrames *df) 233 { 234 for (size_t i = 0; i < df->nb_f_allocated; i++) 235 av_frame_free(&df->f[i]); 236 av_freep(&df->f); 237 df->nb_f = 0; 238 df->nb_f_allocated = 0; 239 } 240 241 /** 242 * Codec worker thread. 243 * 244 * Automatically calls ff_thread_finish_setup() if the codec does 245 * not provide an update_thread_context method, or if the codec returns 246 * before calling it. 247 */ 248 static attribute_align_arg void *frame_worker_thread(void *arg) 249 { 250 PerThreadContext *p = arg; 251 AVCodecContext *avctx = p->avctx; 252 const FFCodec *codec = ffcodec(avctx->codec); 253 254 thread_set_name(p); 255 256 pthread_mutex_lock(&p->mutex); 257 while (1) { 258 int ret; 259 260 while (atomic_load(&p->state) == STATE_INPUT_READY && !p->die) 261 pthread_cond_wait(&p->input_cond, &p->mutex); 262 263 if (p->die) break; 264 265 if (!codec->update_thread_context) 266 ff_thread_finish_setup(avctx); 267 268 /* If a decoder supports hwaccel, then it must call ff_get_format(). 269 * Since that call must happen before ff_thread_finish_setup(), the 270 * decoder is required to implement update_thread_context() and call 271 * ff_thread_finish_setup() manually. Therefore the above 272 * ff_thread_finish_setup() call did not happen and hwaccel_serializing 273 * cannot be true here. */ 274 av_assert0(!p->hwaccel_serializing); 275 276 /* if the previous thread uses thread-unsafe hwaccel then we take the 277 * lock to ensure the threads don't run concurrently */ 278 if (hwaccel_serial(avctx)) { 279 pthread_mutex_lock(&p->parent->hwaccel_mutex); 280 p->hwaccel_serializing = 1; 281 } 282 283 ret = 0; 284 while (ret >= 0) { 285 AVFrame *frame; 286 287 /* get the frame which will store the output */ 288 frame = decoded_frames_get_free(&p->df); 289 if (!frame) { 290 p->result = AVERROR(ENOMEM); 291 goto alloc_fail; 292 } 293 294 /* do the actual decoding */ 295 ret = ff_decode_receive_frame_internal(avctx, frame); 296 if (ret == 0) 297 p->df.nb_f++; 298 else if (ret < 0 && frame->buf[0]) 299 av_frame_unref(frame); 300 301 p->result = (ret == AVERROR(EAGAIN)) ? 0 : ret; 302 } 303 304 if (atomic_load(&p->state) == STATE_SETTING_UP) 305 ff_thread_finish_setup(avctx); 306 307 alloc_fail: 308 if (p->hwaccel_serializing) { 309 /* wipe hwaccel state for thread-unsafe hwaccels to avoid stale 310 * pointers lying around; 311 * the state was transferred to FrameThreadContext in 312 * ff_thread_finish_setup(), so nothing is leaked */ 313 avctx->hwaccel = NULL; 314 avctx->hwaccel_context = NULL; 315 avctx->internal->hwaccel_priv_data = NULL; 316 317 p->hwaccel_serializing = 0; 318 pthread_mutex_unlock(&p->parent->hwaccel_mutex); 319 } 320 av_assert0(!avctx->hwaccel || 321 (ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE)); 322 323 if (p->async_serializing) { 324 p->async_serializing = 0; 325 326 async_unlock(p->parent); 327 } 328 329 pthread_mutex_lock(&p->progress_mutex); 330 331 atomic_store(&p->state, STATE_INPUT_READY); 332 333 pthread_cond_broadcast(&p->progress_cond); 334 pthread_cond_signal(&p->output_cond); 335 pthread_mutex_unlock(&p->progress_mutex); 336 } 337 pthread_mutex_unlock(&p->mutex); 338 339 return NULL; 340 } 341 342 /** 343 * Update the next thread's AVCodecContext with values from the reference thread's context. 344 * 345 * @param dst The destination context. 346 * @param src The source context. 347 * @param for_user 0 if the destination is a codec thread, 1 if the destination is the user's thread 348 * @return 0 on success, negative error code on failure 349 */ 350 static int update_context_from_thread(AVCodecContext *dst, const AVCodecContext *src, int for_user) 351 { 352 const FFCodec *const codec = ffcodec(dst->codec); 353 int err = 0; 354 355 if (dst != src && (for_user || codec->update_thread_context)) { 356 dst->time_base = src->time_base; 357 dst->framerate = src->framerate; 358 dst->width = src->width; 359 dst->height = src->height; 360 dst->pix_fmt = src->pix_fmt; 361 dst->sw_pix_fmt = src->sw_pix_fmt; 362 363 dst->coded_width = src->coded_width; 364 dst->coded_height = src->coded_height; 365 366 dst->has_b_frames = src->has_b_frames; 367 dst->idct_algo = src->idct_algo; 368 #if FF_API_CODEC_PROPS 369 FF_DISABLE_DEPRECATION_WARNINGS 370 dst->properties = src->properties; 371 FF_ENABLE_DEPRECATION_WARNINGS 372 #endif 373 374 dst->bits_per_coded_sample = src->bits_per_coded_sample; 375 dst->sample_aspect_ratio = src->sample_aspect_ratio; 376 377 dst->profile = src->profile; 378 dst->level = src->level; 379 380 dst->bits_per_raw_sample = src->bits_per_raw_sample; 381 #if FF_API_TICKS_PER_FRAME 382 FF_DISABLE_DEPRECATION_WARNINGS 383 dst->ticks_per_frame = src->ticks_per_frame; 384 FF_ENABLE_DEPRECATION_WARNINGS 385 #endif 386 dst->color_primaries = src->color_primaries; 387 388 dst->color_trc = src->color_trc; 389 dst->colorspace = src->colorspace; 390 dst->color_range = src->color_range; 391 dst->chroma_sample_location = src->chroma_sample_location; 392 393 dst->sample_rate = src->sample_rate; 394 dst->sample_fmt = src->sample_fmt; 395 err = av_channel_layout_copy(&dst->ch_layout, &src->ch_layout); 396 if (err < 0) 397 return err; 398 399 if (!!dst->hw_frames_ctx != !!src->hw_frames_ctx || 400 (dst->hw_frames_ctx && dst->hw_frames_ctx->data != src->hw_frames_ctx->data)) { 401 av_buffer_unref(&dst->hw_frames_ctx); 402 403 if (src->hw_frames_ctx) { 404 dst->hw_frames_ctx = av_buffer_ref(src->hw_frames_ctx); 405 if (!dst->hw_frames_ctx) 406 return AVERROR(ENOMEM); 407 } 408 } 409 410 dst->hwaccel_flags = src->hwaccel_flags; 411 412 av_refstruct_replace(&dst->internal->pool, src->internal->pool); 413 ff_decode_internal_sync(dst, src); 414 } 415 416 if (for_user) { 417 if (codec->update_thread_context_for_user) 418 err = codec->update_thread_context_for_user(dst, src); 419 } else { 420 const PerThreadContext *p_src = src->internal->thread_ctx; 421 PerThreadContext *p_dst = dst->internal->thread_ctx; 422 423 if (codec->update_thread_context) { 424 err = codec->update_thread_context(dst, src); 425 if (err < 0) 426 return err; 427 } 428 429 // reset dst hwaccel state if needed 430 av_assert0(p_dst->hwaccel_threadsafe || 431 (!dst->hwaccel && !dst->internal->hwaccel_priv_data)); 432 if (p_dst->hwaccel_threadsafe && 433 (!p_src->hwaccel_threadsafe || dst->hwaccel != src->hwaccel)) { 434 ff_hwaccel_uninit(dst); 435 p_dst->hwaccel_threadsafe = 0; 436 } 437 438 // propagate hwaccel state for threadsafe hwaccels 439 if (p_src->hwaccel_threadsafe) { 440 const FFHWAccel *hwaccel = ffhwaccel(src->hwaccel); 441 if (!dst->hwaccel) { 442 if (hwaccel->priv_data_size) { 443 av_assert0(hwaccel->update_thread_context); 444 445 dst->internal->hwaccel_priv_data = 446 av_mallocz(hwaccel->priv_data_size); 447 if (!dst->internal->hwaccel_priv_data) 448 return AVERROR(ENOMEM); 449 } 450 dst->hwaccel = src->hwaccel; 451 } 452 av_assert0(dst->hwaccel == src->hwaccel); 453 454 if (hwaccel->update_thread_context) { 455 err = hwaccel->update_thread_context(dst, src); 456 if (err < 0) { 457 av_log(dst, AV_LOG_ERROR, "Error propagating hwaccel state\n"); 458 ff_hwaccel_uninit(dst); 459 return err; 460 } 461 } 462 p_dst->hwaccel_threadsafe = 1; 463 } 464 } 465 466 return err; 467 } 468 469 /** 470 * Update the next thread's AVCodecContext with values set by the user. 471 * 472 * @param dst The destination context. 473 * @param src The source context. 474 * @return 0 on success, negative error code on failure 475 */ 476 static int update_context_from_user(AVCodecContext *dst, const AVCodecContext *src) 477 { 478 int err; 479 480 dst->flags = src->flags; 481 482 dst->draw_horiz_band= src->draw_horiz_band; 483 dst->get_buffer2 = src->get_buffer2; 484 485 dst->opaque = src->opaque; 486 dst->debug = src->debug; 487 488 dst->slice_flags = src->slice_flags; 489 dst->flags2 = src->flags2; 490 dst->export_side_data = src->export_side_data; 491 492 dst->skip_loop_filter = src->skip_loop_filter; 493 dst->skip_idct = src->skip_idct; 494 dst->skip_frame = src->skip_frame; 495 496 dst->frame_num = src->frame_num; 497 498 av_packet_unref(dst->internal->last_pkt_props); 499 err = av_packet_copy_props(dst->internal->last_pkt_props, src->internal->last_pkt_props); 500 if (err < 0) 501 return err; 502 503 return 0; 504 } 505 506 static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx, 507 AVPacket *in_pkt) 508 { 509 FrameThreadContext *fctx = p->parent; 510 PerThreadContext *prev_thread = fctx->prev_thread; 511 const AVCodec *codec = p->avctx->codec; 512 int ret; 513 514 pthread_mutex_lock(&p->mutex); 515 516 av_packet_unref(p->avpkt); 517 av_packet_move_ref(p->avpkt, in_pkt); 518 519 if (AVPACKET_IS_EMPTY(p->avpkt)) 520 p->avctx->internal->draining = 1; 521 522 ret = update_context_from_user(p->avctx, user_avctx); 523 if (ret) { 524 pthread_mutex_unlock(&p->mutex); 525 return ret; 526 } 527 atomic_store_explicit(&p->debug_threads, 528 (p->avctx->debug & FF_DEBUG_THREADS) != 0, 529 memory_order_relaxed); 530 531 if (prev_thread) { 532 if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) { 533 pthread_mutex_lock(&prev_thread->progress_mutex); 534 while (atomic_load(&prev_thread->state) == STATE_SETTING_UP) 535 pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex); 536 pthread_mutex_unlock(&prev_thread->progress_mutex); 537 } 538 539 /* codecs without delay might not be prepared to be called repeatedly here during 540 * flushing (vp3/theora), and also don't need to be, since from this point on, they 541 * will always return EOF anyway */ 542 if (!p->avctx->internal->draining || 543 (codec->capabilities & AV_CODEC_CAP_DELAY)) { 544 ret = update_context_from_thread(p->avctx, prev_thread->avctx, 0); 545 if (ret) { 546 pthread_mutex_unlock(&p->mutex); 547 return ret; 548 } 549 } 550 } 551 552 /* transfer the stashed hwaccel state, if any */ 553 av_assert0(!p->avctx->hwaccel || p->hwaccel_threadsafe); 554 if (!p->hwaccel_threadsafe) { 555 FFSWAP(const AVHWAccel*, p->avctx->hwaccel, fctx->stash_hwaccel); 556 FFSWAP(void*, p->avctx->hwaccel_context, fctx->stash_hwaccel_context); 557 FFSWAP(void*, p->avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv); 558 } 559 560 atomic_store(&p->state, STATE_SETTING_UP); 561 pthread_cond_signal(&p->input_cond); 562 pthread_mutex_unlock(&p->mutex); 563 564 fctx->prev_thread = p; 565 fctx->next_decoding = (fctx->next_decoding + 1) % p->avctx->thread_count; 566 567 return 0; 568 } 569 570 int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame) 571 { 572 FrameThreadContext *fctx = avctx->internal->thread_ctx; 573 int ret = 0; 574 575 /* release the async lock, permitting blocked hwaccel threads to 576 * go forward while we are in this function */ 577 async_unlock(fctx); 578 579 /* submit packets to threads while there are no buffered results to return */ 580 while (!fctx->df.nb_f && !fctx->result) { 581 PerThreadContext *p; 582 583 /* get a packet to be submitted to the next thread */ 584 av_packet_unref(fctx->next_pkt); 585 ret = ff_decode_get_packet(avctx, fctx->next_pkt); 586 if (ret < 0 && ret != AVERROR_EOF) 587 goto finish; 588 589 ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx, 590 fctx->next_pkt); 591 if (ret < 0) 592 goto finish; 593 594 /* do not return any frames until all threads have something to do */ 595 if (fctx->next_decoding != fctx->next_finished && 596 !avctx->internal->draining) 597 continue; 598 599 p = &fctx->threads[fctx->next_finished]; 600 fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count; 601 602 if (atomic_load(&p->state) != STATE_INPUT_READY) { 603 pthread_mutex_lock(&p->progress_mutex); 604 while (atomic_load_explicit(&p->state, memory_order_relaxed) != STATE_INPUT_READY) 605 pthread_cond_wait(&p->output_cond, &p->progress_mutex); 606 pthread_mutex_unlock(&p->progress_mutex); 607 } 608 609 update_context_from_thread(avctx, p->avctx, 1); 610 fctx->result = p->result; 611 p->result = 0; 612 if (p->df.nb_f) 613 FFSWAP(DecodedFrames, fctx->df, p->df); 614 } 615 616 /* a thread may return multiple frames AND an error 617 * we first return all the frames, then the error */ 618 if (fctx->df.nb_f) { 619 decoded_frames_pop(&fctx->df, frame); 620 ret = 0; 621 } else { 622 ret = fctx->result; 623 fctx->result = 0; 624 } 625 626 finish: 627 async_lock(fctx); 628 return ret; 629 } 630 631 void ff_thread_report_progress(ThreadFrame *f, int n, int field) 632 { 633 PerThreadContext *p; 634 atomic_int *progress = f->progress ? f->progress->progress : NULL; 635 636 if (!progress || 637 atomic_load_explicit(&progress[field], memory_order_relaxed) >= n) 638 return; 639 640 p = f->owner[field]->internal->thread_ctx; 641 642 if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed)) 643 av_log(f->owner[field], AV_LOG_DEBUG, 644 "%p finished %d field %d\n", progress, n, field); 645 646 pthread_mutex_lock(&p->progress_mutex); 647 648 atomic_store_explicit(&progress[field], n, memory_order_release); 649 650 pthread_cond_broadcast(&p->progress_cond); 651 pthread_mutex_unlock(&p->progress_mutex); 652 } 653 654 void ff_thread_await_progress(const ThreadFrame *f, int n, int field) 655 { 656 PerThreadContext *p; 657 atomic_int *progress = f->progress ? f->progress->progress : NULL; 658 659 if (!progress || 660 atomic_load_explicit(&progress[field], memory_order_acquire) >= n) 661 return; 662 663 p = f->owner[field]->internal->thread_ctx; 664 665 if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed)) 666 av_log(f->owner[field], AV_LOG_DEBUG, 667 "thread awaiting %d field %d from %p\n", n, field, progress); 668 669 pthread_mutex_lock(&p->progress_mutex); 670 while (atomic_load_explicit(&progress[field], memory_order_relaxed) < n) 671 pthread_cond_wait(&p->progress_cond, &p->progress_mutex); 672 pthread_mutex_unlock(&p->progress_mutex); 673 } 674 675 void ff_thread_finish_setup(AVCodecContext *avctx) { 676 PerThreadContext *p; 677 678 if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return; 679 680 p = avctx->internal->thread_ctx; 681 682 p->hwaccel_threadsafe = avctx->hwaccel && 683 (ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE); 684 685 if (hwaccel_serial(avctx) && !p->hwaccel_serializing) { 686 pthread_mutex_lock(&p->parent->hwaccel_mutex); 687 p->hwaccel_serializing = 1; 688 } 689 690 /* this assumes that no hwaccel calls happen before ff_thread_finish_setup() */ 691 if (avctx->hwaccel && 692 !(ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_ASYNC_SAFE)) { 693 p->async_serializing = 1; 694 695 async_lock(p->parent); 696 } 697 698 /* thread-unsafe hwaccels share a single private data instance, so we 699 * save hwaccel state for passing to the next thread; 700 * this is done here so that this worker thread can wipe its own hwaccel 701 * state after decoding, without requiring synchronization */ 702 av_assert0(!p->parent->stash_hwaccel); 703 if (hwaccel_serial(avctx)) { 704 p->parent->stash_hwaccel = avctx->hwaccel; 705 p->parent->stash_hwaccel_context = avctx->hwaccel_context; 706 p->parent->stash_hwaccel_priv = avctx->internal->hwaccel_priv_data; 707 } 708 709 pthread_mutex_lock(&p->progress_mutex); 710 if(atomic_load(&p->state) == STATE_SETUP_FINISHED){ 711 av_log(avctx, AV_LOG_WARNING, "Multiple ff_thread_finish_setup() calls\n"); 712 } 713 714 atomic_store(&p->state, STATE_SETUP_FINISHED); 715 716 pthread_cond_broadcast(&p->progress_cond); 717 pthread_mutex_unlock(&p->progress_mutex); 718 } 719 720 /// Waits for all threads to finish. 721 static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count) 722 { 723 int i; 724 725 async_unlock(fctx); 726 727 for (i = 0; i < thread_count; i++) { 728 PerThreadContext *p = &fctx->threads[i]; 729 730 if (atomic_load(&p->state) != STATE_INPUT_READY) { 731 pthread_mutex_lock(&p->progress_mutex); 732 while (atomic_load(&p->state) != STATE_INPUT_READY) 733 pthread_cond_wait(&p->output_cond, &p->progress_mutex); 734 pthread_mutex_unlock(&p->progress_mutex); 735 } 736 } 737 738 async_lock(fctx); 739 } 740 741 #define OFF(member) offsetof(FrameThreadContext, member) 742 DEFINE_OFFSET_ARRAY(FrameThreadContext, thread_ctx, pthread_init_cnt, 743 (OFF(buffer_mutex), OFF(hwaccel_mutex), OFF(async_mutex)), 744 (OFF(async_cond))); 745 #undef OFF 746 747 #define OFF(member) offsetof(PerThreadContext, member) 748 DEFINE_OFFSET_ARRAY(PerThreadContext, per_thread, pthread_init_cnt, 749 (OFF(progress_mutex), OFF(mutex)), 750 (OFF(input_cond), OFF(progress_cond), OFF(output_cond))); 751 #undef OFF 752 753 void ff_frame_thread_free(AVCodecContext *avctx, int thread_count) 754 { 755 FrameThreadContext *fctx = avctx->internal->thread_ctx; 756 const FFCodec *codec = ffcodec(avctx->codec); 757 int i; 758 759 park_frame_worker_threads(fctx, thread_count); 760 761 for (i = 0; i < thread_count; i++) { 762 PerThreadContext *p = &fctx->threads[i]; 763 AVCodecContext *ctx = p->avctx; 764 765 if (ctx->internal) { 766 if (p->thread_init == INITIALIZED) { 767 pthread_mutex_lock(&p->mutex); 768 p->die = 1; 769 pthread_cond_signal(&p->input_cond); 770 pthread_mutex_unlock(&p->mutex); 771 772 pthread_join(p->thread, NULL); 773 } 774 if (codec->close && p->thread_init != UNINITIALIZED) 775 codec->close(ctx); 776 777 /* When using a threadsafe hwaccel, this is where 778 * each thread's context is uninit'd and freed. */ 779 ff_hwaccel_uninit(ctx); 780 781 if (ctx->priv_data) { 782 if (codec->p.priv_class) 783 av_opt_free(ctx->priv_data); 784 av_freep(&ctx->priv_data); 785 } 786 787 av_refstruct_unref(&ctx->internal->pool); 788 av_packet_free(&ctx->internal->in_pkt); 789 av_packet_free(&ctx->internal->last_pkt_props); 790 ff_decode_internal_uninit(ctx); 791 av_freep(&ctx->internal); 792 av_buffer_unref(&ctx->hw_frames_ctx); 793 av_frame_side_data_free(&ctx->decoded_side_data, 794 &ctx->nb_decoded_side_data); 795 } 796 797 decoded_frames_free(&p->df); 798 799 ff_pthread_free(p, per_thread_offsets); 800 av_packet_free(&p->avpkt); 801 802 av_freep(&p->avctx); 803 } 804 805 decoded_frames_free(&fctx->df); 806 av_packet_free(&fctx->next_pkt); 807 808 av_freep(&fctx->threads); 809 ff_pthread_free(fctx, thread_ctx_offsets); 810 811 /* if we have stashed hwaccel state, move it to the user-facing context, 812 * so it will be freed in ff_codec_close() */ 813 av_assert0(!avctx->hwaccel); 814 FFSWAP(const AVHWAccel*, avctx->hwaccel, fctx->stash_hwaccel); 815 FFSWAP(void*, avctx->hwaccel_context, fctx->stash_hwaccel_context); 816 FFSWAP(void*, avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv); 817 818 av_freep(&avctx->internal->thread_ctx); 819 } 820 821 static av_cold int init_thread(PerThreadContext *p, int *threads_to_free, 822 FrameThreadContext *fctx, AVCodecContext *avctx, 823 const FFCodec *codec, int first) 824 { 825 AVCodecContext *copy; 826 int err; 827 828 p->initial_pict_type = AV_PICTURE_TYPE_NONE; 829 if (avctx->codec_descriptor->props & AV_CODEC_PROP_INTRA_ONLY) { 830 p->intra_only_flag = AV_FRAME_FLAG_KEY; 831 if (avctx->codec_type == AVMEDIA_TYPE_VIDEO) 832 p->initial_pict_type = AV_PICTURE_TYPE_I; 833 } 834 835 atomic_init(&p->state, STATE_INPUT_READY); 836 837 copy = av_memdup(avctx, sizeof(*avctx)); 838 if (!copy) 839 return AVERROR(ENOMEM); 840 copy->priv_data = NULL; 841 copy->decoded_side_data = NULL; 842 copy->nb_decoded_side_data = 0; 843 844 /* From now on, this PerThreadContext will be cleaned up by 845 * ff_frame_thread_free in case of errors. */ 846 (*threads_to_free)++; 847 848 p->parent = fctx; 849 p->avctx = copy; 850 851 copy->internal = ff_decode_internal_alloc(); 852 if (!copy->internal) 853 return AVERROR(ENOMEM); 854 ff_decode_internal_sync(copy, avctx); 855 copy->internal->thread_ctx = p; 856 copy->internal->progress_frame_pool = avctx->internal->progress_frame_pool; 857 858 copy->delay = avctx->delay; 859 860 if (codec->priv_data_size) { 861 copy->priv_data = av_mallocz(codec->priv_data_size); 862 if (!copy->priv_data) 863 return AVERROR(ENOMEM); 864 865 if (codec->p.priv_class) { 866 *(const AVClass **)copy->priv_data = codec->p.priv_class; 867 err = av_opt_copy(copy->priv_data, avctx->priv_data); 868 if (err < 0) 869 return err; 870 } 871 } 872 873 err = ff_pthread_init(p, per_thread_offsets); 874 if (err < 0) 875 return err; 876 877 if (!(p->avpkt = av_packet_alloc())) 878 return AVERROR(ENOMEM); 879 880 copy->internal->is_frame_mt = 1; 881 if (!first) 882 copy->internal->is_copy = 1; 883 884 copy->internal->in_pkt = av_packet_alloc(); 885 if (!copy->internal->in_pkt) 886 return AVERROR(ENOMEM); 887 888 copy->internal->last_pkt_props = av_packet_alloc(); 889 if (!copy->internal->last_pkt_props) 890 return AVERROR(ENOMEM); 891 892 if (codec->init) { 893 err = codec->init(copy); 894 if (err < 0) { 895 if (codec->caps_internal & FF_CODEC_CAP_INIT_CLEANUP) 896 p->thread_init = NEEDS_CLOSE; 897 return err; 898 } 899 } 900 p->thread_init = NEEDS_CLOSE; 901 902 if (first) { 903 update_context_from_thread(avctx, copy, 1); 904 905 av_frame_side_data_free(&avctx->decoded_side_data, &avctx->nb_decoded_side_data); 906 for (int i = 0; i < copy->nb_decoded_side_data; i++) { 907 err = av_frame_side_data_clone(&avctx->decoded_side_data, 908 &avctx->nb_decoded_side_data, 909 copy->decoded_side_data[i], 0); 910 if (err < 0) 911 return err; 912 } 913 } 914 915 atomic_init(&p->debug_threads, (copy->debug & FF_DEBUG_THREADS) != 0); 916 917 err = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p)); 918 if (err < 0) 919 return err; 920 p->thread_init = INITIALIZED; 921 922 return 0; 923 } 924 925 int ff_frame_thread_init(AVCodecContext *avctx) 926 { 927 int thread_count = avctx->thread_count; 928 const FFCodec *codec = ffcodec(avctx->codec); 929 FrameThreadContext *fctx; 930 int err, i = 0; 931 932 if (!thread_count) { 933 int nb_cpus = av_cpu_count(); 934 // use number of cores + 1 as thread count if there is more than one 935 if (nb_cpus > 1) 936 thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS); 937 else 938 thread_count = avctx->thread_count = 1; 939 } 940 941 if (thread_count <= 1) { 942 avctx->active_thread_type = 0; 943 return 0; 944 } 945 946 avctx->internal->thread_ctx = fctx = av_mallocz(sizeof(FrameThreadContext)); 947 if (!fctx) 948 return AVERROR(ENOMEM); 949 950 err = ff_pthread_init(fctx, thread_ctx_offsets); 951 if (err < 0) { 952 ff_pthread_free(fctx, thread_ctx_offsets); 953 av_freep(&avctx->internal->thread_ctx); 954 return err; 955 } 956 957 fctx->next_pkt = av_packet_alloc(); 958 if (!fctx->next_pkt) 959 return AVERROR(ENOMEM); 960 961 fctx->async_lock = 1; 962 963 if (codec->p.type == AVMEDIA_TYPE_VIDEO) 964 avctx->delay = avctx->thread_count - 1; 965 966 fctx->threads = av_calloc(thread_count, sizeof(*fctx->threads)); 967 if (!fctx->threads) { 968 err = AVERROR(ENOMEM); 969 goto error; 970 } 971 972 for (; i < thread_count; ) { 973 PerThreadContext *p = &fctx->threads[i]; 974 int first = !i; 975 976 err = init_thread(p, &i, fctx, avctx, codec, first); 977 if (err < 0) 978 goto error; 979 } 980 981 return 0; 982 983 error: 984 ff_frame_thread_free(avctx, i); 985 return err; 986 } 987 988 void ff_thread_flush(AVCodecContext *avctx) 989 { 990 int i; 991 FrameThreadContext *fctx = avctx->internal->thread_ctx; 992 993 if (!fctx) return; 994 995 park_frame_worker_threads(fctx, avctx->thread_count); 996 if (fctx->prev_thread) { 997 if (fctx->prev_thread != &fctx->threads[0]) 998 update_context_from_thread(fctx->threads[0].avctx, fctx->prev_thread->avctx, 0); 999 } 1000 1001 fctx->next_decoding = fctx->next_finished = 0; 1002 fctx->prev_thread = NULL; 1003 1004 decoded_frames_flush(&fctx->df); 1005 fctx->result = 0; 1006 1007 for (i = 0; i < avctx->thread_count; i++) { 1008 PerThreadContext *p = &fctx->threads[i]; 1009 1010 decoded_frames_flush(&p->df); 1011 p->result = 0; 1012 1013 avcodec_flush_buffers(p->avctx); 1014 } 1015 } 1016 1017 int ff_thread_can_start_frame(AVCodecContext *avctx) 1018 { 1019 if ((avctx->active_thread_type & FF_THREAD_FRAME) && 1020 ffcodec(avctx->codec)->update_thread_context) { 1021 PerThreadContext *p = avctx->internal->thread_ctx; 1022 1023 if (atomic_load(&p->state) != STATE_SETTING_UP) 1024 return 0; 1025 } 1026 1027 return 1; 1028 } 1029 1030 static int thread_get_buffer_internal(AVCodecContext *avctx, AVFrame *f, int flags) 1031 { 1032 PerThreadContext *p; 1033 int err; 1034 1035 if (!(avctx->active_thread_type & FF_THREAD_FRAME)) 1036 return ff_get_buffer(avctx, f, flags); 1037 1038 p = avctx->internal->thread_ctx; 1039 if (atomic_load(&p->state) != STATE_SETTING_UP && 1040 ffcodec(avctx->codec)->update_thread_context) { 1041 av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n"); 1042 return -1; 1043 } 1044 1045 pthread_mutex_lock(&p->parent->buffer_mutex); 1046 err = ff_get_buffer(avctx, f, flags); 1047 1048 pthread_mutex_unlock(&p->parent->buffer_mutex); 1049 1050 return err; 1051 } 1052 1053 int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f, int flags) 1054 { 1055 int ret = thread_get_buffer_internal(avctx, f, flags); 1056 if (ret < 0) 1057 av_log(avctx, AV_LOG_ERROR, "thread_get_buffer() failed\n"); 1058 return ret; 1059 } 1060 1061 int ff_thread_get_ext_buffer(AVCodecContext *avctx, ThreadFrame *f, int flags) 1062 { 1063 int ret; 1064 1065 f->owner[0] = f->owner[1] = avctx; 1066 if (!(avctx->active_thread_type & FF_THREAD_FRAME)) 1067 return ff_get_buffer(avctx, f->f, flags); 1068 1069 f->progress = av_refstruct_allocz(sizeof(*f->progress)); 1070 if (!f->progress) 1071 return AVERROR(ENOMEM); 1072 1073 atomic_init(&f->progress->progress[0], -1); 1074 atomic_init(&f->progress->progress[1], -1); 1075 1076 ret = ff_thread_get_buffer(avctx, f->f, flags); 1077 if (ret) 1078 av_refstruct_unref(&f->progress); 1079 return ret; 1080 } 1081 1082 void ff_thread_release_ext_buffer(ThreadFrame *f) 1083 { 1084 av_refstruct_unref(&f->progress); 1085 f->owner[0] = f->owner[1] = NULL; 1086 if (f->f) 1087 av_frame_unref(f->f); 1088 } 1089 1090 enum ThreadingStatus ff_thread_sync_ref(AVCodecContext *avctx, size_t offset) 1091 { 1092 PerThreadContext *p; 1093 const void *ref; 1094 1095 if (!avctx->internal->is_copy) 1096 return avctx->active_thread_type & FF_THREAD_FRAME ? 1097 FF_THREAD_IS_FIRST_THREAD : FF_THREAD_NO_FRAME_THREADING; 1098 1099 p = avctx->internal->thread_ctx; 1100 1101 av_assert1(memcpy(&ref, (char*)avctx->priv_data + offset, sizeof(ref)) && ref == NULL); 1102 1103 memcpy(&ref, (const char*)p->parent->threads[0].avctx->priv_data + offset, sizeof(ref)); 1104 av_assert1(ref); 1105 av_refstruct_replace((char*)avctx->priv_data + offset, ref); 1106 1107 return FF_THREAD_IS_COPY; 1108 } 1109 1110 int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt) 1111 { 1112 PerThreadContext *p = avctx->internal->thread_ctx; 1113 1114 if (!AVPACKET_IS_EMPTY(p->avpkt)) { 1115 av_packet_move_ref(pkt, p->avpkt); 1116 return 0; 1117 } 1118 1119 return avctx->internal->draining ? AVERROR_EOF : AVERROR(EAGAIN); 1120 }