X-Git-Url: https://git.kernel.dk/?a=blobdiff_plain;f=workqueue.c;h=9e6c41ff2f399172703b6e438061236670962df8;hb=855dc4d44e000b68c01b0e33eae3389b49eb7f7f;hp=f26d9d7bae1f3f75a2c9794b0ee0e41656759c82;hpb=df8472e12d8bdbe32ff1a4cf4f0c73224b8e8020;p=fio.git diff --git a/workqueue.c b/workqueue.c index f26d9d7b..9e6c41ff 100644 --- a/workqueue.c +++ b/workqueue.c @@ -1,5 +1,5 @@ /* - * Rated submission helpers + * Generic workqueue offload mechanism * * Copyright (C) 2015 Jens Axboe * @@ -9,15 +9,15 @@ #include "fio.h" #include "flist.h" #include "workqueue.h" -#include "lib/getrusage.h" +#include "smalloc.h" +#include "pshared.h" enum { SW_F_IDLE = 1 << 0, SW_F_RUNNING = 1 << 1, SW_F_EXIT = 1 << 2, - SW_F_EXITED = 1 << 3, - SW_F_ACCOUNTED = 1 << 4, - SW_F_ERROR = 1 << 5, + SW_F_ACCOUNTED = 1 << 3, + SW_F_ERROR = 1 << 4, }; static struct submit_worker *__get_submit_worker(struct workqueue *wq, @@ -85,37 +85,33 @@ static bool all_sw_idle(struct workqueue *wq) */ void workqueue_flush(struct workqueue *wq) { + pthread_mutex_lock(&wq->flush_lock); wq->wake_idle = 1; - while (!all_sw_idle(wq)) { - pthread_mutex_lock(&wq->flush_lock); + while (!all_sw_idle(wq)) pthread_cond_wait(&wq->flush_cond, &wq->flush_lock); - pthread_mutex_unlock(&wq->flush_lock); - } wq->wake_idle = 0; + pthread_mutex_unlock(&wq->flush_lock); } /* - * Must be serialized by caller. Returns true for queued, false for busy. + * Must be serialized by caller. */ -bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work) +void workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work) { struct submit_worker *sw; sw = get_submit_worker(wq); - if (sw) { - pthread_mutex_lock(&sw->lock); - flist_add_tail(&work->list, &sw->work_list); - sw->seq = ++wq->work_seq; - sw->flags &= ~SW_F_IDLE; - pthread_mutex_unlock(&sw->lock); + assert(sw); - pthread_cond_signal(&sw->cond); - return true; - } + pthread_mutex_lock(&sw->lock); + flist_add_tail(&work->list, &sw->work_list); + sw->seq = ++wq->work_seq; + sw->flags &= ~SW_F_IDLE; - return false; + pthread_cond_signal(&sw->cond); + pthread_mutex_unlock(&sw->lock); } static void handle_list(struct submit_worker *sw, struct flist_head *list) @@ -130,81 +126,25 @@ static void handle_list(struct submit_worker *sw, struct flist_head *list) } } -#ifdef CONFIG_SFAA -static void sum_val(uint64_t *dst, uint64_t *src) -{ - if (*src) { - __sync_fetch_and_add(dst, *src); - *src = 0; - } -} -#else -static void sum_val(uint64_t *dst, uint64_t *src) -{ - if (*src) { - *dst += *src; - *src = 0; - } -} -#endif - -static void pthread_double_unlock(pthread_mutex_t *lock1, - pthread_mutex_t *lock2) -{ -#ifndef CONFIG_SFAA - pthread_mutex_unlock(lock1); - pthread_mutex_unlock(lock2); -#endif -} - -static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2) -{ -#ifndef CONFIG_SFAA - if (lock1 < lock2) { - pthread_mutex_lock(lock1); - pthread_mutex_lock(lock2); - } else { - pthread_mutex_lock(lock2); - pthread_mutex_lock(lock1); - } -#endif -} - -static void sum_ddir(struct thread_data *dst, struct thread_data *src, - enum fio_ddir ddir) -{ - pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock); - - sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]); - sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]); - sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]); - sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]); - sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]); - - pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock); -} - -static void update_accounting(struct submit_worker *sw) -{ - struct thread_data *src = sw->private; - struct thread_data *dst = sw->wq->td; - - if (td_read(src)) - sum_ddir(dst, src, DDIR_READ); - if (td_write(src)) - sum_ddir(dst, src, DDIR_WRITE); - if (td_trim(src)) - sum_ddir(dst, src, DDIR_TRIM); -} - static void *worker_thread(void *data) { struct submit_worker *sw = data; struct workqueue *wq = sw->wq; - unsigned int eflags = 0, ret; + unsigned int ret = 0; FLIST_HEAD(local_list); - ret = workqueue_init_worker(sw); + sk_out_assign(sw->sk_out); + + if (wq->ops.nice) { + if (nice(wq->ops.nice) < 0) { + log_err("workqueue: nice %s\n", strerror(errno)); + ret = 1; + } + } + + if (!ret) + ret = workqueue_init_worker(sw); + pthread_mutex_lock(&sw->lock); sw->flags |= SW_F_RUNNING; if (ret) @@ -218,12 +158,10 @@ static void *worker_thread(void *data) if (sw->flags & SW_F_ERROR) goto done; + pthread_mutex_lock(&sw->lock); while (1) { - pthread_mutex_lock(&sw->lock); - if (flist_empty(&sw->work_list)) { if (sw->flags & SW_F_EXIT) { - pthread_mutex_unlock(&sw->lock); break; } @@ -232,47 +170,52 @@ static void *worker_thread(void *data) workqueue_pre_sleep(sw); pthread_mutex_lock(&sw->lock); } - - /* - * We dropped and reaquired the lock, check - * state again. - */ - if (!flist_empty(&sw->work_list)) - goto handle_work; - + } + /* + * We may have dropped and reaquired the lock, check state + * again. + */ + if (flist_empty(&sw->work_list)) { if (sw->flags & SW_F_EXIT) { - pthread_mutex_unlock(&sw->lock); break; - } else if (!(sw->flags & SW_F_IDLE)) { + } + if (!(sw->flags & SW_F_IDLE)) { sw->flags |= SW_F_IDLE; wq->next_free_worker = sw->index; + pthread_mutex_unlock(&sw->lock); + pthread_mutex_lock(&wq->flush_lock); if (wq->wake_idle) pthread_cond_signal(&wq->flush_cond); + pthread_mutex_unlock(&wq->flush_lock); + pthread_mutex_lock(&sw->lock); + } + } + if (flist_empty(&sw->work_list)) { + if (sw->flags & SW_F_EXIT) { + break; } - update_accounting(sw); pthread_cond_wait(&sw->cond, &sw->lock); } else { -handle_work: flist_splice_init(&sw->work_list, &local_list); } pthread_mutex_unlock(&sw->lock); handle_list(sw, &local_list); + if (wq->ops.update_acct_fn) + wq->ops.update_acct_fn(sw); + pthread_mutex_lock(&sw->lock); } - - update_accounting(sw); + pthread_mutex_unlock(&sw->lock); done: - pthread_mutex_lock(&sw->lock); - sw->flags |= (SW_F_EXITED | eflags); - pthread_mutex_unlock(&sw->lock); + sk_out_drop(); return NULL; } -static void free_worker(struct submit_worker *sw) +static void free_worker(struct submit_worker *sw, unsigned int *sum_cnt) { struct workqueue *wq = sw->wq; - workqueue_exit_worker(sw); + workqueue_exit_worker(sw, sum_cnt); pthread_cond_destroy(&sw->cond); pthread_mutex_destroy(&sw->lock); @@ -283,13 +226,8 @@ static void free_worker(struct submit_worker *sw) static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt) { - struct thread_data *parent = sw->wq->td; - struct thread_data *td = sw->private; - pthread_join(sw->thread, NULL); - (*sum_cnt)++; - sum_thread_stats(&parent->ts, &td->ts, *sum_cnt == 1); - free_worker(sw); + free_worker(sw, sum_cnt); } void workqueue_exit(struct workqueue *wq) @@ -298,6 +236,9 @@ void workqueue_exit(struct workqueue *wq) struct submit_worker *sw; int i; + if (!wq->workers) + return; + for (i = 0; i < wq->max_workers; i++) { sw = &wq->workers[i]; @@ -321,22 +262,28 @@ void workqueue_exit(struct workqueue *wq) } } while (shutdown && shutdown != wq->max_workers); - free(wq->workers); + sfree(wq->workers); + wq->workers = NULL; pthread_mutex_destroy(&wq->flush_lock); pthread_cond_destroy(&wq->flush_cond); pthread_mutex_destroy(&wq->stat_lock); } -static int start_worker(struct workqueue *wq, unsigned int index) +static int start_worker(struct workqueue *wq, unsigned int index, + struct sk_out *sk_out) { struct submit_worker *sw = &wq->workers[index]; int ret; INIT_FLIST_HEAD(&sw->work_list); - pthread_cond_init(&sw->cond, NULL); - pthread_mutex_init(&sw->lock, NULL); + + ret = mutex_cond_init_pshared(&sw->lock, &sw->cond); + if (ret) + return ret; + sw->wq = wq; sw->index = index; + sw->sk_out = sk_out; if (wq->ops.alloc_worker_fn) { ret = wq->ops.alloc_worker_fn(sw); @@ -352,29 +299,37 @@ static int start_worker(struct workqueue *wq, unsigned int index) return 0; } - free_worker(sw); + free_worker(sw, NULL); return 1; } int workqueue_init(struct thread_data *td, struct workqueue *wq, - struct workqueue_ops *ops, unsigned max_pending) + struct workqueue_ops *ops, unsigned int max_workers, + struct sk_out *sk_out) { unsigned int running; int i, error; + int ret; - wq->max_workers = max_pending; + wq->max_workers = max_workers; wq->td = td; wq->ops = *ops; wq->work_seq = 0; wq->next_free_worker = 0; - pthread_cond_init(&wq->flush_cond, NULL); - pthread_mutex_init(&wq->flush_lock, NULL); - pthread_mutex_init(&wq->stat_lock, NULL); - wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker)); + ret = mutex_cond_init_pshared(&wq->flush_lock, &wq->flush_cond); + if (ret) + goto err; + ret = mutex_init_pshared(&wq->stat_lock); + if (ret) + goto err; + + wq->workers = smalloc(wq->max_workers * sizeof(struct submit_worker)); + if (!wq->workers) + goto err; for (i = 0; i < wq->max_workers; i++) - if (start_worker(wq, i)) + if (start_worker(wq, i, sk_out)) break; wq->max_workers = i; @@ -385,11 +340,11 @@ int workqueue_init(struct thread_data *td, struct workqueue *wq, * Wait for them all to be started and initialized */ error = 0; + pthread_mutex_lock(&wq->flush_lock); do { struct submit_worker *sw; running = 0; - pthread_mutex_lock(&wq->flush_lock); for (i = 0; i < wq->max_workers; i++) { sw = &wq->workers[i]; pthread_mutex_lock(&sw->lock); @@ -400,14 +355,12 @@ int workqueue_init(struct thread_data *td, struct workqueue *wq, pthread_mutex_unlock(&sw->lock); } - if (error || running == wq->max_workers) { - pthread_mutex_unlock(&wq->flush_lock); + if (error || running == wq->max_workers) break; - } pthread_cond_wait(&wq->flush_cond, &wq->flush_lock); - pthread_mutex_unlock(&wq->flush_lock); } while (1); + pthread_mutex_unlock(&wq->flush_lock); if (!error) return 0;