From e8462bd8250cf3ff2d41f17e1a4d4cefc70b6b37 Mon Sep 17 00:00:00 2001
From: Jens Axboe
Date: Mon, 6 Jul 2009 12:59:04 +0200
Subject: [PATCH] Add support for async IO verification offload

This adds support for setting up a number of IO verification offload
threads, instead of doing the verification inline. An option for
controlling the CPU affinity of those threads is also added.

Signed-off-by: Jens Axboe
---
 HOWTO           |  10 ++++
 file.h          |   1 +
 fio.1           |  10 ++++
 fio.c           |  27 +++++++++--
 fio.h           |  36 ++++++++++++++
 io_u.c          |  33 +++++++++++--
 ioengine.h      |   1 +
 options.c       |  49 +++++++++++++++++--
 os/os-linux.h   |   8 ++--
 os/os-solaris.h |   4 +-
 os/os.h         |   2 +-
 verify.c        | 122 ++++++++++++++++++++++++++++++++++++++++++++++++
 verify.h        |   7 +++
 13 files changed, 291 insertions(+), 19 deletions(-)

diff --git a/HOWTO b/HOWTO
index 708eca00..55662d34 100644
--- a/HOWTO
+++ b/HOWTO
@@ -827,6 +827,16 @@ verify_fatal=bool	Normally fio will keep checking the entire contents
 		before quitting on a block verification failure. If this
 		option is set, fio will exit the job on the first observed
 		failure.
+
+verify_async=int	Fio will normally verify IO inline from the submitting
+		thread. This option takes an integer describing how many
+		async offload threads to create for IO verification instead,
+		causing fio to offload the duty of verifying IO contents
+		to one or more separate threads.
+
+verify_async_cpus=str	Tell fio to set the given CPU affinity on the
+		async IO verification threads. See cpus_allowed for the
+		format used.
 
 stonewall	Wait for preceeding jobs in the job file to exit, before
 		starting this one. Can be used to insert serialization
diff --git a/file.h b/file.h
index 6aa5b502..dc22d4e0 100644
--- a/file.h
+++ b/file.h
@@ -133,6 +133,7 @@ extern int __must_check pre_read_files(struct thread_data *);
 extern int add_file(struct thread_data *, const char *);
 extern void get_file(struct fio_file *);
 extern int __must_check put_file(struct thread_data *, struct fio_file *);
+extern void put_file_log(struct thread_data *, struct fio_file *);
 extern void lock_file(struct thread_data *, struct fio_file *, enum fio_ddir);
 extern void unlock_file(struct thread_data *, struct fio_file *);
 extern void unlock_file_all(struct thread_data *, struct fio_file *);
diff --git a/fio.1 b/fio.1
index 32993b61..7b1fc802 100644
--- a/fio.1
+++ b/fio.1
@@ -598,6 +598,16 @@ Write the verification header for this number of bytes, which should divide
 If true, exit the job on the first observed verification failure. Default:
 false.
 .TP
+.BI verify_async \fR=\fPint
+Fio will normally verify IO inline from the submitting thread. This option
+takes an integer describing how many async offload threads to create for IO
+verification instead, causing fio to offload the duty of verifying IO contents
+to one or more separate threads.
+.TP
+.BI verify_async_cpus \fR=\fPstr
+Tell fio to set the given CPU affinity on the async IO verification threads.
+See \fBcpus_allowed\fP for the format used.
+.TP
 .B stonewall
 Wait for preceeding jobs in the job file to exit before starting this one.
 \fBstonewall\fR implies \fBnew_group\fR.
diff --git a/fio.c b/fio.c
index aabfee65..fc6dd8ae 100644
--- a/fio.c
+++ b/fio.c
@@ -472,7 +472,10 @@ static void do_verify(struct thread_data *td)
 			break;
 		}
 
-		io_u->end_io = verify_io_u;
+		if (td->o.verify_async)
+			io_u->end_io = verify_io_u_async;
+		else
+			io_u->end_io = verify_io_u;
 
 		ret = td_io_queue(td, io_u);
 		switch (ret) {
@@ -604,7 +607,10 @@ static void do_io(struct thread_data *td)
 		 * a previously written file.
 		 */
 		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ) {
-			io_u->end_io = verify_io_u;
+			if (td->o.verify_async)
+				io_u->end_io = verify_io_u_async;
+			else
+				io_u->end_io = verify_io_u;
 			td_set_runstate(td, TD_VERIFYING);
 		} else if (in_ramp_time(td))
 			td_set_runstate(td, TD_RAMP);
@@ -992,6 +998,7 @@ static void *thread_main(void *data)
 {
 	unsigned long long runtime[2], elapsed;
 	struct thread_data *td = data;
+	pthread_condattr_t attr;
 	int clear_state;
 
 	if (!td->o.use_thread)
@@ -1006,8 +1013,14 @@ static void *thread_main(void *data)
 	INIT_FLIST_HEAD(&td->io_u_requeues);
 	INIT_FLIST_HEAD(&td->io_log_list);
 	INIT_FLIST_HEAD(&td->io_hist_list);
+	INIT_FLIST_HEAD(&td->verify_list);
+	pthread_mutex_init(&td->io_u_lock, NULL);
 	td->io_hist_tree = RB_ROOT;
 
+	pthread_condattr_init(&attr);
+	pthread_cond_init(&td->verify_cond, &attr);
+	pthread_cond_init(&td->free_cond, &attr);
+
 	td_set_runstate(td, TD_INITIALIZED);
 	dprint(FD_MUTEX, "up startup_mutex\n");
 	fio_mutex_up(startup_mutex);
@@ -1031,7 +1044,10 @@ static void *thread_main(void *data)
 	if (init_io_u(td))
 		goto err;
 
-	if (td->o.cpumask_set && fio_setaffinity(td) == -1) {
+	if (td->o.verify_async && verify_async_init(td))
+		goto err;
+
+	if (td->o.cpumask_set && fio_setaffinity(td->pid, td->o.cpumask) == -1) {
 		td_verror(td, errno, "cpu_set_affinity");
 		goto err;
 	}
@@ -1042,7 +1058,7 @@ static void *thread_main(void *data)
 	 */
 	if (td->o.gtod_cpu) {
 		fio_cpu_clear(&td->o.cpumask, td->o.gtod_cpu);
-		if (fio_setaffinity(td) == -1) {
+		if (fio_setaffinity(td->pid, td->o.cpumask) == -1) {
 			td_verror(td, errno, "cpu_set_affinity");
 			goto err;
 		}
@@ -1184,6 +1200,9 @@ err:
 		td_verror(td, ret, "fio_cpuset_exit");
 	}
 
+	if (td->o.verify_async)
+		verify_async_exit(td);
+
 	/*
 	 * do this very late, it will log file closing as well
 	 */
diff --git a/fio.h b/fio.h
index b19101c2..fb70b465 100644
--- a/fio.h
+++ b/fio.h
@@ -174,6 +174,7 @@ struct thread_options {
 	unsigned int verify_pattern;
 	unsigned int verify_pattern_bytes;
 	unsigned int verify_fatal;
+	unsigned int verify_async;
 	unsigned int use_thread;
 	unsigned int unlink;
 	unsigned int do_disk_util;
@@ -209,6 +210,8 @@ struct thread_options {
 	unsigned int numjobs;
 	os_cpu_mask_t cpumask;
 	unsigned int cpumask_set;
+	os_cpu_mask_t verify_cpumask;
+	unsigned int verify_cpumask_set;
 	unsigned int iolog;
 	unsigned int rwmixcycle;
 	unsigned int rwmix[2];
@@ -319,6 +322,17 @@ struct thread_data {
 	struct flist_head io_u_freelist;
 	struct flist_head io_u_busylist;
 	struct flist_head io_u_requeues;
+	pthread_mutex_t io_u_lock;
+	pthread_cond_t free_cond;
+
+	/*
+	 * async verify offload
+	 */
+	struct flist_head verify_list;
+	pthread_t *verify_threads;
+	unsigned int nr_verify_threads;
+	pthread_cond_t verify_cond;
+	int verify_thread_exit;
 
 	/*
 	 * Rate state
@@ -661,4 +675,26 @@ static inline int is_power_of_2(unsigned int val)
 	return (val != 0 && ((val & (val - 1)) == 0));
 }
 
+/*
+ * We currently only need to do locking if we have verifier threads
+ * accessing our internal structures too
+ */
+static inline void td_io_u_lock(struct thread_data *td)
+{
+	if (td->o.verify_async)
+		pthread_mutex_lock(&td->io_u_lock);
+}
+
+static inline void td_io_u_unlock(struct thread_data *td)
+{
+	if (td->o.verify_async)
+		pthread_mutex_unlock(&td->io_u_lock);
+}
+
+static inline void td_io_u_free_notify(struct thread_data *td)
+{
+	if (td->o.verify_async)
+		pthread_cond_signal(&td->free_cond);
+}
+
 #endif
diff --git a/io_u.c b/io_u.c
index 2e9dac0d..65498048 100644
--- a/io_u.c
+++ b/io_u.c
@@ -401,7 +401,7 @@ static enum fio_ddir get_rw_ddir(struct thread_data *td)
 	return td->rwmix_ddir;
 }
 
-static void put_file_log(struct thread_data *td, struct fio_file *f)
+void put_file_log(struct thread_data *td, struct fio_file *f)
 {
 	int ret = put_file(td, f);
 
@@ -411,16 +411,21 @@ static void put_file_log(struct thread_data *td, struct fio_file *f)
 
 void put_io_u(struct thread_data *td, struct io_u *io_u)
 {
+	td_io_u_lock(td);
+
 	assert((io_u->flags & IO_U_F_FREE) == 0);
 	io_u->flags |= IO_U_F_FREE;
+	io_u->flags &= ~IO_U_F_FREE_DEF;
 
 	if (io_u->file)
 		put_file_log(td, io_u->file);
 
 	io_u->file = NULL;
-	flist_del(&io_u->list);
+	flist_del_init(&io_u->list);
 	flist_add(&io_u->list, &td->io_u_freelist);
 	td->cur_depth--;
+	td_io_u_unlock(td);
+	td_io_u_free_notify(td);
 }
 
 void clear_io_u(struct thread_data *td, struct io_u *io_u)
@@ -435,6 +440,8 @@ void requeue_io_u(struct thread_data *td, struct io_u **io_u)
 
 	dprint(FD_IO, "requeue %p\n", __io_u);
 
+	td_io_u_lock(td);
+
 	__io_u->flags |= IO_U_F_FREE;
 	if ((__io_u->flags & IO_U_F_FLIGHT) && !ddir_sync(__io_u->ddir))
 		td->io_issues[__io_u->ddir]--;
@@ -444,6 +451,7 @@ void requeue_io_u(struct thread_data *td, struct io_u **io_u)
 	flist_del(&__io_u->list);
 	flist_add_tail(&__io_u->list, &td->io_u_requeues);
 	td->cur_depth--;
+	td_io_u_unlock(td);
 	*io_u = NULL;
 }
 
@@ -826,6 +834,9 @@ struct io_u *__get_io_u(struct thread_data *td)
 {
 	struct io_u *io_u = NULL;
 
+	td_io_u_lock(td);
+
+again:
 	if (!flist_empty(&td->io_u_requeues))
 		io_u = flist_entry(td->io_u_requeues.next, struct io_u, list);
 	else if (!queue_full(td)) {
@@ -837,9 +848,18 @@ struct io_u *__get_io_u(struct thread_data *td)
 		io_u->end_io = NULL;
 	}
 
+	/*
+	 * We ran out, wait for async verify threads to finish and return one
+	 */
+	if (!io_u && td->o.verify_async) {
+		pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+		goto again;
+	}
+
 	if (io_u) {
 		assert(io_u->flags & IO_U_F_FREE);
 		io_u->flags &= ~IO_U_F_FREE;
+		io_u->flags &= ~IO_U_F_FREE_DEF;
 
 		io_u->error = 0;
 		flist_del(&io_u->list);
@@ -847,6 +867,7 @@ struct io_u *__get_io_u(struct thread_data *td)
 		td->cur_depth++;
 	}
 
+	td_io_u_unlock(td);
 	return io_u;
 }
 
@@ -1042,7 +1063,9 @@ static void ios_completed(struct thread_data *td,
 		io_u = td->io_ops->event(td, i);
 
 		io_completed(td, io_u, icd);
-		put_io_u(td, io_u);
+
+		if (!(io_u->flags & IO_U_F_FREE_DEF))
+			put_io_u(td, io_u);
 	}
 }
 
@@ -1056,7 +1079,9 @@ int io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
 
 	init_icd(td, &icd, 1);
 	io_completed(td, io_u, &icd);
-	put_io_u(td, io_u);
+
+	if (!(io_u->flags & IO_U_F_FREE_DEF))
+		put_io_u(td, io_u);
 
 	if (icd.error) {
 		td_verror(td, icd.error, "io_u_sync_complete");
diff --git a/ioengine.h b/ioengine.h
index f9777999..3df0944a 100644
--- a/ioengine.h
+++ b/ioengine.h
@@ -6,6 +6,7 @@
 enum {
 	IO_U_F_FREE	= 1 << 0,
 	IO_U_F_FLIGHT	= 1 << 1,
+	IO_U_F_FREE_DEF	= 1 << 2,
 };
 
 /*
diff --git a/options.c b/options.c
index 87515ef1..0954ccdb 100644
--- a/options.c
+++ b/options.c
@@ -303,14 +303,14 @@ static int str_cpumask_cb(void *data, unsigned int *val)
 	return 0;
 }
 
-static int str_cpus_allowed_cb(void *data, const char *input)
+static int set_cpus_allowed(struct thread_data *td, os_cpu_mask_t *mask,
+			    const char *input)
 {
-	struct thread_data *td = data;
 	char *cpu, *str, *p;
 	long max_cpu;
 	int ret = 0;
 
-	ret = fio_cpuset_init(&td->o.cpumask);
+	ret = fio_cpuset_init(mask);
 	if (ret < 0) {
 		log_err("fio: cpuset_init failed\n");
 		td_verror(td, ret, "fio_cpuset_init");
@@ -358,7 +358,7 @@ static int str_cpus_allowed_cb(void *data, const char *input)
 		}
 
 		dprint(FD_PARSE, "set cpu allowed %d\n", icpu);
-		fio_cpu_set(&td->o.cpumask, icpu);
+		fio_cpu_set(mask, icpu);
 		icpu++;
 	}
 	if (ret)
@@ -370,6 +370,30 @@ static int str_cpus_allowed_cb(void *data, const char *input)
 	td->o.cpumask_set = 1;
 	return ret;
 }
+
+static int str_cpus_allowed_cb(void *data, const char *input)
+{
+	struct thread_data *td = data;
+	int ret;
+
+	ret = set_cpus_allowed(td, &td->o.cpumask, input);
+	if (!ret)
+		td->o.cpumask_set = 1;
+
+	return ret;
+}
+
+static int str_verify_cpus_allowed_cb(void *data, const char *input)
+{
+	struct thread_data *td = data;
+	int ret;
+
+	ret = set_cpus_allowed(td, &td->o.verify_cpumask, input);
+	if (!ret)
+		td->o.verify_cpumask_set = 1;
+
+	return ret;
+}
 #endif
 
 static int str_fst_cb(void *data, const char *str)
@@ -1180,6 +1204,23 @@ static struct fio_option options[] = {
 		.help	= "Exit on a single verify failure, don't continue",
 		.parent	= "verify",
 	},
+	{
+		.name	= "verify_async",
+		.type	= FIO_OPT_INT,
+		.off1	= td_var_offset(verify_async),
+		.def	= "0",
+		.help	= "Number of async verifier threads to use",
+		.parent	= "verify",
+	},
+#ifdef FIO_HAVE_CPU_AFFINITY
+	{
+		.name	= "verify_async_cpus",
+		.type	= FIO_OPT_STR,
+		.cb	= str_verify_cpus_allowed_cb,
+		.help	= "Set CPUs allowed for async verify threads",
+		.parent	= "verify_async",
+	},
+#endif
 	{
 		.name	= "write_iolog",
 		.type	= FIO_OPT_STR_STORE,
diff --git a/os/os-linux.h b/os/os-linux.h
index b766cbf9..dd9c5aaf 100644
--- a/os/os-linux.h
+++ b/os/os-linux.h
@@ -55,13 +55,13 @@ typedef struct drand48_data os_random_state_t;
  * the affinity helpers to work.
  */
 #ifndef GLIBC_2_3_2
-#define fio_setaffinity(td)		\
-	sched_setaffinity((td)->pid, sizeof((td)->o.cpumask), &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask)		\
+	sched_setaffinity((pid), sizeof(cpumask), &(cpumask))
 #define fio_getaffinity(pid, ptr)	\
 	sched_getaffinity((pid), sizeof(cpu_set_t), (ptr))
 #else
-#define fio_setaffinity(td)		\
-	sched_setaffinity((td)->pid, &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask)	\
+	sched_setaffinity((pid), &(cpumask))
 #define fio_getaffinity(pid, ptr)	\
 	sched_getaffinity((pid), (ptr))
 #endif
diff --git a/os/os-solaris.h b/os/os-solaris.h
index b58d1308..56729565 100644
--- a/os/os-solaris.h
+++ b/os/os-solaris.h
@@ -69,8 +69,8 @@ static inline int fio_set_odirect(int fd)
 /*
  * pset binding hooks for fio
  */
-#define fio_setaffinity(td)		\
-	pset_bind((td)->o.cpumask, P_PID, (td)->pid, NULL)
+#define fio_setaffinity(pid, cpumask)		\
+	pset_bind(&(cpumask), P_PID, (pid), NULL)
 #define fio_getaffinity(pid, ptr)	({ 0; })
 
 #define fio_cpu_clear(mask, cpu)	pset_assign(PS_NONE, (cpu), NULL)
diff --git a/os/os.h b/os/os.h
index dbf09571..10e796fb 100644
--- a/os/os.h
+++ b/os/os.h
@@ -39,7 +39,7 @@
 #endif /* FIO_HAVE_FADVISE */
 
 #ifndef FIO_HAVE_CPU_AFFINITY
-#define fio_setaffinity(td)		(0)
+#define fio_setaffinity(pid, mask)	(0)
 #define fio_getaffinity(pid, mask)	do { } while (0)
 #define fio_cpu_clear(mask, cpu)	do { } while (0)
 #define fio_cpuset_exit(mask)		(-1)
diff --git a/verify.c b/verify.c
index 2ae74b93..5dd9ee33 100644
--- a/verify.c
+++ b/verify.c
@@ -5,9 +5,11 @@
 #include <fcntl.h>
 #include <string.h>
 #include <assert.h>
+#include <pthread.h>
 
 #include "fio.h"
 #include "verify.h"
+#include "smalloc.h"
 
 #include "crc/md5.h"
 #include "crc/crc64.h"
@@ -417,6 +419,26 @@ int verify_io_u_pattern(unsigned long pattern, unsigned long pattern_size,
 	return 0;
 }
 
+/*
+ * Push IO verification to a separate thread
+ */
+int verify_io_u_async(struct thread_data *td, struct io_u *io_u)
+{
+	if (io_u->file)
+		put_file_log(td, io_u->file);
+
+	io_u->file = NULL;
+
+	pthread_mutex_lock(&td->io_u_lock);
+	flist_del(&io_u->list);
+	flist_add_tail(&io_u->list, &td->verify_list);
+	pthread_mutex_unlock(&td->io_u_lock);
+
+	pthread_cond_signal(&td->verify_cond);
+	io_u->flags |= IO_U_F_FREE_DEF;
+	return 0;
+}
+
 int verify_io_u(struct thread_data *td, struct io_u *io_u)
 {
 	struct verify_header *hdr;
@@ -720,3 +742,103 @@ int get_next_verify(struct thread_data *td, struct io_u *io_u)
 	dprint(FD_VERIFY, "get_next_verify: empty\n");
 	return 1;
 }
+
+static void *verify_async_thread(void *data)
+{
+	struct thread_data *td = data;
+	struct io_u *io_u;
+	int ret = 0;
+
+	if (td->o.verify_cpumask_set &&
+	    fio_setaffinity(td->pid, td->o.verify_cpumask)) {
+		log_err("fio: failed setting verify thread affinity\n");
+		goto done;
+	}
+
+	do {
+		read_barrier();
+		if (td->verify_thread_exit)
+			break;
+
+		pthread_mutex_lock(&td->io_u_lock);
+
+		while (flist_empty(&td->verify_list) &&
+		       !td->verify_thread_exit) {
+			ret = pthread_cond_wait(&td->verify_cond, &td->io_u_lock);
+			if (ret) {
+				pthread_mutex_unlock(&td->io_u_lock);
+				break;
+			}
+		}
+
+		if (flist_empty(&td->verify_list)) {
+			pthread_mutex_unlock(&td->io_u_lock);
+			continue;
+		}
+
+		io_u = flist_entry(td->verify_list.next, struct io_u, list);
+		flist_del_init(&io_u->list);
+		pthread_mutex_unlock(&td->io_u_lock);
+
+		ret = verify_io_u(td, io_u);
+		put_io_u(td, io_u);
+	} while (!ret);
+
+done:
+	pthread_mutex_lock(&td->io_u_lock);
+	td->nr_verify_threads--;
+	pthread_mutex_unlock(&td->io_u_lock);
+
+	pthread_cond_signal(&td->free_cond);
+	return NULL;
+}
+
+int verify_async_init(struct thread_data *td)
+{
+	int i, ret;
+
+	td->verify_thread_exit = 0;
+
+	td->verify_threads = malloc(sizeof(pthread_t) * td->o.verify_async);
+	for (i = 0; i < td->o.verify_async; i++) {
+		ret = pthread_create(&td->verify_threads[i], NULL,
+					verify_async_thread, td);
+		if (ret) {
+			log_err("fio: async verify creation failed: %s\n",
+					strerror(ret));
+			break;
+		}
+		ret = pthread_detach(td->verify_threads[i]);
+		if (ret) {
+			log_err("fio: async verify thread detach failed: %s\n",
+					strerror(ret));
+			break;
+		}
+		td->nr_verify_threads++;
+	}
+
+	if (i != td->o.verify_async) {
+		td->verify_thread_exit = 1;
+		write_barrier();
+		pthread_cond_broadcast(&td->verify_cond);
+		return 1;
+	}
+
+	return 0;
+}
+
+void verify_async_exit(struct thread_data *td)
+{
+	td->verify_thread_exit = 1;
+	write_barrier();
+	pthread_cond_broadcast(&td->verify_cond);
+
+	pthread_mutex_lock(&td->io_u_lock);
+
+	while (td->nr_verify_threads)
+		pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+
+	pthread_mutex_unlock(&td->io_u_lock);
+	free(td->verify_threads);
+	td->verify_threads = NULL;
+}
diff --git a/verify.h b/verify.h
index 76d256d1..50c8e432 100644
--- a/verify.h
+++ b/verify.h
@@ -64,5 +64,12 @@ struct vhdr_meta {
 extern void populate_verify_io_u(struct thread_data *, struct io_u *);
 extern int __must_check get_next_verify(struct thread_data *td, struct io_u *);
 extern int __must_check verify_io_u(struct thread_data *, struct io_u *);
+extern int verify_io_u_async(struct thread_data *, struct io_u *);
+
+/*
+ * Async verify offload
+ */
+extern int verify_async_init(struct thread_data *);
+extern void verify_async_exit(struct thread_data *);
 
 #endif
-- 
2.25.1
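
For anyone trying out the new options, a minimal job file might look like the
sketch below. It is illustrative only and not part of the patch: the file
name, size, verify type, and thread/CPU counts are placeholders. Note that
verify_async_cpus takes the same comma-separated CPU list format as
cpus_allowed and is only available when fio is built with CPU affinity
support (FIO_HAVE_CPU_AFFINITY).

; illustrative example: write a file with md5 verification, offloading
; the verify work to two helper threads pinned to CPUs 0 and 1
[global]
size=256m
verify=md5

[write-and-verify]
filename=/tmp/fio-verify-async.dat
rw=write
verify_async=2
verify_async_cpus=0,1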