From ee2b6d6e5baedb00661cdf50016a06beae6a9d9c Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Tue, 8 Dec 2015 08:23:24 -0700 Subject: [PATCH] workqueue: move 'td' private data to the workqueue user Signed-off-by: Jens Axboe --- backend.c | 28 +++++++++++++++++++++++++--- workqueue.c | 37 ++++++++++++++++++------------------- workqueue.h | 34 +++++++++++++++++++++++++++------- 3 files changed, 70 insertions(+), 29 deletions(-) diff --git a/backend.c b/backend.c index bc2e3eb5..0842370d 100644 --- a/backend.c +++ b/backend.c @@ -1361,10 +1361,12 @@ static uint64_t do_dry_run(struct thread_data *td) return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM]; } -static void io_workqueue_fn(struct thread_data *td, struct workqueue_work *work) +static void io_workqueue_fn(struct submit_worker *sw, + struct workqueue_work *work) { struct io_u *io_u = container_of(work, struct io_u, work); const enum fio_ddir ddir = io_u->ddir; + struct thread_data *td = sw->private; int ret; dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid()); @@ -1407,16 +1409,19 @@ static void io_workqueue_fn(struct thread_data *td, struct workqueue_work *work) } } -static bool io_workqueue_pre_sleep_flush_fn(struct thread_data *td) +static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw) { + struct thread_data *td = sw->private; + if (td->io_u_queued || td->cur_depth || td->io_u_in_flight) return true; return false; } -static void io_workqueue_pre_sleep_fn(struct thread_data *td) +static void io_workqueue_pre_sleep_fn(struct submit_worker *sw) { + struct thread_data *td = sw->private; int ret; ret = io_u_quiesce(td); @@ -1424,10 +1429,27 @@ static void io_workqueue_pre_sleep_fn(struct thread_data *td) td->cur_depth -= ret; } +static int io_workqueue_alloc_fn(struct submit_worker *sw) +{ + struct thread_data *td; + + td = calloc(1, sizeof(*td)); + sw->private = td; + return 0; +} + +static void io_workqueue_free_fn(struct submit_worker *sw) +{ + free(sw->private); + sw->private = NULL; +} + struct workqueue_ops rated_wq_ops = { .fn = io_workqueue_fn, .pre_sleep_flush_fn = io_workqueue_pre_sleep_flush_fn, .pre_sleep_fn = io_workqueue_pre_sleep_fn, + .alloc_worker_fn = io_workqueue_alloc_fn, + .free_worker_fn = io_workqueue_free_fn, }; /* diff --git a/workqueue.c b/workqueue.c index 54761b09..f0ebd8a9 100644 --- a/workqueue.c +++ b/workqueue.c @@ -11,18 +11,6 @@ #include "workqueue.h" #include "lib/getrusage.h" -struct submit_worker { - pthread_t thread; - pthread_mutex_t lock; - pthread_cond_t cond; - struct flist_head work_list; - unsigned int flags; - unsigned int index; - uint64_t seq; - struct workqueue *wq; - struct thread_data td; -}; - enum { SW_F_IDLE = 1 << 0, SW_F_RUNNING = 1 << 1, @@ -138,14 +126,14 @@ static void handle_list(struct submit_worker *sw, struct flist_head *list) while (!flist_empty(list)) { work = flist_first_entry(list, struct workqueue_work, list); flist_del_init(&work->list); - wq->ops.fn(&sw->td, work); + wq->ops.fn(sw, work); } } static int init_submit_worker(struct submit_worker *sw) { struct thread_data *parent = sw->wq->td; - struct thread_data *td = &sw->td; + struct thread_data *td = sw->private; int fio_unused ret; memcpy(&td->o, &parent->o, sizeof(td->o)); @@ -245,7 +233,7 @@ static void sum_ddir(struct thread_data *dst, struct thread_data *src, static void update_accounting(struct submit_worker *sw) { - struct thread_data *src = &sw->td; + struct thread_data *src = sw->private; struct thread_data *dst = sw->wq->td; if (td_read(src)) @@ -286,9 +274,9 @@ static void *worker_thread(void *data) break; } - if (workqueue_pre_sleep_check(wq)) { + if (workqueue_pre_sleep_check(sw)) { pthread_mutex_unlock(&sw->lock); - workqueue_pre_sleep(wq); + workqueue_pre_sleep(sw); pthread_mutex_lock(&sw->lock); } @@ -329,7 +317,8 @@ done: static void free_worker(struct submit_worker *sw) { - struct thread_data *td = &sw->td; + struct thread_data *td = sw->private; + struct workqueue *wq = sw->wq; fio_options_free(td); close_and_free_files(td); @@ -339,15 +328,19 @@ static void free_worker(struct submit_worker *sw) pthread_cond_destroy(&sw->cond); pthread_mutex_destroy(&sw->lock); + + if (wq->ops.free_worker_fn) + wq->ops.free_worker_fn(sw); } static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt) { struct thread_data *parent = sw->wq->td; + struct thread_data *td = sw->private; pthread_join(sw->thread, NULL); (*sum_cnt)++; - sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt == 1); + sum_thread_stats(&parent->ts, &td->ts, *sum_cnt == 1); free_worker(sw); } @@ -397,6 +390,12 @@ static int start_worker(struct workqueue *wq, unsigned int index) sw->wq = wq; sw->index = index; + if (wq->ops.alloc_worker_fn) { + ret = wq->ops.alloc_worker_fn(sw); + if (ret) + return ret; + } + ret = pthread_create(&sw->thread, NULL, worker_thread, sw); if (!ret) { pthread_mutex_lock(&sw->lock); diff --git a/workqueue.h b/workqueue.h index 837b221d..b48f9db0 100644 --- a/workqueue.h +++ b/workqueue.h @@ -7,14 +7,30 @@ struct workqueue_work { struct flist_head list; }; -typedef void (workqueue_work_fn)(struct thread_data *, struct workqueue_work *); -typedef bool (workqueue_pre_sleep_flush_fn)(struct thread_data *); -typedef void (workqueue_pre_sleep_fn)(struct thread_data *); +struct submit_worker { + pthread_t thread; + pthread_mutex_t lock; + pthread_cond_t cond; + struct flist_head work_list; + unsigned int flags; + unsigned int index; + uint64_t seq; + struct workqueue *wq; + void *private; +}; + +typedef void (workqueue_work_fn)(struct submit_worker *, struct workqueue_work *); +typedef bool (workqueue_pre_sleep_flush_fn)(struct submit_worker *); +typedef void (workqueue_pre_sleep_fn)(struct submit_worker *); +typedef int (workqueue_alloc_worker_fn)(struct submit_worker *); +typedef void (workqueue_free_worker_fn)(struct submit_worker *); struct workqueue_ops { workqueue_work_fn *fn; workqueue_pre_sleep_flush_fn *pre_sleep_flush_fn; workqueue_pre_sleep_fn *pre_sleep_fn; + workqueue_alloc_worker_fn *alloc_worker_fn; + workqueue_free_worker_fn *free_worker_fn; }; struct workqueue { @@ -39,18 +55,22 @@ void workqueue_exit(struct workqueue *wq); bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work); void workqueue_flush(struct workqueue *wq); -static inline bool workqueue_pre_sleep_check(struct workqueue *wq) +static inline bool workqueue_pre_sleep_check(struct submit_worker *sw) { + struct workqueue *wq = sw->wq; + if (!wq->ops.pre_sleep_flush_fn) return false; - return wq->ops.pre_sleep_flush_fn(wq->td); + return wq->ops.pre_sleep_flush_fn(sw); } -static inline void workqueue_pre_sleep(struct workqueue *wq) +static inline void workqueue_pre_sleep(struct submit_worker *sw) { + struct workqueue *wq = sw->wq; + if (wq->ops.pre_sleep_fn) - wq->ops.pre_sleep_fn(wq->td); + wq->ops.pre_sleep_fn(sw); } #endif -- 2.25.1