slicethread.c (8225B)
1 /* 2 * This file is part of FFmpeg. 3 * 4 * FFmpeg is free software; you can redistribute it and/or 5 * modify it under the terms of the GNU Lesser General Public 6 * License as published by the Free Software Foundation; either 7 * version 2.1 of the License, or (at your option) any later version. 8 * 9 * FFmpeg is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 * Lesser General Public License for more details. 13 * 14 * You should have received a copy of the GNU Lesser General Public 15 * License along with FFmpeg; if not, write to the Free Software 16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 17 */ 18 19 #include <stdatomic.h> 20 #include "cpu.h" 21 #include "internal.h" 22 #include "slicethread.h" 23 #include "mem.h" 24 #include "thread.h" 25 #include "avassert.h" 26 27 #define MAX_AUTO_THREADS 16 28 29 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS 30 31 typedef struct WorkerContext { 32 AVSliceThread *ctx; 33 pthread_mutex_t mutex; 34 pthread_cond_t cond; 35 pthread_t thread; 36 int done; 37 } WorkerContext; 38 39 struct AVSliceThread { 40 WorkerContext *workers; 41 int nb_threads; 42 int nb_active_threads; 43 int nb_jobs; 44 45 atomic_uint first_job; 46 atomic_uint current_job; 47 pthread_mutex_t done_mutex; 48 pthread_cond_t done_cond; 49 int done; 50 int finished; 51 52 void *priv; 53 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads); 54 void (*main_func)(void *priv); 55 }; 56 57 static int run_jobs(AVSliceThread *ctx) 58 { 59 unsigned nb_jobs = ctx->nb_jobs; 60 unsigned nb_active_threads = ctx->nb_active_threads; 61 unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel); 62 unsigned current_job = first_job; 63 64 do { 65 ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads); 66 } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs); 67 68 return current_job == nb_jobs + nb_active_threads - 1; 69 } 70 71 static void *attribute_align_arg thread_worker(void *v) 72 { 73 WorkerContext *w = v; 74 AVSliceThread *ctx = w->ctx; 75 76 pthread_mutex_lock(&w->mutex); 77 pthread_cond_signal(&w->cond); 78 79 while (1) { 80 w->done = 1; 81 while (w->done) 82 pthread_cond_wait(&w->cond, &w->mutex); 83 84 if (ctx->finished) { 85 pthread_mutex_unlock(&w->mutex); 86 return NULL; 87 } 88 89 if (run_jobs(ctx)) { 90 pthread_mutex_lock(&ctx->done_mutex); 91 ctx->done = 1; 92 pthread_cond_signal(&ctx->done_cond); 93 pthread_mutex_unlock(&ctx->done_mutex); 94 } 95 } 96 } 97 98 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, 99 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), 100 void (*main_func)(void *priv), 101 int nb_threads) 102 { 103 AVSliceThread *ctx; 104 int nb_workers, i; 105 int ret; 106 107 av_assert0(nb_threads >= 0); 108 if (!nb_threads) { 109 int nb_cpus = av_cpu_count(); 110 if (nb_cpus > 1) 111 nb_threads = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS); 112 else 113 nb_threads = 1; 114 } 115 116 nb_workers = nb_threads; 117 if (!main_func) 118 nb_workers--; 119 120 *pctx = ctx = av_mallocz(sizeof(*ctx)); 121 if (!ctx) 122 return AVERROR(ENOMEM); 123 124 if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) { 125 av_freep(pctx); 126 return AVERROR(ENOMEM); 127 } 128 129 ctx->priv = priv; 130 ctx->worker_func = worker_func; 131 ctx->main_func = main_func; 132 ctx->nb_threads = nb_threads; 133 ctx->nb_active_threads = 0; 134 ctx->nb_jobs = 0; 135 ctx->finished = 0; 136 137 atomic_init(&ctx->first_job, 0); 138 atomic_init(&ctx->current_job, 0); 139 ret = pthread_mutex_init(&ctx->done_mutex, NULL); 140 if (ret) { 141 av_freep(&ctx->workers); 142 av_freep(pctx); 143 return AVERROR(ret); 144 } 145 ret = pthread_cond_init(&ctx->done_cond, NULL); 146 if (ret) { 147 ctx->nb_threads = main_func ? 0 : 1; 148 avpriv_slicethread_free(pctx); 149 return AVERROR(ret); 150 } 151 ctx->done = 0; 152 153 for (i = 0; i < nb_workers; i++) { 154 WorkerContext *w = &ctx->workers[i]; 155 int ret; 156 w->ctx = ctx; 157 ret = pthread_mutex_init(&w->mutex, NULL); 158 if (ret) { 159 ctx->nb_threads = main_func ? i : i + 1; 160 avpriv_slicethread_free(pctx); 161 return AVERROR(ret); 162 } 163 ret = pthread_cond_init(&w->cond, NULL); 164 if (ret) { 165 pthread_mutex_destroy(&w->mutex); 166 ctx->nb_threads = main_func ? i : i + 1; 167 avpriv_slicethread_free(pctx); 168 return AVERROR(ret); 169 } 170 pthread_mutex_lock(&w->mutex); 171 w->done = 0; 172 173 if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) { 174 ctx->nb_threads = main_func ? i : i + 1; 175 pthread_mutex_unlock(&w->mutex); 176 pthread_cond_destroy(&w->cond); 177 pthread_mutex_destroy(&w->mutex); 178 avpriv_slicethread_free(pctx); 179 return AVERROR(ret); 180 } 181 182 while (!w->done) 183 pthread_cond_wait(&w->cond, &w->mutex); 184 pthread_mutex_unlock(&w->mutex); 185 } 186 187 return nb_threads; 188 } 189 190 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main) 191 { 192 int nb_workers, i, is_last = 0; 193 194 av_assert0(nb_jobs > 0); 195 ctx->nb_jobs = nb_jobs; 196 ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads); 197 atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed); 198 atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed); 199 nb_workers = ctx->nb_active_threads; 200 if (!ctx->main_func || !execute_main) 201 nb_workers--; 202 203 for (i = 0; i < nb_workers; i++) { 204 WorkerContext *w = &ctx->workers[i]; 205 pthread_mutex_lock(&w->mutex); 206 w->done = 0; 207 pthread_cond_signal(&w->cond); 208 pthread_mutex_unlock(&w->mutex); 209 } 210 211 if (ctx->main_func && execute_main) 212 ctx->main_func(ctx->priv); 213 else 214 is_last = run_jobs(ctx); 215 216 if (!is_last) { 217 pthread_mutex_lock(&ctx->done_mutex); 218 while (!ctx->done) 219 pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex); 220 ctx->done = 0; 221 pthread_mutex_unlock(&ctx->done_mutex); 222 } 223 } 224 225 void avpriv_slicethread_free(AVSliceThread **pctx) 226 { 227 AVSliceThread *ctx; 228 int nb_workers, i; 229 230 if (!pctx || !*pctx) 231 return; 232 233 ctx = *pctx; 234 nb_workers = ctx->nb_threads; 235 if (!ctx->main_func) 236 nb_workers--; 237 238 ctx->finished = 1; 239 for (i = 0; i < nb_workers; i++) { 240 WorkerContext *w = &ctx->workers[i]; 241 pthread_mutex_lock(&w->mutex); 242 w->done = 0; 243 pthread_cond_signal(&w->cond); 244 pthread_mutex_unlock(&w->mutex); 245 } 246 247 for (i = 0; i < nb_workers; i++) { 248 WorkerContext *w = &ctx->workers[i]; 249 pthread_join(w->thread, NULL); 250 pthread_cond_destroy(&w->cond); 251 pthread_mutex_destroy(&w->mutex); 252 } 253 254 pthread_cond_destroy(&ctx->done_cond); 255 pthread_mutex_destroy(&ctx->done_mutex); 256 av_freep(&ctx->workers); 257 av_freep(pctx); 258 } 259 260 #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */ 261 262 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, 263 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), 264 void (*main_func)(void *priv), 265 int nb_threads) 266 { 267 *pctx = NULL; 268 return AVERROR(ENOSYS); 269 } 270 271 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main) 272 { 273 av_assert0(0); 274 } 275 276 void avpriv_slicethread_free(AVSliceThread **pctx) 277 { 278 av_assert0(!pctx || !*pctx); 279 } 280 281 #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */