From a9da8ab2169810667aeb26f857a8ac3c056e4d61 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 19 Mar 2015 22:50:06 -0600 Subject: [PATCH] First cut at supporting IO offload rate_iops=x io_submit_mode=offload Signed-off-by: Jens Axboe --- Makefile | 2 +- backend.c | 72 ++++++-- cconv.c | 2 + fio.h | 49 +++++- init.c | 3 + io_u.c | 86 +++++---- ioengine.h | 11 ++ ioengines.c | 16 +- options.c | 20 +++ stat.c | 6 + thread_options.h | 3 +- verify.c | 8 +- workqueue.c | 444 +++++++++++++++++++++++++++++++++++++++++++++++ workqueue.h | 29 ++++ 14 files changed, 685 insertions(+), 66 deletions(-) create mode 100644 workqueue.c create mode 100644 workqueue.h diff --git a/Makefile b/Makefile index 4202ed80..9b7f27ab 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ SOURCE := gettime.c ioengines.c init.c stat.c log.c time.c filesetup.c \ lib/lfsr.c gettime-thread.c helpers.c lib/flist_sort.c \ lib/hweight.c lib/getrusage.c idletime.c td_error.c \ profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c \ - lib/tp.c lib/bloom.c lib/gauss.c + lib/tp.c lib/bloom.c lib/gauss.c workqueue.c ifdef CONFIG_LIBHDFS HDFSFLAGS= -I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/linux -I $(FIO_LIBHDFS_INCLUDE) diff --git a/backend.c b/backend.c index f82b83f7..65a3e184 100644 --- a/backend.c +++ b/backend.c @@ -54,6 +54,7 @@ #include "idletime.h" #include "err.h" #include "lib/tp.h" +#include "workqueue.h" static pthread_t helper_thread; static pthread_mutex_t helper_lock; @@ -623,7 +624,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) continue; } else if (io_u->ddir == DDIR_TRIM) { io_u->ddir = DDIR_READ; - io_u->flags |= IO_U_F_TRIMMED; + io_u_set(io_u, IO_U_F_TRIMMED); break; } else if (io_u->ddir == DDIR_WRITE) { io_u->ddir = DDIR_READ; @@ -868,21 +869,27 @@ static uint64_t do_io(struct thread_data *td) !td->o.experimental_verify) log_io_piece(td, io_u); - ret = td_io_queue(td, io_u); + if (td->o.io_submit_mode == IO_MODE_OFFLOAD) { + if (td->error) + break; + ret = workqueue_enqueue(&td->io_wq, io_u); + } else { + ret = td_io_queue(td, io_u); - if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 1, &comp_time)) - break; + if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 1, &comp_time)) + break; - /* - * See if we need to complete some commands. Note that we - * can get BUSY even without IO queued, if the system is - * resource starved. - */ + /* + * See if we need to complete some commands. Note that + * we can get BUSY even without IO queued, if the + * system is resource starved. 
+ */ reap: - full = queue_full(td) || - (ret == FIO_Q_BUSY && td->cur_depth); - if (full || !td->o.iodepth_batch_complete) - ret = wait_for_completions(td, &comp_time); + full = queue_full(td) || + (ret == FIO_Q_BUSY && td->cur_depth); + if (full || !td->o.iodepth_batch_complete) + ret = wait_for_completions(td, &comp_time); + } if (ret < 0) break; if (!ddir_rw_sum(td->bytes_done) && @@ -931,7 +938,12 @@ reap: if (!td->error) { struct fio_file *f; - i = td->cur_depth; + if (td->o.io_submit_mode == IO_MODE_OFFLOAD) { + workqueue_flush(&td->io_wq); + i = 0; + } else + i = td->cur_depth; + if (i) { ret = io_u_queued_complete(td, i); if (td->o.fill_device && td->error == ENOSPC) @@ -1242,7 +1254,7 @@ static uint64_t do_dry_run(struct thread_data *td) if (!io_u) break; - io_u->flags |= IO_U_F_FLIGHT; + io_u_set(io_u, IO_U_F_FLIGHT); io_u->error = 0; io_u->resid = 0; if (ddir_rw(acct_ddir(io_u))) @@ -1265,6 +1277,29 @@ static uint64_t do_dry_run(struct thread_data *td) return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM]; } +static void io_workqueue_fn(struct thread_data *td, struct io_u *io_u) +{ + const enum fio_ddir ddir = io_u->ddir; + int ret; + + dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid()); + + io_u_set(io_u, IO_U_F_NO_FILE_PUT); + + td->cur_depth++; + + ret = td_io_queue(td, io_u); + + dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid()); + + io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL); + + if (ret == FIO_Q_QUEUED) + ret = io_u_queued_complete(td, 1); + + td->cur_depth--; +} + /* * Entry point for the thread based jobs. The process based jobs end up * here as well, after a little setup. @@ -1462,6 +1497,10 @@ static void *thread_main(void *data) fio_verify_init(td); + if ((o->io_submit_mode == IO_MODE_OFFLOAD) && + workqueue_init(td, &td->io_wq, io_workqueue_fn, td->o.iodepth)) + goto err; + fio_gettime(&td->epoch, NULL); fio_getrusage(&td->ru_start); clear_state = 0; @@ -1570,6 +1609,9 @@ static void *thread_main(void *data) fio_writeout_logs(td); + if (o->io_submit_mode == IO_MODE_OFFLOAD) + workqueue_exit(&td->io_wq); + if (td->flags & TD_F_COMPRESS_LOG) tp_exit(&td->tp_data); diff --git a/cconv.c b/cconv.c index 68f119f1..976059cd 100644 --- a/cconv.c +++ b/cconv.c @@ -116,6 +116,7 @@ void convert_thread_options_to_cpu(struct thread_options *o, } o->ratecycle = le32_to_cpu(top->ratecycle); + o->io_submit_mode = le32_to_cpu(top->io_submit_mode); o->nr_files = le32_to_cpu(top->nr_files); o->open_files = le32_to_cpu(top->open_files); o->file_lock_mode = le32_to_cpu(top->file_lock_mode); @@ -295,6 +296,7 @@ void convert_thread_options_to_net(struct thread_options_pack *top, top->fill_device = cpu_to_le32(o->fill_device); top->file_append = cpu_to_le32(o->file_append); top->ratecycle = cpu_to_le32(o->ratecycle); + top->io_submit_mode = cpu_to_le32(o->io_submit_mode); top->nr_files = cpu_to_le32(o->nr_files); top->open_files = cpu_to_le32(o->open_files); top->file_lock_mode = cpu_to_le32(o->file_lock_mode); diff --git a/fio.h b/fio.h index 44648235..a4637bb9 100644 --- a/fio.h +++ b/fio.h @@ -40,6 +40,7 @@ #include "stat.h" #include "flow.h" #include "io_u_queue.h" +#include "workqueue.h" #ifdef CONFIG_SOLARISAIO #include @@ -75,6 +76,8 @@ enum { TD_F_NOIO = 256, TD_F_COMPRESS_LOG = 512, TD_F_VSTATE_SAVED = 1024, + TD_F_NEED_LOCK = 2048, + TD_F_CHILD = 4096, }; enum { @@ -94,6 +97,11 @@ enum { FIO_RAND_NR_OFFS, }; +enum { + IO_MODE_INLINE = 0, + IO_MODE_OFFLOAD, +}; + /* * This describes a single thread/process executing a fio job. 
*/ @@ -118,6 +126,8 @@ struct thread_data { struct tp_data *tp_data; + struct thread_data *parent; + uint64_t stat_io_bytes[DDIR_RWDIR_CNT]; struct timeval bw_sample_time; @@ -232,6 +242,11 @@ struct thread_data { unsigned long rate_blocks[DDIR_RWDIR_CNT]; struct timeval lastrate[DDIR_RWDIR_CNT]; + /* + * Enforced rate submission/completion workqueue + */ + struct workqueue io_wq; + uint64_t total_io_size; uint64_t fill_device_size; @@ -365,12 +380,23 @@ enum { } while (0) -#define td_clear_error(td) \ - (td)->error = 0; -#define td_verror(td, err, func) \ - __td_verror((td), (err), strerror((err)), (func)) -#define td_vmsg(td, err, msg, func) \ - __td_verror((td), (err), (msg), (func)) +#define td_clear_error(td) do { \ + (td)->error = 0; \ + if ((td)->parent) \ + (td)->parent->error = 0; \ +} while (0) + +#define td_verror(td, err, func) do { \ + __td_verror((td), (err), strerror((err)), (func)); \ + if ((td)->parent) \ + __td_verror((td)->parent, (err), strerror((err)), (func)); \ +} while (0) + +#define td_vmsg(td, err, msg, func) do { \ + __td_verror((td), (err), (msg), (func)); \ + if ((td)->parent) \ + __td_verror((td)->parent, (err), (msg), (func)); \ +} while (0) #define __fio_stringify_1(x) #x #define __fio_stringify(x) __fio_stringify_1(x) @@ -610,25 +636,30 @@ static inline int is_power_of_2(uint64_t val) return (val != 0 && ((val & (val - 1)) == 0)); } +static inline int td_async_processing(struct thread_data *td) +{ + return (td->flags & TD_F_NEED_LOCK) != 0; +} + /* * We currently only need to do locking if we have verifier threads * accessing our internal structures too */ static inline void td_io_u_lock(struct thread_data *td) { - if (td->o.verify_async) + if (td_async_processing(td)) pthread_mutex_lock(&td->io_u_lock); } static inline void td_io_u_unlock(struct thread_data *td) { - if (td->o.verify_async) + if (td_async_processing(td)) pthread_mutex_unlock(&td->io_u_lock); } static inline void td_io_u_free_notify(struct thread_data *td) { - if (td->o.verify_async) + if (td_async_processing(td)) pthread_cond_signal(&td->free_cond); } diff --git a/init.c b/init.c index a126f79f..1a5d4c9e 100644 --- a/init.c +++ b/init.c @@ -956,6 +956,9 @@ static void init_flags(struct thread_data *td) td->flags |= TD_F_SCRAMBLE_BUFFERS; if (o->verify != VERIFY_NONE) td->flags |= TD_F_VER_NONE; + + if (o->verify_async || o->io_submit_mode == IO_MODE_OFFLOAD) + td->flags |= TD_F_NEED_LOCK; } static int setup_random_seeds(struct thread_data *td) diff --git a/io_u.c b/io_u.c index e4fcfd83..ba3f7ca0 100644 --- a/io_u.c +++ b/io_u.c @@ -326,7 +326,7 @@ static int get_next_block(struct thread_data *td, struct io_u *io_u, *is_random = 1; } else { *is_random = 0; - io_u->flags |= IO_U_F_BUSY_OK; + io_u_set(io_u, IO_U_F_BUSY_OK); ret = get_next_seq_offset(td, f, ddir, &offset); if (ret) ret = get_next_rand_block(td, f, ddir, &b); @@ -336,7 +336,7 @@ static int get_next_block(struct thread_data *td, struct io_u *io_u, ret = get_next_seq_offset(td, f, ddir, &offset); } } else { - io_u->flags |= IO_U_F_BUSY_OK; + io_u_set(io_u, IO_U_F_BUSY_OK); *is_random = 0; if (td->o.rw_seq == RW_SEQ_SEQ) { @@ -591,7 +591,8 @@ static enum fio_ddir rate_ddir(struct thread_data *td, enum fio_ddir ddir) } else usec = td->rate_pending_usleep[ddir]; - io_u_quiesce(td); + if (td->o.io_submit_mode == IO_MODE_INLINE) + io_u_quiesce(td); usec = usec_sleep(td, usec); @@ -684,7 +685,7 @@ static void set_rw_ddir(struct thread_data *td, struct io_u *io_u) td->o.barrier_blocks && !(td->io_issues[DDIR_WRITE] % 
td->o.barrier_blocks) && td->io_issues[DDIR_WRITE]) - io_u->flags |= IO_U_F_BARRIER; + io_u_set(io_u, IO_U_F_BARRIER); } void put_file_log(struct thread_data *td, struct fio_file *f) @@ -697,16 +698,21 @@ void put_file_log(struct thread_data *td, struct fio_file *f) void put_io_u(struct thread_data *td, struct io_u *io_u) { + if (td->parent) + td = td->parent; + td_io_u_lock(td); if (io_u->file && !(io_u->flags & IO_U_F_NO_FILE_PUT)) put_file_log(td, io_u->file); io_u->file = NULL; - io_u->flags |= IO_U_F_FREE; + io_u_set(io_u, IO_U_F_FREE); - if (io_u->flags & IO_U_F_IN_CUR_DEPTH) + if (io_u->flags & IO_U_F_IN_CUR_DEPTH) { td->cur_depth--; + assert(!(td->flags & TD_F_CHILD)); + } io_u_qpush(&td->io_u_freelist, io_u); td_io_u_unlock(td); td_io_u_free_notify(td); @@ -714,7 +720,7 @@ void put_io_u(struct thread_data *td, struct io_u *io_u) void clear_io_u(struct thread_data *td, struct io_u *io_u) { - io_u->flags &= ~IO_U_F_FLIGHT; + io_u_clear(io_u, IO_U_F_FLIGHT); put_io_u(td, io_u); } @@ -725,18 +731,24 @@ void requeue_io_u(struct thread_data *td, struct io_u **io_u) dprint(FD_IO, "requeue %p\n", __io_u); + if (td->parent) + td = td->parent; + td_io_u_lock(td); - __io_u->flags |= IO_U_F_FREE; + io_u_set(__io_u, IO_U_F_FREE); if ((__io_u->flags & IO_U_F_FLIGHT) && ddir_rw(ddir)) td->io_issues[ddir]--; - __io_u->flags &= ~IO_U_F_FLIGHT; - if (__io_u->flags & IO_U_F_IN_CUR_DEPTH) + io_u_clear(__io_u, IO_U_F_FLIGHT); + if (__io_u->flags & IO_U_F_IN_CUR_DEPTH) { td->cur_depth--; + assert(!(td->flags & TD_F_CHILD)); + } io_u_rpush(&td->io_u_requeues, __io_u); td_io_u_unlock(td); + td_io_u_free_notify(td); *io_u = NULL; } @@ -1329,21 +1341,23 @@ again: if (io_u) { assert(io_u->flags & IO_U_F_FREE); - io_u->flags &= ~(IO_U_F_FREE | IO_U_F_NO_FILE_PUT | + io_u_clear(io_u, IO_U_F_FREE | IO_U_F_NO_FILE_PUT | IO_U_F_TRIMMED | IO_U_F_BARRIER | IO_U_F_VER_LIST); io_u->error = 0; io_u->acct_ddir = -1; td->cur_depth++; - io_u->flags |= IO_U_F_IN_CUR_DEPTH; + assert(!(td->flags & TD_F_CHILD)); + io_u_set(io_u, IO_U_F_IN_CUR_DEPTH); io_u->ipo = NULL; - } else if (td->o.verify_async) { + } else if (td_async_processing(td)) { /* * We ran out, wait for async verify threads to finish and * return one */ - pthread_cond_wait(&td->free_cond, &td->io_u_lock); + assert(!(td->flags & TD_F_CHILD)); + assert(!pthread_cond_wait(&td->free_cond, &td->io_u_lock)); goto again; } @@ -1542,7 +1556,7 @@ err_put: return ERR_PTR(ret); } -void io_u_log_error(struct thread_data *td, struct io_u *io_u) +static void __io_u_log_error(struct thread_data *td, struct io_u *io_u) { enum error_type_bit eb = td_error_type(io_u->ddir, io_u->error); @@ -1560,6 +1574,13 @@ void io_u_log_error(struct thread_data *td, struct io_u *io_u) td_verror(td, io_u->error, "io_u error"); } +void io_u_log_error(struct thread_data *td, struct io_u *io_u) +{ + __io_u_log_error(td, io_u); + if (td->parent) + __io_u_log_error(td, io_u); +} + static inline int gtod_reduce(struct thread_data *td) { return td->o.disable_clat && td->o.disable_lat && td->o.disable_slat @@ -1570,9 +1591,10 @@ static void account_io_completion(struct thread_data *td, struct io_u *io_u, struct io_completion_data *icd, const enum fio_ddir idx, unsigned int bytes) { + const int no_reduce = !gtod_reduce(td); unsigned long lusec = 0; - if (!gtod_reduce(td)) + if (no_reduce) lusec = utime_since(&io_u->issue_time, &icd->time); if (!td->o.disable_lat) { @@ -1601,10 +1623,13 @@ static void account_io_completion(struct thread_data *td, struct io_u *io_u, io_u_mark_latency(td, lusec); } + if 
(td->parent) + td = td->parent; + if (!td->o.disable_bw) add_bw_sample(td, idx, bytes, &icd->time); - if (!gtod_reduce(td)) + if (no_reduce) add_iops_sample(td, idx, bytes, &icd->time); if (td->ts.nr_block_infos && io_u->ddir == DDIR_TRIM) { @@ -1625,6 +1650,7 @@ static long long usec_for_io(struct thread_data *td, enum fio_ddir ddir) { uint64_t secs, remainder, bps, bytes; + assert(!(td->flags & TD_F_CHILD)); bytes = td->this_io_bytes[ddir]; bps = td->rate_bps[ddir]; secs = bytes / bps; @@ -1641,9 +1667,8 @@ static void io_completed(struct thread_data *td, struct io_u **io_u_ptr, dprint_io_u(io_u, "io complete"); - td_io_u_lock(td); assert(io_u->flags & IO_U_F_FLIGHT); - io_u->flags &= ~(IO_U_F_FLIGHT | IO_U_F_BUSY_OK); + io_u_clear(io_u, IO_U_F_FLIGHT | IO_U_F_BUSY_OK); /* * Mark IO ok to verify @@ -1660,8 +1685,6 @@ static void io_completed(struct thread_data *td, struct io_u **io_u_ptr, } } - td_io_u_unlock(td); - if (ddir_sync(ddir)) { td->last_was_sync = 1; if (f) { @@ -1706,18 +1729,23 @@ static void io_completed(struct thread_data *td, struct io_u **io_u_ptr, if (ramp_time_over(td) && (td->runstate == TD_RUNNING || td->runstate == TD_VERIFYING)) { + struct thread_data *__td = td; + account_io_completion(td, io_u, icd, ddir, bytes); - if (__should_check_rate(td, ddir)) { - td->rate_pending_usleep[ddir] = - (usec_for_io(td, ddir) - - utime_since_now(&td->start)); + if (td->parent) + __td = td->parent; + + if (__should_check_rate(__td, ddir)) { + __td->rate_pending_usleep[ddir] = + (usec_for_io(__td, ddir) - + utime_since_now(&__td->start)); } if (ddir != DDIR_TRIM && - __should_check_rate(td, oddir)) { - td->rate_pending_usleep[oddir] = - (usec_for_io(td, oddir) - - utime_since_now(&td->start)); + __should_check_rate(__td, oddir)) { + __td->rate_pending_usleep[oddir] = + (usec_for_io(__td, oddir) - + utime_since_now(&__td->start)); } } diff --git a/ioengine.h b/ioengine.h index a55290d4..3d499936 100644 --- a/ioengine.h +++ b/ioengine.h @@ -119,6 +119,7 @@ struct io_u { struct ibv_mr *mr; #endif void *mmap_data; + uint64_t null; }; }; @@ -251,4 +252,14 @@ static inline enum fio_ddir acct_ddir(struct io_u *io_u) return io_u->ddir; } +static inline void io_u_clear(struct io_u *io_u, unsigned int flags) +{ + __sync_fetch_and_and(&io_u->flags, ~flags); +} + +static inline void io_u_set(struct io_u *io_u, unsigned int flags) +{ + __sync_fetch_and_or(&io_u->flags, flags); +} + #endif diff --git a/ioengines.c b/ioengines.c index b42e2c48..b724e0e9 100644 --- a/ioengines.c +++ b/ioengines.c @@ -264,13 +264,15 @@ out: int td_io_queue(struct thread_data *td, struct io_u *io_u) { + const enum fio_ddir ddir = acct_ddir(io_u); + unsigned long buflen = io_u->xfer_buflen; int ret; dprint_io_u(io_u, "queue"); fio_ro_check(td, io_u); assert((io_u->flags & IO_U_F_FLIGHT) == 0); - io_u->flags |= IO_U_F_FLIGHT; + io_u_set(io_u, IO_U_F_FLIGHT); assert(fio_file_open(io_u->file)); @@ -294,18 +296,18 @@ int td_io_queue(struct thread_data *td, struct io_u *io_u) sizeof(struct timeval)); } - if (ddir_rw(acct_ddir(io_u))) { - td->io_issues[acct_ddir(io_u)]++; - td->io_issue_bytes[acct_ddir(io_u)] += io_u->xfer_buflen; + if (ddir_rw(ddir)) { + td->io_issues[ddir]++; + td->io_issue_bytes[ddir] += buflen; } ret = td->io_ops->queue(td, io_u); unlock_file(td, io_u->file); - if (ret == FIO_Q_BUSY && ddir_rw(acct_ddir(io_u))) { - td->io_issues[acct_ddir(io_u)]--; - td->io_issue_bytes[acct_ddir(io_u)] -= io_u->xfer_buflen; + if (ret == FIO_Q_BUSY && ddir_rw(ddir)) { + td->io_issues[ddir]--; + td->io_issue_bytes[ddir] 
-= buflen; } /* diff --git a/options.c b/options.c index 017920e1..b34e2c72 100644 --- a/options.c +++ b/options.c @@ -1622,6 +1622,26 @@ struct fio_option fio_options[FIO_MAX_OPTS] = { .category = FIO_OPT_C_IO, .group = FIO_OPT_G_IO_BASIC, }, + { + .name = "io_submit_mode", + .lname = "IO submit mode", + .type = FIO_OPT_STR, + .off1 = td_var_offset(io_submit_mode), + .help = "How IO submissions and completions are done", + .def = "inline", + .category = FIO_OPT_C_IO, + .group = FIO_OPT_G_IO_BASIC, + .posval = { + { .ival = "inline", + .oval = IO_MODE_INLINE, + .help = "Submit and complete IO inline", + }, + { .ival = "offload", + .oval = IO_MODE_OFFLOAD, + .help = "Offload submit and complete to threads", + }, + }, + }, { .name = "size", .lname = "Size", diff --git a/stat.c b/stat.c index 34e3792f..533981fc 100644 --- a/stat.c +++ b/stat.c @@ -2009,6 +2009,8 @@ void add_bw_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs, if (spent < td->o.bw_avg_time) return; + td_io_u_lock(td); + /* * Compute both read and write rates for the interval. */ @@ -2033,6 +2035,7 @@ void add_bw_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs, } fio_gettime(&td->bw_sample_time, NULL); + td_io_u_unlock(td); } void add_iops_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs, @@ -2048,6 +2051,8 @@ void add_iops_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs if (spent < td->o.iops_avg_time) return; + td_io_u_lock(td); + /* * Compute both read and write rates for the interval. */ @@ -2072,6 +2077,7 @@ void add_iops_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs } fio_gettime(&td->iops_sample_time, NULL); + td_io_u_unlock(td); } void stat_init(void) diff --git a/thread_options.h b/thread_options.h index 026b85b4..aa7f3f26 100644 --- a/thread_options.h +++ b/thread_options.h @@ -223,6 +223,7 @@ struct thread_options { unsigned int rate[DDIR_RWDIR_CNT]; unsigned int ratemin[DDIR_RWDIR_CNT]; unsigned int ratecycle; + unsigned int io_submit_mode; unsigned int rate_iops[DDIR_RWDIR_CNT]; unsigned int rate_iops_min[DDIR_RWDIR_CNT]; @@ -452,6 +453,7 @@ struct thread_options_pack { uint32_t rate[DDIR_RWDIR_CNT]; uint32_t ratemin[DDIR_RWDIR_CNT]; uint32_t ratecycle; + uint32_t io_submit_mode; uint32_t rate_iops[DDIR_RWDIR_CNT]; uint32_t rate_iops_min[DDIR_RWDIR_CNT]; @@ -489,7 +491,6 @@ struct thread_options_pack { uint64_t latency_target; uint64_t latency_window; - uint32_t pad3; fio_fp64_t latency_percentile; uint32_t block_error_hist; diff --git a/verify.c b/verify.c index b6793d7d..aa178e95 100644 --- a/verify.c +++ b/verify.c @@ -656,7 +656,7 @@ int verify_io_u_async(struct thread_data *td, struct io_u **io_u_ptr) if (io_u->flags & IO_U_F_IN_CUR_DEPTH) { td->cur_depth--; - io_u->flags &= ~IO_U_F_IN_CUR_DEPTH; + io_u_clear(io_u, IO_U_F_IN_CUR_DEPTH); } flist_add_tail(&io_u->verify_list, &td->verify_list); *io_u_ptr = NULL; @@ -1105,10 +1105,10 @@ int get_next_verify(struct thread_data *td, struct io_u *io_u) io_u->buflen = ipo->len; io_u->numberio = ipo->numberio; io_u->file = ipo->file; - io_u->flags |= IO_U_F_VER_LIST; + io_u_set(io_u, IO_U_F_VER_LIST); if (ipo->flags & IP_F_TRIMMED) - io_u->flags |= IO_U_F_TRIMMED; + io_u_set(io_u, IO_U_F_TRIMMED); if (!fio_file_open(io_u->file)) { int r = td_io_open_file(td, io_u->file); @@ -1192,7 +1192,7 @@ static void *verify_async_thread(void *data) io_u = flist_first_entry(&list, struct io_u, verify_list); flist_del_init(&io_u->verify_list); - io_u->flags |= IO_U_F_NO_FILE_PUT; + 
io_u_set(io_u, IO_U_F_NO_FILE_PUT); ret = verify_io_u(td, &io_u); put_io_u(td, io_u); diff --git a/workqueue.c b/workqueue.c new file mode 100644 index 00000000..92088bab --- /dev/null +++ b/workqueue.c @@ -0,0 +1,444 @@ +/* + * Rated submission helpers + * + * Copyright (C) 2015 Jens Axboe + * + */ +#include + +#include "fio.h" +#include "ioengine.h" +#include "flist.h" +#include "workqueue.h" +#include "lib/getrusage.h" + +struct submit_worker { + pthread_t thread; + pthread_mutex_t lock; + pthread_cond_t cond; + struct flist_head work_list; + unsigned int flags; + unsigned int index; + uint64_t seq; + struct workqueue *wq; + struct thread_data td; +}; + +enum { + SW_F_IDLE = 1 << 0, + SW_F_RUNNING = 1 << 1, + SW_F_EXIT = 1 << 2, + SW_F_EXITED = 1 << 3, + SW_F_ACCOUNTED = 1 << 4, + SW_F_ERROR = 1 << 5, +}; + +static struct submit_worker *__get_submit_worker(struct workqueue *wq, + unsigned int start, + unsigned int end, + struct submit_worker **best) +{ + struct submit_worker *sw = NULL; + + while (start <= end) { + sw = &wq->workers[start]; + if (sw->flags & SW_F_IDLE) + return sw; + if (!(*best) || sw->seq < (*best)->seq) + *best = sw; + start++; + } + + return NULL; +} + +static struct submit_worker *get_submit_worker(struct workqueue *wq) +{ + unsigned int next = wq->next_free_worker; + struct submit_worker *sw, *best = NULL; + + assert(next < wq->max_workers); + + sw = __get_submit_worker(wq, next, wq->max_workers - 1, &best); + if (!sw && next) + sw = __get_submit_worker(wq, 0, next - 1, &best); + + /* + * No truly idle found, use best match + */ + if (!sw) + sw = best; + + if (sw->index == wq->next_free_worker) { + if (sw->index + 1 < wq->max_workers) + wq->next_free_worker = sw->index + 1; + else + wq->next_free_worker = 0; + } + + return sw; +} + +static int all_sw_idle(struct workqueue *wq) +{ + int i; + + for (i = 0; i < wq->max_workers; i++) { + struct submit_worker *sw = &wq->workers[i]; + + if (!(sw->flags & SW_F_IDLE)) + return 0; + } + + return 1; +} + +/* + * Must be serialized wrt workqueue_enqueue() by caller + */ +void workqueue_flush(struct workqueue *wq) +{ + wq->wake_idle = 1; + + while (!all_sw_idle(wq)) { + pthread_mutex_lock(&wq->flush_lock); + pthread_cond_wait(&wq->flush_cond, &wq->flush_lock); + pthread_mutex_unlock(&wq->flush_lock); + } + + wq->wake_idle = 0; +} + +/* + * Must be serialized by caller. 
+ */ +int workqueue_enqueue(struct workqueue *wq, struct io_u *io_u) +{ + struct submit_worker *sw; + + sw = get_submit_worker(wq); + if (sw) { + const enum fio_ddir ddir = acct_ddir(io_u); + struct thread_data *parent = wq->td; + + if (ddir_rw(ddir)) { + parent->io_issues[ddir]++; + parent->io_issue_bytes[ddir] += io_u->xfer_buflen; + } + + pthread_mutex_lock(&sw->lock); + flist_add_tail(&io_u->verify_list, &sw->work_list); + sw->seq = ++wq->work_seq; + sw->flags &= ~SW_F_IDLE; + pthread_mutex_unlock(&sw->lock); + + pthread_cond_signal(&sw->cond); + return FIO_Q_QUEUED; + } + + return FIO_Q_BUSY; +} + +static void handle_list(struct submit_worker *sw, struct flist_head *list) +{ + struct workqueue *wq = sw->wq; + struct io_u *io_u; + + while (!flist_empty(list)) { + io_u = flist_first_entry(list, struct io_u, verify_list); + flist_del_init(&io_u->verify_list); + wq->fn(&sw->td, io_u); + } +} + +static int init_submit_worker(struct submit_worker *sw) +{ + struct thread_data *parent = sw->wq->td; + struct thread_data *td = &sw->td; + int fio_unused ret; + + memcpy(&td->o, &parent->o, sizeof(td->o)); + memcpy(&td->ts, &parent->ts, sizeof(td->ts)); + td->o.uid = td->o.gid = -1U; + dup_files(td, parent); + fio_options_mem_dupe(td); + + if (ioengine_load(td)) + goto err; + + if (td->o.odirect) + td->io_ops->flags |= FIO_RAWIO; + + td->pid = gettid(); + + INIT_FLIST_HEAD(&td->io_log_list); + INIT_FLIST_HEAD(&td->io_hist_list); + INIT_FLIST_HEAD(&td->verify_list); + INIT_FLIST_HEAD(&td->trim_list); + INIT_FLIST_HEAD(&td->next_rand_list); + td->io_hist_tree = RB_ROOT; + + td->o.iodepth = 1; + if (td_io_init(td)) + goto err_io_init; + + fio_gettime(&td->epoch, NULL); + fio_getrusage(&td->ru_start); + clear_io_state(td); + + td_set_runstate(td, TD_RUNNING); + td->flags |= TD_F_CHILD; + td->parent = parent; + return 0; + +err_io_init: + close_ioengine(td); +err: + return 1; +} + +static void sum_val(uint64_t *dst, uint64_t *src) +{ + if (*src) { + __sync_fetch_and_add(dst, *src); + *src = 0; + } +} + +static void sum_ddir(struct thread_data *dst, struct thread_data *src, + enum fio_ddir ddir) +{ + sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]); + sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]); + sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]); + sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]); + sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]); +} + +static void update_accounting(struct submit_worker *sw) +{ + struct thread_data *src = &sw->td; + struct thread_data *dst = sw->wq->td; + + if (td_read(src)) + sum_ddir(dst, src, DDIR_READ); + if (td_write(src)) + sum_ddir(dst, src, DDIR_WRITE); + if (td_trim(src)) + sum_ddir(dst, src, DDIR_TRIM); +} + +static void *worker_thread(void *data) +{ + struct submit_worker *sw = data; + struct workqueue *wq = sw->wq; + struct thread_data *td = &sw->td; + unsigned int eflags = 0, ret; + FLIST_HEAD(local_list); + + ret = init_submit_worker(sw); + pthread_mutex_lock(&sw->lock); + sw->flags |= SW_F_RUNNING; + if (ret) + sw->flags |= SW_F_ERROR; + pthread_mutex_unlock(&sw->lock); + + pthread_mutex_lock(&wq->flush_lock); + pthread_cond_signal(&wq->flush_cond); + pthread_mutex_unlock(&wq->flush_lock); + + if (sw->flags & SW_F_ERROR) + goto done; + + while (1) { + pthread_mutex_lock(&sw->lock); + + if (flist_empty(&sw->work_list)) { + if (sw->flags & SW_F_EXIT) { + pthread_mutex_unlock(&sw->lock); + break; + } + + if (td->io_u_queued || td->cur_depth || + td->io_u_in_flight) { + pthread_mutex_unlock(&sw->lock); + 
io_u_quiesce(td); + pthread_mutex_lock(&sw->lock); + } + + /* + * We dropped and reaquired the lock, check + * state again. + */ + if (!flist_empty(&sw->work_list)) + goto handle_work; + + if (sw->flags & SW_F_EXIT) { + pthread_mutex_unlock(&sw->lock); + break; + } else if (!(sw->flags & SW_F_IDLE)) { + sw->flags |= SW_F_IDLE; + wq->next_free_worker = sw->index; + if (wq->wake_idle) + pthread_cond_signal(&wq->flush_cond); + } + update_accounting(sw); + pthread_cond_wait(&sw->cond, &sw->lock); + } else { +handle_work: + flist_splice_init(&sw->work_list, &local_list); + } + pthread_mutex_unlock(&sw->lock); + handle_list(sw, &local_list); + } + + update_accounting(sw); + +done: + pthread_mutex_lock(&sw->lock); + sw->flags |= (SW_F_EXITED | eflags); + pthread_mutex_unlock(&sw->lock); + return NULL; +} + +static void free_worker(struct submit_worker *sw) +{ + struct thread_data *td = &sw->td; + + fio_options_free(td); + close_and_free_files(td); + if (td->io_ops) + close_ioengine(td); + td_set_runstate(td, TD_EXITED); + + pthread_cond_destroy(&sw->cond); + pthread_mutex_destroy(&sw->lock); +} + +static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt) +{ + struct thread_data *parent = sw->wq->td; + + pthread_join(sw->thread, NULL); + (*sum_cnt)++; + sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt); + free_worker(sw); +} + +void workqueue_exit(struct workqueue *wq) +{ + unsigned int shutdown, sum_cnt = 0; + struct submit_worker *sw; + int i; + + for (i = 0; i < wq->max_workers; i++) { + sw = &wq->workers[i]; + + pthread_mutex_lock(&sw->lock); + sw->flags |= SW_F_EXIT; + pthread_cond_signal(&sw->cond); + pthread_mutex_unlock(&sw->lock); + } + + do { + shutdown = 0; + for (i = 0; i < wq->max_workers; i++) { + sw = &wq->workers[i]; + if (sw->flags & SW_F_ACCOUNTED) + continue; + sw->flags |= SW_F_ACCOUNTED; + shutdown_worker(sw, &sum_cnt); + shutdown++; + } + } while (shutdown && shutdown != wq->max_workers); + + free(wq->workers); + pthread_mutex_destroy(&wq->flush_lock); + pthread_cond_destroy(&wq->flush_cond); +} + +static int start_worker(struct workqueue *wq, unsigned int index) +{ + struct submit_worker *sw = &wq->workers[index]; + int ret; + + INIT_FLIST_HEAD(&sw->work_list); + pthread_cond_init(&sw->cond, NULL); + pthread_mutex_init(&sw->lock, NULL); + sw->wq = wq; + sw->index = index; + + ret = pthread_create(&sw->thread, NULL, worker_thread, sw); + if (!ret) { + pthread_mutex_lock(&sw->lock); + sw->flags = SW_F_IDLE; + pthread_mutex_unlock(&sw->lock); + return 0; + } + + free_worker(sw); + return 1; +} + +int workqueue_init(struct thread_data *td, struct workqueue *wq, + workqueue_fn *fn, unsigned max_pending) +{ + unsigned int running; + int i, error; + + wq->max_workers = max_pending; + wq->td = td; + wq->fn = fn; + wq->work_seq = 0; + wq->next_free_worker = 0; + pthread_cond_init(&wq->flush_cond, NULL); + pthread_mutex_init(&wq->flush_lock, NULL); + + wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker)); + + for (i = 0; i < wq->max_workers; i++) + if (start_worker(wq, i)) + break; + + wq->max_workers = i; + if (!wq->max_workers) { +err: + log_err("Can't create rate workqueue\n"); + td_verror(td, ESRCH, "workqueue_init"); + workqueue_exit(wq); + return 1; + } + + /* + * Wait for them all to be started and initialized + */ + error = 0; + do { + struct submit_worker *sw; + + running = 0; + pthread_mutex_lock(&wq->flush_lock); + for (i = 0; i < wq->max_workers; i++) { + sw = &wq->workers[i]; + pthread_mutex_lock(&sw->lock); + if (sw->flags & 
SW_F_RUNNING) + running++; + if (sw->flags & SW_F_ERROR) + error++; + pthread_mutex_unlock(&sw->lock); + } + + if (error || running == wq->max_workers) { + pthread_mutex_unlock(&wq->flush_lock); + break; + } + + pthread_cond_wait(&wq->flush_cond, &wq->flush_lock); + pthread_mutex_unlock(&wq->flush_lock); + } while (1); + + if (error) + goto err; + + return 0; +} diff --git a/workqueue.h b/workqueue.h new file mode 100644 index 00000000..5d47a5e6 --- /dev/null +++ b/workqueue.h @@ -0,0 +1,29 @@ +#ifndef FIO_RATE_H +#define FIO_RATE_H + +#include "flist.h" + +typedef void (workqueue_fn)(struct thread_data *, struct io_u *); + +struct workqueue { + unsigned int max_workers; + + struct thread_data *td; + workqueue_fn *fn; + + uint64_t work_seq; + struct submit_worker *workers; + unsigned int next_free_worker; + + pthread_cond_t flush_cond; + pthread_mutex_t flush_lock; + volatile int wake_idle; +}; + +int workqueue_init(struct thread_data *td, struct workqueue *wq, workqueue_fn *fn, unsigned int max_workers); +void workqueue_exit(struct workqueue *wq); + +int workqueue_enqueue(struct workqueue *wq, struct io_u *io_u); +void workqueue_flush(struct workqueue *wq); + +#endif -- 2.25.1
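
Note on usage and flow (not part of the patch): with io_submit_mode=offload set in the job file, optionally combined with rate_iops=, the job thread no longer submits IO inline. It instead hands each io_u to the workqueue added in workqueue.[ch], and one of up to iodepth submit workers issues and reaps it on the job's behalf. The condensed sketch below illustrates that flow using only functions introduced by this patch or already present in the tree (workqueue_init(), workqueue_enqueue(), workqueue_flush(), workqueue_exit(), get_io_u(), td_io_queue(), io_u_queued_complete(), put_io_u()); the names offload_one() and offload_job_loop() are illustrative only and do not exist in the patch.

#include "fio.h"
#include "ioengine.h"
#include "workqueue.h"
#include "err.h"

/*
 * Per-io_u handler run by a submit worker, mirroring io_workqueue_fn():
 * submit the io_u from the worker's private thread_data, then reap it.
 */
static void offload_one(struct thread_data *worker_td, struct io_u *io_u)
{
	int ret;

	io_u_set(io_u, IO_U_F_NO_FILE_PUT);
	worker_td->cur_depth++;

	ret = td_io_queue(worker_td, io_u);
	if (ret == FIO_Q_QUEUED)
		io_u_queued_complete(worker_td, 1);

	worker_td->cur_depth--;
}

/*
 * Simplified view of what do_io() does when io_submit_mode=offload:
 * enqueue io_u's instead of queueing them inline, then flush the
 * workqueue and tear it down.
 */
static int offload_job_loop(struct thread_data *td)
{
	struct workqueue wq;

	if (workqueue_init(td, &wq, offload_one, td->o.iodepth))
		return 1;

	while (!td->terminate) {
		struct io_u *io_u = get_io_u(td);

		if (!io_u || IS_ERR(io_u))
			break;

		if (workqueue_enqueue(&wq, io_u) != FIO_Q_QUEUED) {
			/* no worker could take it; drop it back and stop */
			put_io_u(td, io_u);
			break;
		}
	}

	workqueue_flush(&wq);	/* wait for all workers to go idle */
	workqueue_exit(&wq);	/* join workers, fold their stats into td */
	return 0;
}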