X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=workqueue.c;h=06fc26310dd57a829c4b05ce5bc00cd17856ab3f;hp=11791209b54da02502d26c06ed0d60be25d304ee;hb=51575029ff4027f42ef6be374fd50b2cda5880b4;hpb=882718418309228b28146dc03d43c9253c7cfb35 diff --git a/workqueue.c b/workqueue.c index 11791209..06fc2631 100644 --- a/workqueue.c +++ b/workqueue.c @@ -1,5 +1,5 @@ /* - * Rated submission helpers + * Generic workqueue offload mechanism * * Copyright (C) 2015 Jens Axboe * @@ -9,19 +9,6 @@ #include "fio.h" #include "flist.h" #include "workqueue.h" -#include "lib/getrusage.h" - -struct submit_worker { - pthread_t thread; - pthread_mutex_t lock; - pthread_cond_t cond; - struct flist_head work_list; - unsigned int flags; - unsigned int index; - uint64_t seq; - struct workqueue *wq; - struct thread_data td; -}; enum { SW_F_IDLE = 1 << 0, @@ -111,23 +98,20 @@ void workqueue_flush(struct workqueue *wq) /* * Must be serialized by caller. Returns true for queued, false for busy. */ -bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work) +void workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work) { struct submit_worker *sw; sw = get_submit_worker(wq); - if (sw) { - pthread_mutex_lock(&sw->lock); - flist_add_tail(&work->list, &sw->work_list); - sw->seq = ++wq->work_seq; - sw->flags &= ~SW_F_IDLE; - pthread_mutex_unlock(&sw->lock); + assert(sw); - pthread_cond_signal(&sw->cond); - return true; - } + pthread_mutex_lock(&sw->lock); + flist_add_tail(&work->list, &sw->work_list); + sw->seq = ++wq->work_seq; + sw->flags &= ~SW_F_IDLE; + pthread_mutex_unlock(&sw->lock); - return false; + pthread_cond_signal(&sw->cond); } static void handle_list(struct submit_worker *sw, struct flist_head *list) @@ -138,133 +122,27 @@ static void handle_list(struct submit_worker *sw, struct flist_head *list) while (!flist_empty(list)) { work = flist_first_entry(list, struct workqueue_work, list); flist_del_init(&work->list); - wq->fn(&sw->td, work); - } -} - -static int init_submit_worker(struct submit_worker *sw) -{ - struct thread_data *parent = sw->wq->td; - struct thread_data *td = &sw->td; - int fio_unused ret; - - memcpy(&td->o, &parent->o, sizeof(td->o)); - memcpy(&td->ts, &parent->ts, sizeof(td->ts)); - td->o.uid = td->o.gid = -1U; - dup_files(td, parent); - td->eo = parent->eo; - fio_options_mem_dupe(td); - - if (ioengine_load(td)) - goto err; - - if (td->o.odirect) - td->io_ops->flags |= FIO_RAWIO; - - td->pid = gettid(); - - INIT_FLIST_HEAD(&td->io_log_list); - INIT_FLIST_HEAD(&td->io_hist_list); - INIT_FLIST_HEAD(&td->verify_list); - INIT_FLIST_HEAD(&td->trim_list); - INIT_FLIST_HEAD(&td->next_rand_list); - td->io_hist_tree = RB_ROOT; - - td->o.iodepth = 1; - if (td_io_init(td)) - goto err_io_init; - - fio_gettime(&td->epoch, NULL); - fio_getrusage(&td->ru_start); - clear_io_state(td, 1); - - td_set_runstate(td, TD_RUNNING); - td->flags |= TD_F_CHILD; - td->parent = parent; - return 0; - -err_io_init: - close_ioengine(td); -err: - return 1; -} - -#ifdef CONFIG_SFAA -static void sum_val(uint64_t *dst, uint64_t *src) -{ - if (*src) { - __sync_fetch_and_add(dst, *src); - *src = 0; - } -} -#else -static void sum_val(uint64_t *dst, uint64_t *src) -{ - if (*src) { - *dst += *src; - *src = 0; + wq->ops.fn(sw, work); } } -#endif - -static void pthread_double_unlock(pthread_mutex_t *lock1, - pthread_mutex_t *lock2) -{ -#ifndef CONFIG_SFAA - pthread_mutex_unlock(lock1); - pthread_mutex_unlock(lock2); -#endif -} - -static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2) -{ -#ifndef CONFIG_SFAA - if (lock1 < lock2) { - pthread_mutex_lock(lock1); - pthread_mutex_lock(lock2); - } else { - pthread_mutex_lock(lock2); - pthread_mutex_lock(lock1); - } -#endif -} - -static void sum_ddir(struct thread_data *dst, struct thread_data *src, - enum fio_ddir ddir) -{ - pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock); - - sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]); - sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]); - sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]); - sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]); - sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]); - - pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock); -} - -static void update_accounting(struct submit_worker *sw) -{ - struct thread_data *src = &sw->td; - struct thread_data *dst = sw->wq->td; - - if (td_read(src)) - sum_ddir(dst, src, DDIR_READ); - if (td_write(src)) - sum_ddir(dst, src, DDIR_WRITE); - if (td_trim(src)) - sum_ddir(dst, src, DDIR_TRIM); -} static void *worker_thread(void *data) { struct submit_worker *sw = data; struct workqueue *wq = sw->wq; - struct thread_data *td = &sw->td; - unsigned int eflags = 0, ret; + unsigned int eflags = 0, ret = 0; FLIST_HEAD(local_list); - ret = init_submit_worker(sw); + if (wq->ops.nice) { + if (nice(wq->ops.nice) < 0) { + log_err("workqueue: nice %s\n", strerror(errno)); + ret = 1; + } + } + + if (!ret) + ret = workqueue_init_worker(sw); + pthread_mutex_lock(&sw->lock); sw->flags |= SW_F_RUNNING; if (ret) @@ -287,14 +165,9 @@ static void *worker_thread(void *data) break; } - if (td->io_u_queued || td->cur_depth || - td->io_u_in_flight) { - int ret; - + if (workqueue_pre_sleep_check(sw)) { pthread_mutex_unlock(&sw->lock); - ret = io_u_quiesce(td); - if (ret > 0) - td->cur_depth -= ret; + workqueue_pre_sleep(sw); pthread_mutex_lock(&sw->lock); } @@ -314,7 +187,9 @@ static void *worker_thread(void *data) if (wq->wake_idle) pthread_cond_signal(&wq->flush_cond); } - update_accounting(sw); + if (wq->ops.update_acct_fn) + wq->ops.update_acct_fn(sw); + pthread_cond_wait(&sw->cond, &sw->lock); } else { handle_work: @@ -324,7 +199,8 @@ handle_work: handle_list(sw, &local_list); } - update_accounting(sw); + if (wq->ops.update_acct_fn) + wq->ops.update_acct_fn(sw); done: pthread_mutex_lock(&sw->lock); @@ -333,28 +209,23 @@ done: return NULL; } -static void free_worker(struct submit_worker *sw) +static void free_worker(struct submit_worker *sw, unsigned int *sum_cnt) { - struct thread_data *td = &sw->td; + struct workqueue *wq = sw->wq; - fio_options_free(td); - close_and_free_files(td); - if (td->io_ops) - close_ioengine(td); - td_set_runstate(td, TD_EXITED); + workqueue_exit_worker(sw, sum_cnt); pthread_cond_destroy(&sw->cond); pthread_mutex_destroy(&sw->lock); + + if (wq->ops.free_worker_fn) + wq->ops.free_worker_fn(sw); } static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt) { - struct thread_data *parent = sw->wq->td; - pthread_join(sw->thread, NULL); - (*sum_cnt)++; - sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt == 1); - free_worker(sw); + free_worker(sw, sum_cnt); } void workqueue_exit(struct workqueue *wq) @@ -403,6 +274,12 @@ static int start_worker(struct workqueue *wq, unsigned int index) sw->wq = wq; sw->index = index; + if (wq->ops.alloc_worker_fn) { + ret = wq->ops.alloc_worker_fn(sw); + if (ret) + return ret; + } + ret = pthread_create(&sw->thread, NULL, worker_thread, sw); if (!ret) { pthread_mutex_lock(&sw->lock); @@ -411,19 +288,19 @@ static int start_worker(struct workqueue *wq, unsigned int index) return 0; } - free_worker(sw); + free_worker(sw, NULL); return 1; } int workqueue_init(struct thread_data *td, struct workqueue *wq, - workqueue_fn *fn, unsigned max_pending) + struct workqueue_ops *ops, unsigned max_pending) { unsigned int running; int i, error; wq->max_workers = max_pending; wq->td = td; - wq->fn = fn; + wq->ops = *ops; wq->work_seq = 0; wq->next_free_worker = 0; pthread_cond_init(&wq->flush_cond, NULL);