threadmessage.c (6682B)
1 /* 2 * Copyright (c) 2014 Nicolas George 3 * 4 * This file is part of FFmpeg. 5 * 6 * FFmpeg is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public License 8 * as published by the Free Software Foundation; either 9 * version 2.1 of the License, or (at your option) any later version. 10 * 11 * FFmpeg is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public License 17 * along with FFmpeg; if not, write to the Free Software Foundation, Inc., 18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 19 */ 20 21 #include <limits.h> 22 #include <stddef.h> 23 24 #include "error.h" 25 #include "fifo.h" 26 #include "mem.h" 27 #include "threadmessage.h" 28 #include "thread.h" 29 30 struct AVThreadMessageQueue { 31 #if HAVE_THREADS 32 AVFifo *fifo; 33 pthread_mutex_t lock; 34 pthread_cond_t cond_recv; 35 pthread_cond_t cond_send; 36 int err_send; 37 int err_recv; 38 unsigned elsize; 39 void (*free_func)(void *msg); 40 #else 41 int dummy; 42 #endif 43 }; 44 45 int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, 46 unsigned nelem, 47 unsigned elsize) 48 { 49 #if HAVE_THREADS 50 AVThreadMessageQueue *rmq; 51 int ret = 0; 52 53 if (nelem > INT_MAX / elsize) 54 return AVERROR(EINVAL); 55 if (!(rmq = av_mallocz(sizeof(*rmq)))) 56 return AVERROR(ENOMEM); 57 if ((ret = pthread_mutex_init(&rmq->lock, NULL))) { 58 av_free(rmq); 59 return AVERROR(ret); 60 } 61 if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) { 62 pthread_mutex_destroy(&rmq->lock); 63 av_free(rmq); 64 return AVERROR(ret); 65 } 66 if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) { 67 pthread_cond_destroy(&rmq->cond_recv); 68 pthread_mutex_destroy(&rmq->lock); 69 av_free(rmq); 70 return AVERROR(ret); 71 } 72 if (!(rmq->fifo = av_fifo_alloc2(nelem, elsize, 0))) { 73 pthread_cond_destroy(&rmq->cond_send); 74 pthread_cond_destroy(&rmq->cond_recv); 75 pthread_mutex_destroy(&rmq->lock); 76 av_free(rmq); 77 return AVERROR(ENOMEM); 78 } 79 rmq->elsize = elsize; 80 *mq = rmq; 81 return 0; 82 #else 83 *mq = NULL; 84 return AVERROR(ENOSYS); 85 #endif /* HAVE_THREADS */ 86 } 87 88 void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, 89 void (*free_func)(void *msg)) 90 { 91 #if HAVE_THREADS 92 mq->free_func = free_func; 93 #endif 94 } 95 96 void av_thread_message_queue_free(AVThreadMessageQueue **mq) 97 { 98 #if HAVE_THREADS 99 if (*mq) { 100 av_thread_message_flush(*mq); 101 av_fifo_freep2(&(*mq)->fifo); 102 pthread_cond_destroy(&(*mq)->cond_send); 103 pthread_cond_destroy(&(*mq)->cond_recv); 104 pthread_mutex_destroy(&(*mq)->lock); 105 av_freep(mq); 106 } 107 #endif 108 } 109 110 int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq) 111 { 112 #if HAVE_THREADS 113 int ret; 114 pthread_mutex_lock(&mq->lock); 115 ret = av_fifo_can_read(mq->fifo); 116 pthread_mutex_unlock(&mq->lock); 117 return ret; 118 #else 119 return AVERROR(ENOSYS); 120 #endif 121 } 122 123 #if HAVE_THREADS 124 125 static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq, 126 void *msg, 127 unsigned flags) 128 { 129 while (!mq->err_send && !av_fifo_can_write(mq->fifo)) { 130 if ((flags & AV_THREAD_MESSAGE_NONBLOCK)) 131 return AVERROR(EAGAIN); 132 pthread_cond_wait(&mq->cond_send, &mq->lock); 133 } 134 if (mq->err_send) 135 return mq->err_send; 136 av_fifo_write(mq->fifo, msg, 1); 137 /* one message is sent, signal one receiver */ 138 pthread_cond_signal(&mq->cond_recv); 139 return 0; 140 } 141 142 static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq, 143 void *msg, 144 unsigned flags) 145 { 146 while (!mq->err_recv && !av_fifo_can_read(mq->fifo)) { 147 if ((flags & AV_THREAD_MESSAGE_NONBLOCK)) 148 return AVERROR(EAGAIN); 149 pthread_cond_wait(&mq->cond_recv, &mq->lock); 150 } 151 if (!av_fifo_can_read(mq->fifo)) 152 return mq->err_recv; 153 av_fifo_read(mq->fifo, msg, 1); 154 /* one message space appeared, signal one sender */ 155 pthread_cond_signal(&mq->cond_send); 156 return 0; 157 } 158 159 #endif /* HAVE_THREADS */ 160 161 int av_thread_message_queue_send(AVThreadMessageQueue *mq, 162 void *msg, 163 unsigned flags) 164 { 165 #if HAVE_THREADS 166 int ret; 167 168 pthread_mutex_lock(&mq->lock); 169 ret = av_thread_message_queue_send_locked(mq, msg, flags); 170 pthread_mutex_unlock(&mq->lock); 171 return ret; 172 #else 173 return AVERROR(ENOSYS); 174 #endif /* HAVE_THREADS */ 175 } 176 177 int av_thread_message_queue_recv(AVThreadMessageQueue *mq, 178 void *msg, 179 unsigned flags) 180 { 181 #if HAVE_THREADS 182 int ret; 183 184 pthread_mutex_lock(&mq->lock); 185 ret = av_thread_message_queue_recv_locked(mq, msg, flags); 186 pthread_mutex_unlock(&mq->lock); 187 return ret; 188 #else 189 return AVERROR(ENOSYS); 190 #endif /* HAVE_THREADS */ 191 } 192 193 void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, 194 int err) 195 { 196 #if HAVE_THREADS 197 pthread_mutex_lock(&mq->lock); 198 mq->err_send = err; 199 pthread_cond_broadcast(&mq->cond_send); 200 pthread_mutex_unlock(&mq->lock); 201 #endif /* HAVE_THREADS */ 202 } 203 204 void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, 205 int err) 206 { 207 #if HAVE_THREADS 208 pthread_mutex_lock(&mq->lock); 209 mq->err_recv = err; 210 pthread_cond_broadcast(&mq->cond_recv); 211 pthread_mutex_unlock(&mq->lock); 212 #endif /* HAVE_THREADS */ 213 } 214 215 #if HAVE_THREADS 216 static int free_func_wrap(void *arg, void *buf, size_t *nb_elems) 217 { 218 AVThreadMessageQueue *mq = arg; 219 uint8_t *msg = buf; 220 for (size_t i = 0; i < *nb_elems; i++) 221 mq->free_func(msg + i * mq->elsize); 222 return 0; 223 } 224 #endif 225 226 void av_thread_message_flush(AVThreadMessageQueue *mq) 227 { 228 #if HAVE_THREADS 229 size_t used; 230 231 pthread_mutex_lock(&mq->lock); 232 used = av_fifo_can_read(mq->fifo); 233 if (mq->free_func) 234 av_fifo_read_to_cb(mq->fifo, free_func_wrap, mq, &used); 235 /* only the senders need to be notified since the queue is empty and there 236 * is nothing to read */ 237 pthread_cond_broadcast(&mq->cond_send); 238 pthread_mutex_unlock(&mq->lock); 239 #endif /* HAVE_THREADS */ 240 }