workqueue: move 'td' private data to the workqueue user
authorJens Axboe <axboe@fb.com>
Tue, 8 Dec 2015 15:23:24 +0000 (08:23 -0700)
committerJens Axboe <axboe@fb.com>
Tue, 8 Dec 2015 15:23:24 +0000 (08:23 -0700)
Signed-off-by: Jens Axboe <axboe@fb.com>
backend.c
workqueue.c
workqueue.h

index bc2e3eb..0842370 100644 (file)
--- a/backend.c
+++ b/backend.c
@@ -1361,10 +1361,12 @@ static uint64_t do_dry_run(struct thread_data *td)
        return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
 }
 
-static void io_workqueue_fn(struct thread_data *td, struct workqueue_work *work)
+static void io_workqueue_fn(struct submit_worker *sw,
+                           struct workqueue_work *work)
 {
        struct io_u *io_u = container_of(work, struct io_u, work);
        const enum fio_ddir ddir = io_u->ddir;
+       struct thread_data *td = sw->private;
        int ret;
 
        dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());
@@ -1407,16 +1409,19 @@ static void io_workqueue_fn(struct thread_data *td, struct workqueue_work *work)
        }
 }
 
-static bool io_workqueue_pre_sleep_flush_fn(struct thread_data *td)
+static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw)
 {
+       struct thread_data *td = sw->private;
+
        if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
                return true;
 
        return false;
 }
 
-static void io_workqueue_pre_sleep_fn(struct thread_data *td)
+static void io_workqueue_pre_sleep_fn(struct submit_worker *sw)
 {
+       struct thread_data *td = sw->private;
        int ret;
 
        ret = io_u_quiesce(td);
@@ -1424,10 +1429,27 @@ static void io_workqueue_pre_sleep_fn(struct thread_data *td)
                td->cur_depth -= ret;
 }
 
