michael@0: // Copyright 2013 Google Inc. All Rights Reserved. michael@0: // michael@0: // Use of this source code is governed by a BSD-style license michael@0: // that can be found in the COPYING file in the root of the source michael@0: // tree. An additional intellectual property rights grant can be found michael@0: // in the file PATENTS. All contributing project authors may michael@0: // be found in the AUTHORS file in the root of the source tree. michael@0: // ----------------------------------------------------------------------------- michael@0: // michael@0: // Multi-threaded worker michael@0: // michael@0: // Original source: michael@0: // http://git.chromium.org/webm/libwebp.git michael@0: // 100644 blob eff8f2a8c20095aade3c292b0e9292dac6cb3587 src/utils/thread.c michael@0: michael@0: michael@0: #include michael@0: #include // for memset() michael@0: #include "./vp9_thread.h" michael@0: michael@0: #if defined(__cplusplus) || defined(c_plusplus) michael@0: extern "C" { michael@0: #endif michael@0: michael@0: #if CONFIG_MULTITHREAD michael@0: michael@0: #if defined(_WIN32) michael@0: michael@0: //------------------------------------------------------------------------------ michael@0: // simplistic pthread emulation layer michael@0: michael@0: #include // NOLINT michael@0: michael@0: // _beginthreadex requires __stdcall michael@0: #define THREADFN unsigned int __stdcall michael@0: #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) michael@0: michael@0: static int pthread_create(pthread_t* const thread, const void* attr, michael@0: unsigned int (__stdcall *start)(void*), void* arg) { michael@0: (void)attr; michael@0: *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ michael@0: 0, /* unsigned stack_size */ michael@0: start, michael@0: arg, michael@0: 0, /* unsigned initflag */ michael@0: NULL); /* unsigned *thrdaddr */ michael@0: if (*thread == NULL) return 1; michael@0: SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); michael@0: return 0; michael@0: } michael@0: michael@0: static int pthread_join(pthread_t thread, void** value_ptr) { michael@0: (void)value_ptr; michael@0: return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || michael@0: CloseHandle(thread) == 0); michael@0: } michael@0: michael@0: // Mutex michael@0: static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) { michael@0: (void)mutexattr; michael@0: InitializeCriticalSection(mutex); michael@0: return 0; michael@0: } michael@0: michael@0: static int pthread_mutex_lock(pthread_mutex_t* const mutex) { michael@0: EnterCriticalSection(mutex); michael@0: return 0; michael@0: } michael@0: michael@0: static int pthread_mutex_unlock(pthread_mutex_t* const mutex) { michael@0: LeaveCriticalSection(mutex); michael@0: return 0; michael@0: } michael@0: michael@0: static int pthread_mutex_destroy(pthread_mutex_t* const mutex) { michael@0: DeleteCriticalSection(mutex); michael@0: return 0; michael@0: } michael@0: michael@0: // Condition michael@0: static int pthread_cond_destroy(pthread_cond_t* const condition) { michael@0: int ok = 1; michael@0: ok &= (CloseHandle(condition->waiting_sem_) != 0); michael@0: ok &= (CloseHandle(condition->received_sem_) != 0); michael@0: ok &= (CloseHandle(condition->signal_event_) != 0); michael@0: return !ok; michael@0: } michael@0: michael@0: static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) { michael@0: (void)cond_attr; michael@0: condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL); michael@0: condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL); michael@0: condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL); michael@0: if (condition->waiting_sem_ == NULL || michael@0: condition->received_sem_ == NULL || michael@0: condition->signal_event_ == NULL) { michael@0: pthread_cond_destroy(condition); michael@0: return 1; michael@0: } michael@0: return 0; michael@0: } michael@0: michael@0: static int pthread_cond_signal(pthread_cond_t* const condition) { michael@0: int ok = 1; michael@0: if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) { michael@0: // a thread is waiting in pthread_cond_wait: allow it to be notified michael@0: ok = SetEvent(condition->signal_event_); michael@0: // wait until the event is consumed so the signaler cannot consume michael@0: // the event via its own pthread_cond_wait. michael@0: ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) != michael@0: WAIT_OBJECT_0); michael@0: } michael@0: return !ok; michael@0: } michael@0: michael@0: static int pthread_cond_wait(pthread_cond_t* const condition, michael@0: pthread_mutex_t* const mutex) { michael@0: int ok; michael@0: // note that there is a consumer available so the signal isn't dropped in michael@0: // pthread_cond_signal michael@0: if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) michael@0: return 1; michael@0: // now unlock the mutex so pthread_cond_signal may be issued michael@0: pthread_mutex_unlock(mutex); michael@0: ok = (WaitForSingleObject(condition->signal_event_, INFINITE) == michael@0: WAIT_OBJECT_0); michael@0: ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL); michael@0: pthread_mutex_lock(mutex); michael@0: return !ok; michael@0: } michael@0: michael@0: #else // _WIN32 michael@0: # define THREADFN void* michael@0: # define THREAD_RETURN(val) val michael@0: #endif michael@0: michael@0: //------------------------------------------------------------------------------ michael@0: michael@0: static THREADFN thread_loop(void *ptr) { // thread loop michael@0: VP9Worker* const worker = (VP9Worker*)ptr; michael@0: int done = 0; michael@0: while (!done) { michael@0: pthread_mutex_lock(&worker->mutex_); michael@0: while (worker->status_ == OK) { // wait in idling mode michael@0: pthread_cond_wait(&worker->condition_, &worker->mutex_); michael@0: } michael@0: if (worker->status_ == WORK) { michael@0: vp9_worker_execute(worker); michael@0: worker->status_ = OK; michael@0: } else if (worker->status_ == NOT_OK) { // finish the worker michael@0: done = 1; michael@0: } michael@0: // signal to the main thread that we're done (for Sync()) michael@0: pthread_cond_signal(&worker->condition_); michael@0: pthread_mutex_unlock(&worker->mutex_); michael@0: } michael@0: return THREAD_RETURN(NULL); // Thread is finished michael@0: } michael@0: michael@0: // main thread state control michael@0: static void change_state(VP9Worker* const worker, michael@0: VP9WorkerStatus new_status) { michael@0: // no-op when attempting to change state on a thread that didn't come up michael@0: if (worker->status_ < OK) return; michael@0: michael@0: pthread_mutex_lock(&worker->mutex_); michael@0: // wait for the worker to finish michael@0: while (worker->status_ != OK) { michael@0: pthread_cond_wait(&worker->condition_, &worker->mutex_); michael@0: } michael@0: // assign new status and release the working thread if needed michael@0: if (new_status != OK) { michael@0: worker->status_ = new_status; michael@0: pthread_cond_signal(&worker->condition_); michael@0: } michael@0: pthread_mutex_unlock(&worker->mutex_); michael@0: } michael@0: michael@0: #endif // CONFIG_MULTITHREAD michael@0: michael@0: //------------------------------------------------------------------------------ michael@0: michael@0: void vp9_worker_init(VP9Worker* const worker) { michael@0: memset(worker, 0, sizeof(*worker)); michael@0: worker->status_ = NOT_OK; michael@0: } michael@0: michael@0: int vp9_worker_sync(VP9Worker* const worker) { michael@0: #if CONFIG_MULTITHREAD michael@0: change_state(worker, OK); michael@0: #endif michael@0: assert(worker->status_ <= OK); michael@0: return !worker->had_error; michael@0: } michael@0: michael@0: int vp9_worker_reset(VP9Worker* const worker) { michael@0: int ok = 1; michael@0: worker->had_error = 0; michael@0: if (worker->status_ < OK) { michael@0: #if CONFIG_MULTITHREAD michael@0: if (pthread_mutex_init(&worker->mutex_, NULL) || michael@0: pthread_cond_init(&worker->condition_, NULL)) { michael@0: return 0; michael@0: } michael@0: pthread_mutex_lock(&worker->mutex_); michael@0: ok = !pthread_create(&worker->thread_, NULL, thread_loop, worker); michael@0: if (ok) worker->status_ = OK; michael@0: pthread_mutex_unlock(&worker->mutex_); michael@0: #else michael@0: worker->status_ = OK; michael@0: #endif michael@0: } else if (worker->status_ > OK) { michael@0: ok = vp9_worker_sync(worker); michael@0: } michael@0: assert(!ok || (worker->status_ == OK)); michael@0: return ok; michael@0: } michael@0: michael@0: void vp9_worker_execute(VP9Worker* const worker) { michael@0: if (worker->hook != NULL) { michael@0: worker->had_error |= !worker->hook(worker->data1, worker->data2); michael@0: } michael@0: } michael@0: michael@0: void vp9_worker_launch(VP9Worker* const worker) { michael@0: #if CONFIG_MULTITHREAD michael@0: change_state(worker, WORK); michael@0: #else michael@0: vp9_worker_execute(worker); michael@0: #endif michael@0: } michael@0: michael@0: void vp9_worker_end(VP9Worker* const worker) { michael@0: if (worker->status_ >= OK) { michael@0: #if CONFIG_MULTITHREAD michael@0: change_state(worker, NOT_OK); michael@0: pthread_join(worker->thread_, NULL); michael@0: pthread_mutex_destroy(&worker->mutex_); michael@0: pthread_cond_destroy(&worker->condition_); michael@0: #else michael@0: worker->status_ = NOT_OK; michael@0: #endif michael@0: } michael@0: assert(worker->status_ == NOT_OK); michael@0: } michael@0: michael@0: //------------------------------------------------------------------------------ michael@0: michael@0: #if defined(__cplusplus) || defined(c_plusplus) michael@0: } // extern "C" michael@0: #endif