X-Git-Url: https://git.kernel.dk/?a=blobdiff_plain;f=workqueue.c;h=6e67f3e7df4dc8ac0910a2c96ef28d39d9e04bb2;hb=8dc1a870973e20011282059ba124838cdd120dd3;hp=b9b098668094143f4a3e998d7f1294aca1dfac32;hpb=f6496ba7d984b42602d2066837547eeb05b480f0;p=fio.git diff --git a/workqueue.c b/workqueue.c index b9b09866..6e67f3e7 100644 --- a/workqueue.c +++ b/workqueue.c @@ -1,5 +1,5 @@ /* - * Rated submission helpers + * Generic workqueue offload mechanism * * Copyright (C) 2015 Jens Axboe * @@ -9,7 +9,6 @@ #include "fio.h" #include "flist.h" #include "workqueue.h" -#include "lib/getrusage.h" enum { SW_F_IDLE = 1 << 0, @@ -99,23 +98,20 @@ void workqueue_flush(struct workqueue *wq) /* * Must be serialized by caller. Returns true for queued, false for busy. */ -bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work) +void workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work) { struct submit_worker *sw; sw = get_submit_worker(wq); - if (sw) { - pthread_mutex_lock(&sw->lock); - flist_add_tail(&work->list, &sw->work_list); - sw->seq = ++wq->work_seq; - sw->flags &= ~SW_F_IDLE; - pthread_mutex_unlock(&sw->lock); + assert(sw); - pthread_cond_signal(&sw->cond); - return true; - } + pthread_mutex_lock(&sw->lock); + flist_add_tail(&work->list, &sw->work_list); + sw->seq = ++wq->work_seq; + sw->flags &= ~SW_F_IDLE; + pthread_mutex_unlock(&sw->lock); - return false; + pthread_cond_signal(&sw->cond); } static void handle_list(struct submit_worker *sw, struct flist_head *list) @@ -137,6 +133,8 @@ static void *worker_thread(void *data) unsigned int eflags = 0, ret = 0; FLIST_HEAD(local_list); + sk_out_assign(sw->sk_out); + if (wq->ops.nice) { if (nice(wq->ops.nice) < 0) { log_err("workqueue: nice %s\n", strerror(errno)); @@ -210,6 +208,7 @@ done: pthread_mutex_lock(&sw->lock); sw->flags |= (SW_F_EXITED | eflags); pthread_mutex_unlock(&sw->lock); + sk_out_drop(); return NULL; } @@ -238,6 +237,9 @@ void workqueue_exit(struct workqueue *wq) struct submit_worker *sw; int i; + if (!wq->workers) + return; + for (i = 0; i < wq->max_workers; i++) { sw = &wq->workers[i]; @@ -262,12 +264,14 @@ void workqueue_exit(struct workqueue *wq) } while (shutdown && shutdown != wq->max_workers); free(wq->workers); + wq->workers = NULL; pthread_mutex_destroy(&wq->flush_lock); pthread_cond_destroy(&wq->flush_cond); pthread_mutex_destroy(&wq->stat_lock); } -static int start_worker(struct workqueue *wq, unsigned int index) +static int start_worker(struct workqueue *wq, unsigned int index, + struct sk_out *sk_out) { struct submit_worker *sw = &wq->workers[index]; int ret; @@ -277,6 +281,7 @@ static int start_worker(struct workqueue *wq, unsigned int index) pthread_mutex_init(&sw->lock, NULL); sw->wq = wq; sw->index = index; + sw->sk_out = sk_out; if (wq->ops.alloc_worker_fn) { ret = wq->ops.alloc_worker_fn(sw); @@ -297,12 +302,13 @@ static int start_worker(struct workqueue *wq, unsigned int index) } int workqueue_init(struct thread_data *td, struct workqueue *wq, - struct workqueue_ops *ops, unsigned max_pending) + struct workqueue_ops *ops, unsigned int max_workers, + struct sk_out *sk_out) { unsigned int running; int i, error; - wq->max_workers = max_pending; + wq->max_workers = max_workers; wq->td = td; wq->ops = *ops; wq->work_seq = 0; @@ -314,7 +320,7 @@ int workqueue_init(struct thread_data *td, struct workqueue *wq, wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker)); for (i = 0; i < wq->max_workers; i++) - if (start_worker(wq, i)) + if (start_worker(wq, i, sk_out)) break; wq->max_workers = i;