+static int io_workqueue_alloc_fn(struct submit_worker *sw)
+{
+       struct thread_data *td;
+
+       td = calloc(1, sizeof(*td));
+       sw->private = td;
+       return 0;
+}
+
+static void io_workqueue_free_fn(struct submit_worker *sw)
+{
+       free(sw->private);
+       sw->private = NULL;
+}
+
 struct workqueue_ops rated_wq_ops = {
        .fn                     = io_workqueue_fn,
        .pre_sleep_flush_fn     = io_workqueue_pre_sleep_flush_fn,
        .pre_sleep_fn           = io_workqueue_pre_sleep_fn,
+       .alloc_worker_fn        = io_workqueue_alloc_fn,
+       .free_worker_fn         = io_workqueue_free_fn,
 };
 
 /*
index 54761b0..f0ebd8a 100644 (file)
 #include "workqueue.h"
 #include "lib/getrusage.h"
 
-struct submit_worker {
-       pthread_t thread;
-       pthread_mutex_t lock;
-       pthread_cond_t cond;
-       struct flist_head work_list;
-       unsigned int flags;
-       unsigned int index;
-       uint64_t seq;
-       struct workqueue *wq;
-       struct thread_data td;
-};
-
 enum {
        SW_F_IDLE       = 1 << 0,
        SW_F_RUNNING    = 1 << 1,
@@ -138,14 +126,14 @@ static void handle_list(struct submit_worker *sw, struct flist_head *list)
        while (!flist_empty(list)) {
                work = flist_first_entry(list, struct workqueue_work, list);
                flist_del_init(&work->list);
-               wq->ops.fn(&sw->td, work);
+               wq->ops.fn(sw, work);
        }
 }
 
 static int init_submit_worker(struct submit_worker *sw)
 {
        struct thread_data *parent = sw->wq->td;
-       struct thread_data *td = &sw->td;
+       struct thread_data *td = sw->private;
        int fio_unused ret;
 
        memcpy(&td->o, &parent->o, sizeof(td->o));
@@ -245,7 +233,7 @@ static void sum_ddir(struct thread_data *dst, struct thread_data *src,
 
 static void update_accounting(struct submit_worker *sw)
 {
-       struct thread_data *src = &sw->td;
+       struct thread_data *src = sw->private;
        struct thread_data *dst = sw->wq->td;
 
        if (td_read(src))
@@ -286,9 +274,9 @@ static void *worker_thread(void *data)
                                break;
                        }
 
-                       if (workqueue_pre_sleep_check(wq)) {
+                       if (workqueue_pre_sleep_check(sw)) {
                                pthread_mutex_unlock(&sw->lock);
-                               workqueue_pre_sleep(wq);
+                               workqueue_pre_sleep(sw);
                                pthread_mutex_lock(&sw->lock);
                        }
 
@@ -329,7 +317,8 @@ done:
 
 static void free_worker(struct submit_worker *sw)
 {
-       struct thread_data *td = &sw->td;
+       struct thread_data *td = sw->private;
+       struct workqueue *wq = sw->wq;
 
        fio_options_free(td);
        close_and_free_files(td);
@@ -339,15 +328,19 @@ static void free_worker(struct submit_worker *sw)
 
        pthread_cond_destroy(&sw->cond);
        pthread_mutex_destroy(&sw->lock);
+
+       if (wq->ops.free_worker_fn)
+               wq->ops.free_worker_fn(sw);
 }
 
 static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt)
 {
        struct thread_data *parent = sw->wq->td;
+       struct thread_data *td = sw->private;
 
        pthread_join(sw->thread, NULL);
        (*sum_cnt)++;
-       sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt == 1);
+       sum_thread_stats(&parent->ts, &td->ts, *sum_cnt == 1);
        free_worker(sw);
 }
 
@@ -397,6 +390,12 @@ static int start_worker(struct workqueue *wq, unsigned int index)
        sw->wq = wq;
        sw->index = index;
 
+       if (wq->ops.alloc_worker_fn) {
+               ret = wq->ops.alloc_worker_fn(sw);
+               if (ret)
+                       return ret;
+       }
+
        ret = pthread_create(&sw->thread, NULL, worker_thread, sw);
        if (!ret) {
                pthread_mutex_lock(&sw->lock);
index 837b221..b48f9db 100644 (file)
@@ -7,14 +7,30 @@ struct workqueue_work {
        struct flist_head list;
 };
 
-typedef void (workqueue_work_fn)(struct thread_data *, struct workqueue_work *);
-typedef bool (workqueue_pre_sleep_flush_fn)(struct thread_data *);
-typedef void (workqueue_pre_sleep_fn)(struct thread_data *);
+struct submit_worker {
+       pthread_t thread;
+       pthread_mutex_t lock;
+       pthread_cond_t cond;
+       struct flist_head work_list;
+       unsigned int flags;
+       unsigned int index;
+       uint64_t seq;
+       struct workqueue *wq;
+       void *private;
+};
+
+typedef void (workqueue_work_fn)(struct submit_worker *, struct workqueue_work *);
+typedef bool (workqueue_pre_sleep_flush_fn)(struct submit_worker *);
+typedef void (workqueue_pre_sleep_fn)(struct submit_worker *);
+typedef int (workqueue_alloc_worker_fn)(struct submit_worker *);
+typedef void (workqueue_free_worker_fn)(struct submit_worker *);
 
 struct workqueue_ops {
        workqueue_work_fn *fn;
        workqueue_pre_sleep_flush_fn *pre_sleep_flush_fn;
        workqueue_pre_sleep_fn *pre_sleep_fn;
+       workqueue_alloc_worker_fn *alloc_worker_fn;
+       workqueue_free_worker_fn *free_worker_fn;
 };
 
 struct workqueue {
@@ -39,18 +55,22 @@ void workqueue_exit(struct workqueue *wq);
 bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work);
 void workqueue_flush(struct workqueue *wq);
 
-static inline bool workqueue_pre_sleep_check(struct workqueue *wq)
+static inline bool workqueue_pre_sleep_check(struct submit_worker *sw)
 {
+       struct workqueue *wq = sw->wq;
+
        if (!wq->ops.pre_sleep_flush_fn)
                return false;
 
-       return wq->ops.pre_sleep_flush_fn(wq->td);
+       return wq->ops.pre_sleep_flush_fn(sw);
 }
 
-static inline void workqueue_pre_sleep(struct workqueue *wq)
+static inline void workqueue_pre_sleep(struct submit_worker *sw)
 {
+       struct workqueue *wq = sw->wq;
+
        if (wq->ops.pre_sleep_fn)
-               wq->ops.pre_sleep_fn(wq->td);
+               wq->ops.pre_sleep_fn(sw);
 }
 
 #endif