thread_utils.c (11145B)
1 // Copyright 2011 Google Inc. All Rights Reserved. 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the COPYING file in the root of the source 5 // tree. An additional intellectual property rights grant can be found 6 // in the file PATENTS. All contributing project authors may 7 // be found in the AUTHORS file in the root of the source tree. 8 // ----------------------------------------------------------------------------- 9 // 10 // Multi-threaded worker 11 // 12 // Author: Skal (pascal.massimino@gmail.com) 13 14 #include <assert.h> 15 #include <string.h> // for memset() 16 17 #include "src/utils/thread_utils.h" 18 #include "src/utils/utils.h" 19 20 #ifdef WEBP_USE_THREAD 21 22 #if defined(_WIN32) 23 24 #include <windows.h> 25 typedef HANDLE pthread_t; 26 typedef CRITICAL_SECTION pthread_mutex_t; 27 28 #if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater 29 #define USE_WINDOWS_CONDITION_VARIABLE 30 typedef CONDITION_VARIABLE pthread_cond_t; 31 #else 32 typedef struct { 33 HANDLE waiting_sem; 34 HANDLE received_sem; 35 HANDLE signal_event; 36 } pthread_cond_t; 37 #endif // _WIN32_WINNT >= 0x600 38 39 #ifndef WINAPI_FAMILY_PARTITION 40 #define WINAPI_PARTITION_DESKTOP 1 41 #define WINAPI_FAMILY_PARTITION(x) x 42 #endif 43 44 #if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP) 45 #define USE_CREATE_THREAD 46 #endif 47 48 #else // !_WIN32 49 50 #include <pthread.h> 51 52 #endif // _WIN32 53 54 typedef struct { 55 pthread_mutex_t mutex; 56 pthread_cond_t condition; 57 pthread_t thread; 58 } WebPWorkerImpl; 59 60 #if defined(_WIN32) 61 62 //------------------------------------------------------------------------------ 63 // simplistic pthread emulation layer 64 65 #include <process.h> 66 67 // _beginthreadex requires __stdcall 68 #define THREADFN unsigned int __stdcall 69 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) 70 71 #if _WIN32_WINNT >= 0x0501 // Windows XP or greater 72 #define WaitForSingleObject(obj, timeout) \ 73 WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/) 74 #endif 75 76 static int pthread_create(pthread_t* const thread, const void* attr, 77 unsigned int (__stdcall* start)(void*), void* arg) { 78 (void)attr; 79 #ifdef USE_CREATE_THREAD 80 *thread = CreateThread(NULL, /* lpThreadAttributes */ 81 0, /* dwStackSize */ 82 start, 83 arg, 84 0, /* dwStackSize */ 85 NULL); /* lpThreadId */ 86 #else 87 *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ 88 0, /* unsigned stack_size */ 89 start, 90 arg, 91 0, /* unsigned initflag */ 92 NULL); /* unsigned *thrdaddr */ 93 #endif 94 if (*thread == NULL) return 1; 95 SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); 96 return 0; 97 } 98 99 static int pthread_join(pthread_t thread, void** value_ptr) { 100 (void)value_ptr; 101 return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || 102 CloseHandle(thread) == 0); 103 } 104 105 // Mutex 106 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) { 107 (void)mutexattr; 108 #if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater 109 InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/); 110 #else 111 InitializeCriticalSection(mutex); 112 #endif 113 return 0; 114 } 115 116 static int pthread_mutex_lock(pthread_mutex_t* const mutex) { 117 EnterCriticalSection(mutex); 118 return 0; 119 } 120 121 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) { 122 LeaveCriticalSection(mutex); 123 return 0; 124 } 125 126 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) { 127 DeleteCriticalSection(mutex); 128 return 0; 129 } 130 131 // Condition 132 static int pthread_cond_destroy(pthread_cond_t* const condition) { 133 int ok = 1; 134 #ifdef USE_WINDOWS_CONDITION_VARIABLE 135 (void)condition; 136 #else 137 ok &= (CloseHandle(condition->waiting_sem) != 0); 138 ok &= (CloseHandle(condition->received_sem) != 0); 139 ok &= (CloseHandle(condition->signal_event) != 0); 140 #endif 141 return !ok; 142 } 143 144 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) { 145 (void)cond_attr; 146 #ifdef USE_WINDOWS_CONDITION_VARIABLE 147 InitializeConditionVariable(condition); 148 #else 149 condition->waiting_sem = CreateSemaphore(NULL, 0, 1, NULL); 150 condition->received_sem = CreateSemaphore(NULL, 0, 1, NULL); 151 condition->signal_event = CreateEvent(NULL, FALSE, FALSE, NULL); 152 if (condition->waiting_sem == NULL || 153 condition->received_sem == NULL || 154 condition->signal_event == NULL) { 155 pthread_cond_destroy(condition); 156 return 1; 157 } 158 #endif 159 return 0; 160 } 161 162 static int pthread_cond_signal(pthread_cond_t* const condition) { 163 int ok = 1; 164 #ifdef USE_WINDOWS_CONDITION_VARIABLE 165 WakeConditionVariable(condition); 166 #else 167 if (WaitForSingleObject(condition->waiting_sem, 0) == WAIT_OBJECT_0) { 168 // a thread is waiting in pthread_cond_wait: allow it to be notified 169 ok = SetEvent(condition->signal_event); 170 // wait until the event is consumed so the signaler cannot consume 171 // the event via its own pthread_cond_wait. 172 ok &= (WaitForSingleObject(condition->received_sem, INFINITE) != 173 WAIT_OBJECT_0); 174 } 175 #endif 176 return !ok; 177 } 178 179 static int pthread_cond_wait(pthread_cond_t* const condition, 180 pthread_mutex_t* const mutex) { 181 int ok; 182 #ifdef USE_WINDOWS_CONDITION_VARIABLE 183 ok = SleepConditionVariableCS(condition, mutex, INFINITE); 184 #else 185 // note that there is a consumer available so the signal isn't dropped in 186 // pthread_cond_signal 187 if (!ReleaseSemaphore(condition->waiting_sem, 1, NULL)) return 1; 188 // now unlock the mutex so pthread_cond_signal may be issued 189 pthread_mutex_unlock(mutex); 190 ok = (WaitForSingleObject(condition->signal_event, INFINITE) == 191 WAIT_OBJECT_0); 192 ok &= ReleaseSemaphore(condition->received_sem, 1, NULL); 193 pthread_mutex_lock(mutex); 194 #endif 195 return !ok; 196 } 197 198 #else // !_WIN32 199 # define THREADFN void* 200 # define THREAD_RETURN(val) val 201 #endif // _WIN32 202 203 //------------------------------------------------------------------------------ 204 205 static THREADFN ThreadLoop(void* ptr) { 206 WebPWorker* const worker = (WebPWorker*)ptr; 207 WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl; 208 int done = 0; 209 while (!done) { 210 pthread_mutex_lock(&impl->mutex); 211 while (worker->status == OK) { // wait in idling mode 212 pthread_cond_wait(&impl->condition, &impl->mutex); 213 } 214 if (worker->status == WORK) { 215 WebPGetWorkerInterface()->Execute(worker); 216 worker->status = OK; 217 } else if (worker->status == NOT_OK) { // finish the worker 218 done = 1; 219 } 220 // signal to the main thread that we're done (for Sync()) 221 // Note the associated mutex does not need to be held when signaling the 222 // condition. Unlocking the mutex first may improve performance in some 223 // implementations, avoiding the case where the waiting thread can't 224 // reacquire the mutex when woken. 225 pthread_mutex_unlock(&impl->mutex); 226 pthread_cond_signal(&impl->condition); 227 } 228 return THREAD_RETURN(NULL); // Thread is finished 229 } 230 231 // main thread state control 232 static void ChangeState(WebPWorker* const worker, WebPWorkerStatus new_status) { 233 // No-op when attempting to change state on a thread that didn't come up. 234 // Checking 'status' without acquiring the lock first would result in a data 235 // race. 236 WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl; 237 if (impl == NULL) return; 238 239 pthread_mutex_lock(&impl->mutex); 240 if (worker->status >= OK) { 241 // wait for the worker to finish 242 while (worker->status != OK) { 243 pthread_cond_wait(&impl->condition, &impl->mutex); 244 } 245 // assign new status and release the working thread if needed 246 if (new_status != OK) { 247 worker->status = new_status; 248 // Note the associated mutex does not need to be held when signaling the 249 // condition. Unlocking the mutex first may improve performance in some 250 // implementations, avoiding the case where the waiting thread can't 251 // reacquire the mutex when woken. 252 pthread_mutex_unlock(&impl->mutex); 253 pthread_cond_signal(&impl->condition); 254 return; 255 } 256 } 257 pthread_mutex_unlock(&impl->mutex); 258 } 259 260 #endif // WEBP_USE_THREAD 261 262 //------------------------------------------------------------------------------ 263 264 static void Init(WebPWorker* const worker) { 265 memset(worker, 0, sizeof(*worker)); 266 worker->status = NOT_OK; 267 } 268 269 static int Sync(WebPWorker* const worker) { 270 #ifdef WEBP_USE_THREAD 271 ChangeState(worker, OK); 272 #endif 273 assert(worker->status <= OK); 274 return !worker->had_error; 275 } 276 277 static int Reset(WebPWorker* const worker) { 278 int ok = 1; 279 worker->had_error = 0; 280 if (worker->status < OK) { 281 #ifdef WEBP_USE_THREAD 282 WebPWorkerImpl* const impl = 283 (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(WebPWorkerImpl)); 284 worker->impl = (void*)impl; 285 if (worker->impl == NULL) { 286 return 0; 287 } 288 if (pthread_mutex_init(&impl->mutex, NULL)) { 289 goto Error; 290 } 291 if (pthread_cond_init(&impl->condition, NULL)) { 292 pthread_mutex_destroy(&impl->mutex); 293 goto Error; 294 } 295 pthread_mutex_lock(&impl->mutex); 296 ok = !pthread_create(&impl->thread, NULL, ThreadLoop, worker); 297 if (ok) worker->status = OK; 298 pthread_mutex_unlock(&impl->mutex); 299 if (!ok) { 300 pthread_mutex_destroy(&impl->mutex); 301 pthread_cond_destroy(&impl->condition); 302 Error: 303 WebPSafeFree(impl); 304 worker->impl = NULL; 305 return 0; 306 } 307 #else 308 worker->status = OK; 309 #endif 310 } else if (worker->status > OK) { 311 ok = Sync(worker); 312 } 313 assert(!ok || (worker->status == OK)); 314 return ok; 315 } 316 317 static void Execute(WebPWorker* const worker) { 318 if (worker->hook != NULL) { 319 worker->had_error |= !worker->hook(worker->data1, worker->data2); 320 } 321 } 322 323 static void Launch(WebPWorker* const worker) { 324 #ifdef WEBP_USE_THREAD 325 ChangeState(worker, WORK); 326 #else 327 Execute(worker); 328 #endif 329 } 330 331 static void End(WebPWorker* const worker) { 332 #ifdef WEBP_USE_THREAD 333 if (worker->impl != NULL) { 334 WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl; 335 ChangeState(worker, NOT_OK); 336 pthread_join(impl->thread, NULL); 337 pthread_mutex_destroy(&impl->mutex); 338 pthread_cond_destroy(&impl->condition); 339 WebPSafeFree(impl); 340 worker->impl = NULL; 341 } 342 #else 343 worker->status = NOT_OK; 344 assert(worker->impl == NULL); 345 #endif 346 assert(worker->status == NOT_OK); 347 } 348 349 //------------------------------------------------------------------------------ 350 351 static WebPWorkerInterface g_worker_interface = { 352 Init, Reset, Sync, Launch, Execute, End 353 }; 354 355 int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) { 356 if (winterface == NULL || 357 winterface->Init == NULL || winterface->Reset == NULL || 358 winterface->Sync == NULL || winterface->Launch == NULL || 359 winterface->Execute == NULL || winterface->End == NULL) { 360 return 0; 361 } 362 g_worker_interface = *winterface; 363 return 1; 364 } 365 366 const WebPWorkerInterface* WebPGetWorkerInterface(void) { 367 return &g_worker_interface; 368 } 369 370 //------------------------------------------------------------------------------