X-Git-Url: https://git.kernel.dk/?a=blobdiff_plain;f=workqueue.c;h=9e6c41ff2f399172703b6e438061236670962df8;hb=db83b0abd16bbd6b8f589a993e6f70d9812be6e3;hp=b59595124913338bc59dc20fb6a036c895b4f1f1;hpb=4cf30b66c62f3f5e6501390d564cf0d966823591;p=fio.git diff --git a/workqueue.c b/workqueue.c index b5959512..9e6c41ff 100644 --- a/workqueue.c +++ b/workqueue.c @@ -85,15 +85,14 @@ static bool all_sw_idle(struct workqueue *wq) */ void workqueue_flush(struct workqueue *wq) { + pthread_mutex_lock(&wq->flush_lock); wq->wake_idle = 1; - while (!all_sw_idle(wq)) { - pthread_mutex_lock(&wq->flush_lock); + while (!all_sw_idle(wq)) pthread_cond_wait(&wq->flush_cond, &wq->flush_lock); - pthread_mutex_unlock(&wq->flush_lock); - } wq->wake_idle = 0; + pthread_mutex_unlock(&wq->flush_lock); } /* @@ -159,12 +158,10 @@ static void *worker_thread(void *data) if (sw->flags & SW_F_ERROR) goto done; + pthread_mutex_lock(&sw->lock); while (1) { - pthread_mutex_lock(&sw->lock); - if (flist_empty(&sw->work_list)) { if (sw->flags & SW_F_EXIT) { - pthread_mutex_unlock(&sw->lock); break; } @@ -173,34 +170,41 @@ static void *worker_thread(void *data) workqueue_pre_sleep(sw); pthread_mutex_lock(&sw->lock); } - - /* - * We dropped and reaquired the lock, check - * state again. - */ - if (!flist_empty(&sw->work_list)) - goto handle_work; - + } + /* + * We may have dropped and reaquired the lock, check state + * again. + */ + if (flist_empty(&sw->work_list)) { if (sw->flags & SW_F_EXIT) { - pthread_mutex_unlock(&sw->lock); break; - } else if (!(sw->flags & SW_F_IDLE)) { + } + if (!(sw->flags & SW_F_IDLE)) { sw->flags |= SW_F_IDLE; wq->next_free_worker = sw->index; + pthread_mutex_unlock(&sw->lock); + pthread_mutex_lock(&wq->flush_lock); if (wq->wake_idle) pthread_cond_signal(&wq->flush_cond); + pthread_mutex_unlock(&wq->flush_lock); + pthread_mutex_lock(&sw->lock); + } + } + if (flist_empty(&sw->work_list)) { + if (sw->flags & SW_F_EXIT) { + break; } - pthread_cond_wait(&sw->cond, &sw->lock); } else { -handle_work: flist_splice_init(&sw->work_list, &local_list); } pthread_mutex_unlock(&sw->lock); handle_list(sw, &local_list); if (wq->ops.update_acct_fn) wq->ops.update_acct_fn(sw); + pthread_mutex_lock(&sw->lock); } + pthread_mutex_unlock(&sw->lock); done: sk_out_drop(); @@ -336,11 +340,11 @@ int workqueue_init(struct thread_data *td, struct workqueue *wq, * Wait for them all to be started and initialized */ error = 0; + pthread_mutex_lock(&wq->flush_lock); do { struct submit_worker *sw; running = 0; - pthread_mutex_lock(&wq->flush_lock); for (i = 0; i < wq->max_workers; i++) { sw = &wq->workers[i]; pthread_mutex_lock(&sw->lock); @@ -351,14 +355,12 @@ int workqueue_init(struct thread_data *td, struct workqueue *wq, pthread_mutex_unlock(&sw->lock); } - if (error || running == wq->max_workers) { - pthread_mutex_unlock(&wq->flush_lock); + if (error || running == wq->max_workers) break; - } pthread_cond_wait(&wq->flush_cond, &wq->flush_lock); - pthread_mutex_unlock(&wq->flush_lock); } while (1); + pthread_mutex_unlock(&wq->flush_lock); if (!error) return 0;