From 155f2f027b34321f6f5abe79c09ece8ecca25046 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Tue, 8 Dec 2015 11:35:39 -0700 Subject: [PATCH] iolog: replace tp usage with workqueue Now that workqueue.c is generic enough to handle any kind of work offloading, convert the online log compression to workqueue usage and kill off lib/tp.* Signed-off-by: Jens Axboe --- backend.c | 8 ++-- fio.h | 2 +- iolog.c | 76 +++++++++++++++++++++++--------- iolog.h | 2 + lib/tp.c | 119 -------------------------------------------------- lib/tp.h | 33 -------------- rate-submit.c | 6 ++- workqueue.h | 2 +- 8 files changed, 66 insertions(+), 182 deletions(-) delete mode 100644 lib/tp.c delete mode 100644 lib/tp.h diff --git a/backend.c b/backend.c index 01e0b3ac..5c35757c 100644 --- a/backend.c +++ b/backend.c @@ -54,7 +54,6 @@ #include "lib/getrusage.h" #include "idletime.h" #include "err.h" -#include "lib/tp.h" #include "workqueue.h" #include "lib/mountcheck.h" #include "rate-submit.h" @@ -1554,8 +1553,8 @@ static void *thread_main(void *data) goto err; } - if (td->flags & TD_F_COMPRESS_LOG) - tp_init(&td->tp_data); + if (iolog_compress_init(td)) + goto err; fio_verify_init(td); @@ -1661,8 +1660,7 @@ static void *thread_main(void *data) if (o->io_submit_mode == IO_MODE_OFFLOAD) workqueue_exit(&td->io_wq); - if (td->flags & TD_F_COMPRESS_LOG) - tp_exit(&td->tp_data); + iolog_compress_exit(td); if (o->exec_postrun) exec_string(o, o->exec_postrun, (const char *)"postrun"); diff --git a/fio.h b/fio.h index e4f778eb..6f85266c 100644 --- a/fio.h +++ b/fio.h @@ -129,7 +129,7 @@ struct thread_data { struct io_log *bw_log; struct io_log *iops_log; - struct tp_data *tp_data; + struct workqueue log_compress_wq; struct thread_data *parent; diff --git a/iolog.c b/iolog.c index d7c8a45d..865722ea 100644 --- a/iolog.c +++ b/iolog.c @@ -18,7 +18,6 @@ #include "verify.h" #include "trim.h" #include "filelock.h" -#include "lib/tp.h" static const char iolog_ver2[] = "fio version 2 iolog"; @@ -672,7 +671,12 @@ static void flush_samples(FILE *f, void *samples, uint64_t sample_size) #ifdef CONFIG_ZLIB struct iolog_flush_data { - struct tp_work work; + struct workqueue_work work; + pthread_mutex_t lock; + pthread_cond_t cv; + int wait; + volatile int done; + volatile int refs; struct io_log *log; void *samples; uint64_t nr_samples; @@ -971,7 +975,7 @@ void flush_log(struct io_log *log, int do_append) static int finish_log(struct thread_data *td, struct io_log *log, int trylock) { - if (td->tp_data) + if (td->flags & TD_F_COMPRESS_LOG) iolog_flush(log, 1); if (trylock) { @@ -997,7 +1001,7 @@ static int finish_log(struct thread_data *td, struct io_log *log, int trylock) * the specified memory limitation. Compresses the previously stored * entries. */ -static int gz_work(struct tp_work *work) +static int gz_work(struct submit_worker *sw, struct workqueue_work *work) { struct iolog_flush_data *data; struct iolog_compress *c; @@ -1078,12 +1082,18 @@ static int gz_work(struct tp_work *work) ret = 0; done: - if (work->wait) { - work->done = 1; - pthread_cond_signal(&work->cv); + if (data->wait) { + int refs; + + pthread_mutex_lock(&data->lock); + data->done = 1; + pthread_cond_signal(&data->cv); + refs = --data->refs; + pthread_mutex_unlock(&data->lock); + if (!refs) + free(data); } else free(data); - return ret; err: while (!flist_empty(&list)) { @@ -1095,6 +1105,27 @@ err: goto done; } +static struct workqueue_ops log_compress_wq_ops = { + .fn = gz_work, +}; + +int iolog_compress_init(struct thread_data *td) +{ + if (!(td->flags & TD_F_COMPRESS_LOG)) + return 0; + + workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1); + return 0; +} + +void iolog_compress_exit(struct thread_data *td) +{ + if (!(td->flags & TD_F_COMPRESS_LOG)) + return; + + workqueue_exit(&td->log_compress_wq); +} + /* * Queue work item to compress the existing log entries. We copy the * samples, and reset the log sample count to 0 (so the logging will @@ -1103,7 +1134,6 @@ err: */ int iolog_flush(struct io_log *log, int wait) { - struct tp_data *tdat = log->td->tp_data; struct iolog_flush_data *data; size_t sample_size; @@ -1122,25 +1152,29 @@ int iolog_flush(struct io_log *log, int wait) memcpy(data->samples, log->log, sample_size); data->nr_samples = log->nr_samples; - data->work.fn = gz_work; log->nr_samples = 0; if (wait) { - pthread_mutex_init(&data->work.lock, NULL); - pthread_cond_init(&data->work.cv, NULL); - data->work.wait = 1; + pthread_mutex_init(&data->lock, NULL); + pthread_cond_init(&data->cv, NULL); + data->done = 0; + data->wait = 1; + data->refs = 2; } else - data->work.wait = 0; + data->wait = 0; - data->work.prio = 1; - tp_queue_work(tdat, &data->work); + workqueue_enqueue(&log->td->log_compress_wq, &data->work); if (wait) { - pthread_mutex_lock(&data->work.lock); - while (!data->work.done) - pthread_cond_wait(&data->work.cv, &data->work.lock); - pthread_mutex_unlock(&data->work.lock); - free(data); + int refs; + + pthread_mutex_lock(&data->lock); + while (!data->done) + pthread_cond_wait(&data->cv, &data->lock); + refs = --data->refs; + pthread_mutex_unlock(&data->lock); + if (!refs) + free(data); } return 0; diff --git a/iolog.h b/iolog.h index eb5fdf36..6f027ca8 100644 --- a/iolog.h +++ b/iolog.h @@ -184,6 +184,8 @@ extern void trim_io_piece(struct thread_data *, const struct io_u *); extern void queue_io_piece(struct thread_data *, struct io_piece *); extern void prune_io_piece_log(struct thread_data *); extern void write_iolog_close(struct thread_data *); +extern int iolog_compress_init(struct thread_data *); +extern void iolog_compress_exit(struct thread_data *); #ifdef CONFIG_ZLIB extern int iolog_file_inflate(const char *); diff --git a/lib/tp.c b/lib/tp.c deleted file mode 100644 index 7462f5bf..00000000 --- a/lib/tp.c +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Basic workqueue like code, that sets up a thread and allows async - * processing of some sort. Could be extended to allow for multiple - * worker threads. But right now fio associates one of this per IO - * thread, so should be enough to have just a single thread doing the - * work. - */ -#include -#include -#include -#include -#include -#include -#include - -#include "../smalloc.h" -#include "../log.h" -#include "tp.h" - -static void tp_flush_work(struct flist_head *list) -{ - struct tp_work *work; - - while (!flist_empty(list)) { - int prio; - - work = flist_entry(list->next, struct tp_work, list); - flist_del(&work->list); - - prio = work->prio; - if (nice(prio) < 0) - log_err("fio: nice %s\n", strerror(errno)); - - work->fn(work); - - if (nice(prio) < 0) - log_err("fio: nice %s\n", strerror(errno)); - } -} - -static void *tp_thread(void *data) -{ - struct tp_data *tdat = data; - struct flist_head work_list; - - INIT_FLIST_HEAD(&work_list); - - while (1) { - pthread_mutex_lock(&tdat->lock); - - if (!tdat->thread_exit && flist_empty(&tdat->work)) - pthread_cond_wait(&tdat->cv, &tdat->lock); - - if (!flist_empty(&tdat->work)) - flist_splice_tail_init(&tdat->work, &work_list); - - pthread_mutex_unlock(&tdat->lock); - - if (flist_empty(&work_list)) { - if (tdat->thread_exit) - break; - continue; - } - - tp_flush_work(&work_list); - } - - return NULL; -} - -void tp_queue_work(struct tp_data *tdat, struct tp_work *work) -{ - work->done = 0; - - pthread_mutex_lock(&tdat->lock); - flist_add_tail(&work->list, &tdat->work); - pthread_mutex_unlock(&tdat->lock); - - pthread_cond_signal(&tdat->cv); -} - -void tp_init(struct tp_data **tdatp) -{ - struct tp_data *tdat; - int ret; - - if (*tdatp) - return; - - *tdatp = tdat = smalloc(sizeof(*tdat)); - pthread_mutex_init(&tdat->lock, NULL); - INIT_FLIST_HEAD(&tdat->work); - pthread_cond_init(&tdat->cv, NULL); - pthread_cond_init(&tdat->sleep_cv, NULL); - - ret = pthread_create(&tdat->thread, NULL, tp_thread, tdat); - if (ret) - log_err("fio: failed to create tp thread\n"); -} - -void tp_exit(struct tp_data **tdatp) -{ - struct tp_data *tdat = *tdatp; - void *ret; - - if (!tdat) - return; - - pthread_mutex_lock(&tdat->lock); - tdat->thread_exit = 1; - pthread_mutex_unlock(&tdat->lock); - - pthread_cond_signal(&tdat->cv); - - pthread_join(tdat->thread, &ret); - - sfree(tdat); - *tdatp = NULL; -} diff --git a/lib/tp.h b/lib/tp.h deleted file mode 100644 index 9147cc2c..00000000 --- a/lib/tp.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef FIO_TP_H -#define FIO_TP_H - -#include "../flist.h" - -struct tp_work; -typedef int (tp_work_fn)(struct tp_work *); - -struct tp_work { - struct flist_head list; - tp_work_fn *fn; - int wait; - int prio; - pthread_cond_t cv; - pthread_mutex_t lock; - volatile int done; -}; - -struct tp_data { - pthread_t thread; - pthread_cond_t cv; - pthread_mutex_t lock; - struct flist_head work; - volatile int thread_exit; - pthread_cond_t sleep_cv; - volatile int sleeping; -}; - -extern void tp_init(struct tp_data **); -extern void tp_exit(struct tp_data **); -extern void tp_queue_work(struct tp_data *, struct tp_work *); - -#endif diff --git a/rate-submit.c b/rate-submit.c index a68a3dd3..a1d55916 100644 --- a/rate-submit.c +++ b/rate-submit.c @@ -2,8 +2,8 @@ #include "ioengine.h" #include "lib/getrusage.h" -static void io_workqueue_fn(struct submit_worker *sw, - struct workqueue_work *work) +static int io_workqueue_fn(struct submit_worker *sw, + struct workqueue_work *work) { struct io_u *io_u = container_of(work, struct io_u, work); const enum fio_ddir ddir = io_u->ddir; @@ -48,6 +48,8 @@ static void io_workqueue_fn(struct submit_worker *sw, if (ret > 0) td->cur_depth -= ret; } + + return 0; } static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw) diff --git a/workqueue.h b/workqueue.h index f732b308..69a85128 100644 --- a/workqueue.h +++ b/workqueue.h @@ -19,7 +19,7 @@ struct submit_worker { void *private; }; -typedef void (workqueue_work_fn)(struct submit_worker *, struct workqueue_work *); +typedef int (workqueue_work_fn)(struct submit_worker *, struct workqueue_work *); typedef bool (workqueue_pre_sleep_flush_fn)(struct submit_worker *); typedef void (workqueue_pre_sleep_fn)(struct submit_worker *); typedef int (workqueue_alloc_worker_fn)(struct submit_worker *); -- 2.25.1