X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=workqueue.c;h=11791209b54da02502d26c06ed0d60be25d304ee;hp=8f6963f58aaadac55ef37d378f8c7f05c928516e;hb=882718418309228b28146dc03d43c9253c7cfb35;hpb=2a2743361cf643b9dd2ba3e491da62e7cb83a101 diff --git a/workqueue.c b/workqueue.c index 8f6963f5..11791209 100644 --- a/workqueue.c +++ b/workqueue.c @@ -7,7 +7,6 @@ #include #include "fio.h" -#include "ioengine.h" #include "flist.h" #include "workqueue.h" #include "lib/getrusage.h" @@ -79,7 +78,7 @@ static struct submit_worker *get_submit_worker(struct workqueue *wq) return sw; } -static int all_sw_idle(struct workqueue *wq) +static bool all_sw_idle(struct workqueue *wq) { int i; @@ -87,10 +86,10 @@ static int all_sw_idle(struct workqueue *wq) struct submit_worker *sw = &wq->workers[i]; if (!(sw->flags & SW_F_IDLE)) - return 0; + return false; } - return 1; + return true; } /* @@ -110,44 +109,36 @@ void workqueue_flush(struct workqueue *wq) } /* - * Must be serialized by caller. + * Must be serialized by caller. Returns true for queued, false for busy. */ -int workqueue_enqueue(struct workqueue *wq, struct io_u *io_u) +bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work) { struct submit_worker *sw; sw = get_submit_worker(wq); if (sw) { - const enum fio_ddir ddir = acct_ddir(io_u); - struct thread_data *parent = wq->td; - - if (ddir_rw(ddir)) { - parent->io_issues[ddir]++; - parent->io_issue_bytes[ddir] += io_u->xfer_buflen; - } - pthread_mutex_lock(&sw->lock); - flist_add_tail(&io_u->verify_list, &sw->work_list); + flist_add_tail(&work->list, &sw->work_list); sw->seq = ++wq->work_seq; sw->flags &= ~SW_F_IDLE; pthread_mutex_unlock(&sw->lock); pthread_cond_signal(&sw->cond); - return FIO_Q_QUEUED; + return true; } - return FIO_Q_BUSY; + return false; } static void handle_list(struct submit_worker *sw, struct flist_head *list) { struct workqueue *wq = sw->wq; - struct io_u *io_u; + struct workqueue_work *work; while (!flist_empty(list)) { - io_u = flist_first_entry(list, struct io_u, verify_list); - flist_del_init(&io_u->verify_list); - wq->fn(&sw->td, io_u); + work = flist_first_entry(list, struct workqueue_work, list); + flist_del_init(&work->list); + wq->fn(&sw->td, work); } } @@ -161,6 +152,7 @@ static int init_submit_worker(struct submit_worker *sw) memcpy(&td->ts, &parent->ts, sizeof(td->ts)); td->o.uid = td->o.gid = -1U; dup_files(td, parent); + td->eo = parent->eo; fio_options_mem_dupe(td); if (ioengine_load(td)) @@ -184,7 +176,7 @@ static int init_submit_worker(struct submit_worker *sw) fio_gettime(&td->epoch, NULL); fio_getrusage(&td->ru_start); - clear_io_state(td); + clear_io_state(td, 1); td_set_runstate(td, TD_RUNNING); td->flags |= TD_F_CHILD; @@ -197,7 +189,7 @@ err: return 1; } -#ifdef CONFIG_SFA +#ifdef CONFIG_SFAA static void sum_val(uint64_t *dst, uint64_t *src) { if (*src) { @@ -215,13 +207,32 @@ static void sum_val(uint64_t *dst, uint64_t *src) } #endif +static void pthread_double_unlock(pthread_mutex_t *lock1, + pthread_mutex_t *lock2) +{ +#ifndef CONFIG_SFAA + pthread_mutex_unlock(lock1); + pthread_mutex_unlock(lock2); +#endif +} + +static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2) +{ +#ifndef CONFIG_SFAA + if (lock1 < lock2) { + pthread_mutex_lock(lock1); + pthread_mutex_lock(lock2); + } else { + pthread_mutex_lock(lock2); + pthread_mutex_lock(lock1); + } +#endif +} + static void sum_ddir(struct thread_data *dst, struct thread_data *src, enum fio_ddir ddir) { -#ifndef CONFIG_SFA - pthread_mutex_lock(&dst->io_wq.stat_lock); - pthread_mutex_lock(&src->io_wq.stat_lock); -#endif + pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock); sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]); sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]); @@ -229,10 +240,7 @@ static void sum_ddir(struct thread_data *dst, struct thread_data *src, sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]); sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]); -#ifndef CONFIG_SFA - pthread_mutex_unlock(&src->io_wq.stat_lock); - pthread_mutex_unlock(&dst->io_wq.stat_lock); -#endif + pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock); } static void update_accounting(struct submit_worker *sw) @@ -281,8 +289,12 @@ static void *worker_thread(void *data) if (td->io_u_queued || td->cur_depth || td->io_u_in_flight) { + int ret; + pthread_mutex_unlock(&sw->lock); - io_u_quiesce(td); + ret = io_u_quiesce(td); + if (ret > 0) + td->cur_depth -= ret; pthread_mutex_lock(&sw->lock); } @@ -341,7 +353,7 @@ static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt) pthread_join(sw->thread, NULL); (*sum_cnt)++; - sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt); + sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt == 1); free_worker(sw); } @@ -366,7 +378,9 @@ void workqueue_exit(struct workqueue *wq) sw = &wq->workers[i]; if (sw->flags & SW_F_ACCOUNTED) continue; + pthread_mutex_lock(&sw->lock); sw->flags |= SW_F_ACCOUNTED; + pthread_mutex_unlock(&sw->lock); shutdown_worker(sw, &sum_cnt); shutdown++; }