workqueue: move 'td' private data to the workqueue user
[fio.git] / workqueue.c
index 11791209b54da02502d26c06ed0d60be25d304ee..f0ebd8a90bb3af03f47e2bec4d6ccbc36c2e4922 100644 (file)
 #include "workqueue.h"
 #include "lib/getrusage.h"
 
-struct submit_worker {
-       pthread_t thread;
-       pthread_mutex_t lock;
-       pthread_cond_t cond;
-       struct flist_head work_list;
-       unsigned int flags;
-       unsigned int index;
-       uint64_t seq;
-       struct workqueue *wq;
-       struct thread_data td;
-};
-
 enum {
        SW_F_IDLE       = 1 << 0,
        SW_F_RUNNING    = 1 << 1,
@@ -138,14 +126,14 @@ static void handle_list(struct submit_worker *sw, struct flist_head *list)
        while (!flist_empty(list)) {
                work = flist_first_entry(list, struct workqueue_work, list);
                flist_del_init(&work->list);
-               wq->fn(&sw->td, work);
+               wq->ops.fn(sw, work);
        }
 }
 
 static int init_submit_worker(struct submit_worker *sw)
 {
        struct thread_data *parent = sw->wq->td;
-       struct thread_data *td = &sw->td;
+       struct thread_data *td = sw->private;
        int fio_unused ret;
 
        memcpy(&td->o, &parent->o, sizeof(td->o));
@@ -245,7 +233,7 @@ static void sum_ddir(struct thread_data *dst, struct thread_data *src,
 
 static void update_accounting(struct submit_worker *sw)
 {
-       struct thread_data *src = &sw->td;
+       struct thread_data *src = sw->private;
        struct thread_data *dst = sw->wq->td;
 
        if (td_read(src))
@@ -260,7 +248,6 @@ static void *worker_thread(void *data)
 {
        struct submit_worker *sw = data;
        struct workqueue *wq = sw->wq;
-       struct thread_data *td = &sw->td;
        unsigned int eflags = 0, ret;
        FLIST_HEAD(local_list);
 
@@ -287,14 +274,9 @@ static void *worker_thread(void *data)
                                break;
                        }
 
-                       if (td->io_u_queued || td->cur_depth ||
-                           td->io_u_in_flight) {
-                               int ret;
-
+                       if (workqueue_pre_sleep_check(sw)) {
                                pthread_mutex_unlock(&sw->lock);
-                               ret = io_u_quiesce(td);
-                               if (ret > 0)
-                                       td->cur_depth -= ret;
+                               workqueue_pre_sleep(sw);
                                pthread_mutex_lock(&sw->lock);
                        }
 
@@ -335,7 +317,8 @@ done:
 
 static void free_worker(struct submit_worker *sw)
 {
-       struct thread_data *td = &sw->td;
+       struct thread_data *td = sw->private;
+       struct workqueue *wq = sw->wq;
 
        fio_options_free(td);
        close_and_free_files(td);
@@ -345,15 +328,19 @@ static void free_worker(struct submit_worker *sw)
 
        pthread_cond_destroy(&sw->cond);
        pthread_mutex_destroy(&sw->lock);
+
+       if (wq->ops.free_worker_fn)
+               wq->ops.free_worker_fn(sw);
 }
 
 static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt)
 {
        struct thread_data *parent = sw->wq->td;
+       struct thread_data *td = sw->private;
 
        pthread_join(sw->thread, NULL);
        (*sum_cnt)++;
-       sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt == 1);
+       sum_thread_stats(&parent->ts, &td->ts, *sum_cnt == 1);
        free_worker(sw);
 }
 
@@ -403,6 +390,12 @@ static int start_worker(struct workqueue *wq, unsigned int index)
        sw->wq = wq;
        sw->index = index;
 
+       if (wq->ops.alloc_worker_fn) {
+               ret = wq->ops.alloc_worker_fn(sw);
+               if (ret)
+                       return ret;
+       }
+
        ret = pthread_create(&sw->thread, NULL, worker_thread, sw);
        if (!ret) {
                pthread_mutex_lock(&sw->lock);
@@ -416,14 +409,14 @@ static int start_worker(struct workqueue *wq, unsigned int index)
 }
 
 int workqueue_init(struct thread_data *td, struct workqueue *wq,
-                  workqueue_fn *fn, unsigned max_pending)
+                  struct workqueue_ops *ops, unsigned max_pending)
 {
        unsigned int running;
        int i, error;
 
        wq->max_workers = max_pending;
        wq->td = td;
-       wq->fn = fn;
+       wq->ops = *ops;
        wq->work_seq = 0;
        wq->next_free_worker = 0;
        pthread_cond_init(&wq->flush_cond, NULL);