X-Git-Url: https://git.kernel.dk/?a=blobdiff_plain;f=workqueue.c;h=f0ebd8a90bb3af03f47e2bec4d6ccbc36c2e4922;hb=ee2b6d6e5baedb00661cdf50016a06beae6a9d9c;hp=8d43b090229c1eabda627bdc2b87f91beb3b2873;hpb=fd595830408b43beabbd94c8ad3d10e1f68356dd;p=fio.git diff --git a/workqueue.c b/workqueue.c index 8d43b090..f0ebd8a9 100644 --- a/workqueue.c +++ b/workqueue.c @@ -7,23 +7,10 @@ #include #include "fio.h" -#include "ioengine.h" #include "flist.h" #include "workqueue.h" #include "lib/getrusage.h" -struct submit_worker { - pthread_t thread; - pthread_mutex_t lock; - pthread_cond_t cond; - struct flist_head work_list; - unsigned int flags; - unsigned int index; - uint64_t seq; - struct workqueue *wq; - struct thread_data td; -}; - enum { SW_F_IDLE = 1 << 0, SW_F_RUNNING = 1 << 1, @@ -110,52 +97,43 @@ void workqueue_flush(struct workqueue *wq) } /* - * Must be serialized by caller. + * Must be serialized by caller. Returns true for queued, false for busy. */ -int workqueue_enqueue(struct workqueue *wq, struct io_u *io_u) +bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work) { struct submit_worker *sw; sw = get_submit_worker(wq); if (sw) { - const enum fio_ddir ddir = acct_ddir(io_u); - struct thread_data *parent = wq->td; - - if (ddir_rw(ddir)) { - parent->io_issues[ddir]++; - parent->io_issue_bytes[ddir] += io_u->xfer_buflen; - parent->rate_io_issue_bytes[ddir] += io_u->xfer_buflen; - } - pthread_mutex_lock(&sw->lock); - flist_add_tail(&io_u->verify_list, &sw->work_list); + flist_add_tail(&work->list, &sw->work_list); sw->seq = ++wq->work_seq; sw->flags &= ~SW_F_IDLE; pthread_mutex_unlock(&sw->lock); pthread_cond_signal(&sw->cond); - return FIO_Q_QUEUED; + return true; } - return FIO_Q_BUSY; + return false; } static void handle_list(struct submit_worker *sw, struct flist_head *list) { struct workqueue *wq = sw->wq; - struct io_u *io_u; + struct workqueue_work *work; while (!flist_empty(list)) { - io_u = flist_first_entry(list, struct io_u, verify_list); - flist_del_init(&io_u->verify_list); - wq->fn(&sw->td, io_u); + work = flist_first_entry(list, struct workqueue_work, list); + flist_del_init(&work->list); + wq->ops.fn(sw, work); } } static int init_submit_worker(struct submit_worker *sw) { struct thread_data *parent = sw->wq->td; - struct thread_data *td = &sw->td; + struct thread_data *td = sw->private; int fio_unused ret; memcpy(&td->o, &parent->o, sizeof(td->o)); @@ -255,7 +233,7 @@ static void sum_ddir(struct thread_data *dst, struct thread_data *src, static void update_accounting(struct submit_worker *sw) { - struct thread_data *src = &sw->td; + struct thread_data *src = sw->private; struct thread_data *dst = sw->wq->td; if (td_read(src)) @@ -270,7 +248,6 @@ static void *worker_thread(void *data) { struct submit_worker *sw = data; struct workqueue *wq = sw->wq; - struct thread_data *td = &sw->td; unsigned int eflags = 0, ret; FLIST_HEAD(local_list); @@ -297,14 +274,9 @@ static void *worker_thread(void *data) break; } - if (td->io_u_queued || td->cur_depth || - td->io_u_in_flight) { - int ret; - + if (workqueue_pre_sleep_check(sw)) { pthread_mutex_unlock(&sw->lock); - ret = io_u_quiesce(td); - if (ret > 0) - td->cur_depth -= ret; + workqueue_pre_sleep(sw); pthread_mutex_lock(&sw->lock); } @@ -345,7 +317,8 @@ done: static void free_worker(struct submit_worker *sw) { - struct thread_data *td = &sw->td; + struct thread_data *td = sw->private; + struct workqueue *wq = sw->wq; fio_options_free(td); close_and_free_files(td); @@ -355,15 +328,19 @@ static void free_worker(struct submit_worker *sw) pthread_cond_destroy(&sw->cond); pthread_mutex_destroy(&sw->lock); + + if (wq->ops.free_worker_fn) + wq->ops.free_worker_fn(sw); } static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt) { struct thread_data *parent = sw->wq->td; + struct thread_data *td = sw->private; pthread_join(sw->thread, NULL); (*sum_cnt)++; - sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt == 1); + sum_thread_stats(&parent->ts, &td->ts, *sum_cnt == 1); free_worker(sw); } @@ -413,6 +390,12 @@ static int start_worker(struct workqueue *wq, unsigned int index) sw->wq = wq; sw->index = index; + if (wq->ops.alloc_worker_fn) { + ret = wq->ops.alloc_worker_fn(sw); + if (ret) + return ret; + } + ret = pthread_create(&sw->thread, NULL, worker_thread, sw); if (!ret) { pthread_mutex_lock(&sw->lock); @@ -426,14 +409,14 @@ static int start_worker(struct workqueue *wq, unsigned int index) } int workqueue_init(struct thread_data *td, struct workqueue *wq, - workqueue_fn *fn, unsigned max_pending) + struct workqueue_ops *ops, unsigned max_pending) { unsigned int running; int i, error; wq->max_workers = max_pending; wq->td = td; - wq->fn = fn; + wq->ops = *ops; wq->work_seq = 0; wq->next_free_worker = 0; pthread_cond_init(&wq->flush_cond, NULL);