X-Git-Url: https://git.kernel.dk/?a=blobdiff_plain;f=workqueue.c;h=f0ebd8a90bb3af03f47e2bec4d6ccbc36c2e4922;hb=ee2b6d6e5baedb00661cdf50016a06beae6a9d9c;hp=54761b0902b612662b71521ad902b3be61a0ddb4;hpb=5bb79f69c2d9dc8542c25af96f040d1884230688;p=fio.git diff --git a/workqueue.c b/workqueue.c index 54761b09..f0ebd8a9 100644 --- a/workqueue.c +++ b/workqueue.c @@ -11,18 +11,6 @@ #include "workqueue.h" #include "lib/getrusage.h" -struct submit_worker { - pthread_t thread; - pthread_mutex_t lock; - pthread_cond_t cond; - struct flist_head work_list; - unsigned int flags; - unsigned int index; - uint64_t seq; - struct workqueue *wq; - struct thread_data td; -}; - enum { SW_F_IDLE = 1 << 0, SW_F_RUNNING = 1 << 1, @@ -138,14 +126,14 @@ static void handle_list(struct submit_worker *sw, struct flist_head *list) while (!flist_empty(list)) { work = flist_first_entry(list, struct workqueue_work, list); flist_del_init(&work->list); - wq->ops.fn(&sw->td, work); + wq->ops.fn(sw, work); } } static int init_submit_worker(struct submit_worker *sw) { struct thread_data *parent = sw->wq->td; - struct thread_data *td = &sw->td; + struct thread_data *td = sw->private; int fio_unused ret; memcpy(&td->o, &parent->o, sizeof(td->o)); @@ -245,7 +233,7 @@ static void sum_ddir(struct thread_data *dst, struct thread_data *src, static void update_accounting(struct submit_worker *sw) { - struct thread_data *src = &sw->td; + struct thread_data *src = sw->private; struct thread_data *dst = sw->wq->td; if (td_read(src)) @@ -286,9 +274,9 @@ static void *worker_thread(void *data) break; } - if (workqueue_pre_sleep_check(wq)) { + if (workqueue_pre_sleep_check(sw)) { pthread_mutex_unlock(&sw->lock); - workqueue_pre_sleep(wq); + workqueue_pre_sleep(sw); pthread_mutex_lock(&sw->lock); } @@ -329,7 +317,8 @@ done: static void free_worker(struct submit_worker *sw) { - struct thread_data *td = &sw->td; + struct thread_data *td = sw->private; + struct workqueue *wq = sw->wq; fio_options_free(td); close_and_free_files(td); @@ -339,15 +328,19 @@ static void free_worker(struct submit_worker *sw) pthread_cond_destroy(&sw->cond); pthread_mutex_destroy(&sw->lock); + + if (wq->ops.free_worker_fn) + wq->ops.free_worker_fn(sw); } static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt) { struct thread_data *parent = sw->wq->td; + struct thread_data *td = sw->private; pthread_join(sw->thread, NULL); (*sum_cnt)++; - sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt == 1); + sum_thread_stats(&parent->ts, &td->ts, *sum_cnt == 1); free_worker(sw); } @@ -397,6 +390,12 @@ static int start_worker(struct workqueue *wq, unsigned int index) sw->wq = wq; sw->index = index; + if (wq->ops.alloc_worker_fn) { + ret = wq->ops.alloc_worker_fn(sw); + if (ret) + return ret; + } + ret = pthread_create(&sw->thread, NULL, worker_thread, sw); if (!ret) { pthread_mutex_lock(&sw->lock);