workqueue: remove knowledge of td queue state
authorJens Axboe <axboe@fb.com>
Tue, 8 Dec 2015 05:35:31 +0000 (22:35 -0700)
committerJens Axboe <axboe@fb.com>
Tue, 8 Dec 2015 05:35:31 +0000 (22:35 -0700)
Add pre sleep checker and worker ops, so we can push this to the
owner of the workqueue.

Signed-off-by: Jens Axboe <axboe@fb.com>
backend.c
workqueue.c
workqueue.h

index 26c1bf559509eacac2e97a3c9fdc620133bd8dc4..bc2e3eb50e37a501ae89f89dcac767f1023fa616 100644 (file)
--- a/backend.c
+++ b/backend.c
@@ -1407,6 +1407,29 @@ static void io_workqueue_fn(struct thread_data *td, struct workqueue_work *work)
        }
 }
 
        }
 }
 
+static bool io_workqueue_pre_sleep_flush_fn(struct thread_data *td)
+{
+       if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
+               return true;
+
+       return false;
+}
+
+static void io_workqueue_pre_sleep_fn(struct thread_data *td)
+{
+       int ret;
+
+       ret = io_u_quiesce(td);
+       if (ret > 0)
+               td->cur_depth -= ret;
+}
+
+struct workqueue_ops rated_wq_ops = {
+       .fn                     = io_workqueue_fn,
+       .pre_sleep_flush_fn     = io_workqueue_pre_sleep_flush_fn,
+       .pre_sleep_fn           = io_workqueue_pre_sleep_fn,
+};
+
 /*
  * Entry point for the thread based jobs. The process based jobs end up
  * here as well, after a little setup.
 /*
  * Entry point for the thread based jobs. The process based jobs end up
  * here as well, after a little setup.
@@ -1605,7 +1628,7 @@ static void *thread_main(void *data)
        fio_verify_init(td);
 
        if ((o->io_submit_mode == IO_MODE_OFFLOAD) &&
        fio_verify_init(td);
 
        if ((o->io_submit_mode == IO_MODE_OFFLOAD) &&
-           workqueue_init(td, &td->io_wq, io_workqueue_fn, td->o.iodepth))
+           workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth))
                goto err;
 
        fio_gettime(&td->epoch, NULL);
                goto err;
 
        fio_gettime(&td->epoch, NULL);
index 11791209b54da02502d26c06ed0d60be25d304ee..54761b0902b612662b71521ad902b3be61a0ddb4 100644 (file)
@@ -138,7 +138,7 @@ static void handle_list(struct submit_worker *sw, struct flist_head *list)
        while (!flist_empty(list)) {
                work = flist_first_entry(list, struct workqueue_work, list);
                flist_del_init(&work->list);
        while (!flist_empty(list)) {
                work = flist_first_entry(list, struct workqueue_work, list);
                flist_del_init(&work->list);
-               wq->fn(&sw->td, work);
+               wq->ops.fn(&sw->td, work);
        }
 }
 
        }
 }
 
@@ -260,7 +260,6 @@ static void *worker_thread(void *data)
 {
        struct submit_worker *sw = data;
        struct workqueue *wq = sw->wq;
 {
        struct submit_worker *sw = data;
        struct workqueue *wq = sw->wq;
-       struct thread_data *td = &sw->td;
        unsigned int eflags = 0, ret;
        FLIST_HEAD(local_list);
 
        unsigned int eflags = 0, ret;
        FLIST_HEAD(local_list);
 
@@ -287,14 +286,9 @@ static void *worker_thread(void *data)
                                break;
                        }
 
                                break;
                        }
 
-                       if (td->io_u_queued || td->cur_depth ||
-                           td->io_u_in_flight) {
-                               int ret;
-
+                       if (workqueue_pre_sleep_check(wq)) {
                                pthread_mutex_unlock(&sw->lock);
                                pthread_mutex_unlock(&sw->lock);
-                               ret = io_u_quiesce(td);
-                               if (ret > 0)
-                                       td->cur_depth -= ret;
+                               workqueue_pre_sleep(wq);
                                pthread_mutex_lock(&sw->lock);
                        }
 
                                pthread_mutex_lock(&sw->lock);
                        }
 
@@ -416,14 +410,14 @@ static int start_worker(struct workqueue *wq, unsigned int index)
 }
 
 int workqueue_init(struct thread_data *td, struct workqueue *wq,
 }
 
 int workqueue_init(struct thread_data *td, struct workqueue *wq,
-                  workqueue_fn *fn, unsigned max_pending)
+                  struct workqueue_ops *ops, unsigned max_pending)
 {
        unsigned int running;
        int i, error;
 
        wq->max_workers = max_pending;
        wq->td = td;
 {
        unsigned int running;
        int i, error;
 
        wq->max_workers = max_pending;
        wq->td = td;
-       wq->fn = fn;
+       wq->ops = *ops;
        wq->work_seq = 0;
        wq->next_free_worker = 0;
        pthread_cond_init(&wq->flush_cond, NULL);
        wq->work_seq = 0;
        wq->next_free_worker = 0;
        pthread_cond_init(&wq->flush_cond, NULL);
index 26b3aee1602171e9b090363c275003a53b13791a..837b221d6790f0176eecbc992ea66e9ef5b4691f 100644 (file)
@@ -7,13 +7,21 @@ struct workqueue_work {
        struct flist_head list;
 };
 
        struct flist_head list;
 };
 
-typedef void (workqueue_fn)(struct thread_data *, struct workqueue_work *);
+typedef void (workqueue_work_fn)(struct thread_data *, struct workqueue_work *);
+typedef bool (workqueue_pre_sleep_flush_fn)(struct thread_data *);
+typedef void (workqueue_pre_sleep_fn)(struct thread_data *);
+
+struct workqueue_ops {
+       workqueue_work_fn *fn;
+       workqueue_pre_sleep_flush_fn *pre_sleep_flush_fn;
+       workqueue_pre_sleep_fn *pre_sleep_fn;
+};
 
 struct workqueue {
        unsigned int max_workers;
 
        struct thread_data *td;
 
 struct workqueue {
        unsigned int max_workers;
 
        struct thread_data *td;
-       workqueue_fn *fn;
+       struct workqueue_ops ops;
 
        uint64_t work_seq;
        struct submit_worker *workers;
 
        uint64_t work_seq;
        struct submit_worker *workers;
@@ -25,10 +33,24 @@ struct workqueue {
        volatile int wake_idle;
 };
 
        volatile int wake_idle;
 };
 
-int workqueue_init(struct thread_data *td, struct workqueue *wq, workqueue_fn *fn, unsigned int max_workers);
+int workqueue_init(struct thread_data *td, struct workqueue *wq, struct workqueue_ops *ops, unsigned int max_workers);
 void workqueue_exit(struct workqueue *wq);
 
 bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work);
 void workqueue_flush(struct workqueue *wq);
 
 void workqueue_exit(struct workqueue *wq);
 
 bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work);
 void workqueue_flush(struct workqueue *wq);
 
+static inline bool workqueue_pre_sleep_check(struct workqueue *wq)
+{
+       if (!wq->ops.pre_sleep_flush_fn)
+               return false;
+
+       return wq->ops.pre_sleep_flush_fn(wq->td);
+}
+
+static inline void workqueue_pre_sleep(struct workqueue *wq)
+{
+       if (wq->ops.pre_sleep_fn)
+               wq->ops.pre_sleep_fn(wq->td);
+}
+
 #endif
 #endif