Add support for async IO verification offload
authorJens Axboe <jens.axboe@oracle.com>
Mon, 6 Jul 2009 10:59:04 +0000 (12:59 +0200)
committerJens Axboe <jens.axboe@oracle.com>
Mon, 6 Jul 2009 10:59:04 +0000 (12:59 +0200)
This adds support for setting up a number of IO verification offload
threads, instead of doing the offload inline. An option for controlling
the CPU affinity of those threads are always added.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
13 files changed:
HOWTO
file.h
fio.1
fio.c
fio.h
io_u.c
ioengine.h
options.c
os/os-linux.h
os/os-solaris.h
os/os.h
verify.c
verify.h

diff --git a/HOWTO b/HOWTO
index 708eca0037293e80cc681c17f20d6638dc5c4cbd..55662d34bae7d7cc2ed46a6b87d2a5fa971568ee 100644 (file)
--- a/HOWTO
+++ b/HOWTO
@@ -827,6 +827,16 @@ verify_fatal=bool  Normally fio will keep checking the entire contents
                before quitting on a block verification failure. If this
                option is set, fio will exit the job on the first observed
                failure.
+
+verify_async=int       Fio will normally verify IO inline from the submitting
+               thread. This option takes an integer describing how many
+               async offload threads to create for IO verification instead,
+               causing fio to offload the duty of verifying IO contents
+               to one or more separate threads.
+
+verify_async_cpus=str  Tell fio to set the given CPU affinity on the
+               async IO verification threads. See cpus_allowed for the
+               format used.
                
 stonewall      Wait for preceeding jobs in the job file to exit, before
                starting this one. Can be used to insert serialization
diff --git a/file.h b/file.h
index 6aa5b502c99246d2293e1598616b382349aa5aa2..dc22d4e058a177f373131acb370c838d4441b88f 100644 (file)
--- a/file.h
+++ b/file.h
@@ -133,6 +133,7 @@ extern int __must_check pre_read_files(struct thread_data *);
 extern int add_file(struct thread_data *, const char *);
 extern void get_file(struct fio_file *);
 extern int __must_check put_file(struct thread_data *, struct fio_file *);
+extern void put_file_log(struct thread_data *, struct fio_file *);
 extern void lock_file(struct thread_data *, struct fio_file *, enum fio_ddir);
 extern void unlock_file(struct thread_data *, struct fio_file *);
 extern void unlock_file_all(struct thread_data *, struct fio_file *);
diff --git a/fio.1 b/fio.1
index 32993b61da4bccc9ba633da361bc1e47a68aaa08..7b1fc8029008e683b17cb4710ea586285e1c0cd8 100644 (file)
--- a/fio.1
+++ b/fio.1
@@ -598,6 +598,16 @@ Write the verification header for this number of bytes, which should divide
 If true, exit the job on the first observed verification failure.  Default:
 false.
 .TP
+.BI verify_async \fR=\fPint
+Fio will normally verify IO inline from the submitting thread. This option
+takes an integer describing how many async offload threads to create for IO
+verification instead, causing fio to offload the duty of verifying IO contents
+to one or more separate threads.
+.TP
+.BI verify_async_cpus \fR=\fPstr
+Tell fio to set the given CPU affinity on the async IO verification threads.
+See \fBcpus_allowed\fP for the format used.
+.TP
 .B stonewall
 Wait for preceeding jobs in the job file to exit before starting this one.
 \fBstonewall\fR implies \fBnew_group\fR.
diff --git a/fio.c b/fio.c
index aabfee6544c2cda3c7228927500e77548d7440a6..fc6dd8aecbc0e11561dc23f8a0bbaec9a73565cc 100644 (file)
--- a/fio.c
+++ b/fio.c
@@ -472,7 +472,10 @@ static void do_verify(struct thread_data *td)
                        break;
                }
 
-               io_u->end_io = verify_io_u;
+               if (td->o.verify_async)
+                       io_u->end_io = verify_io_u_async;
+               else
+                       io_u->end_io = verify_io_u;
 
                ret = td_io_queue(td, io_u);
                switch (ret) {
@@ -604,7 +607,10 @@ static void do_io(struct thread_data *td)
                 * a previously written file.
                 */
                if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ) {
-                       io_u->end_io = verify_io_u;
+                       if (td->o.verify_async)
+                               io_u->end_io = verify_io_u_async;
+                       else
+                               io_u->end_io = verify_io_u;
                        td_set_runstate(td, TD_VERIFYING);
                } else if (in_ramp_time(td))
                        td_set_runstate(td, TD_RAMP);
@@ -992,6 +998,7 @@ static void *thread_main(void *data)
 {
        unsigned long long runtime[2], elapsed;
        struct thread_data *td = data;
+       pthread_condattr_t attr;
        int clear_state;
 
        if (!td->o.use_thread)
@@ -1006,8 +1013,14 @@ static void *thread_main(void *data)
        INIT_FLIST_HEAD(&td->io_u_requeues);
        INIT_FLIST_HEAD(&td->io_log_list);
        INIT_FLIST_HEAD(&td->io_hist_list);
+       INIT_FLIST_HEAD(&td->verify_list);
+       pthread_mutex_init(&td->io_u_lock, NULL);
        td->io_hist_tree = RB_ROOT;
 
+       pthread_condattr_init(&attr);
+       pthread_cond_init(&td->verify_cond, &attr);
+       pthread_cond_init(&td->free_cond, &attr);
+
        td_set_runstate(td, TD_INITIALIZED);
        dprint(FD_MUTEX, "up startup_mutex\n");
        fio_mutex_up(startup_mutex);
@@ -1031,7 +1044,10 @@ static void *thread_main(void *data)
        if (init_io_u(td))
                goto err;
 
-       if (td->o.cpumask_set && fio_setaffinity(td) == -1) {
+       if (td->o.verify_async && verify_async_init(td))
+               goto err;
+
+       if (td->o.cpumask_set && fio_setaffinity(td->pid, td->o.cpumask) == -1) {
                td_verror(td, errno, "cpu_set_affinity");
                goto err;
        }
@@ -1042,7 +1058,7 @@ static void *thread_main(void *data)
         */
        if (td->o.gtod_cpu) {
                fio_cpu_clear(&td->o.cpumask, td->o.gtod_cpu);
-               if (fio_setaffinity(td) == -1) {
+               if (fio_setaffinity(td->pid, td->o.cpumask) == -1) {
                        td_verror(td, errno, "cpu_set_affinity");
                        goto err;
                }
@@ -1184,6 +1200,9 @@ err:
                td_verror(td, ret, "fio_cpuset_exit");
        }
 
+       if (td->o.verify_async)
+               verify_async_exit(td);
+
        /*
         * do this very late, it will log file closing as well
         */
diff --git a/fio.h b/fio.h
index b19101c20a4924892610bd78b0f6d645f3b93e9b..fb70b465bec4e1da83a9359b8d2dbe39d84d5c24 100644 (file)
--- a/fio.h
+++ b/fio.h
@@ -174,6 +174,7 @@ struct thread_options {
        unsigned int verify_pattern;
        unsigned int verify_pattern_bytes;
        unsigned int verify_fatal;
+       unsigned int verify_async;
        unsigned int use_thread;
        unsigned int unlink;
        unsigned int do_disk_util;
@@ -209,6 +210,8 @@ struct thread_options {
        unsigned int numjobs;
        os_cpu_mask_t cpumask;
        unsigned int cpumask_set;
+       os_cpu_mask_t verify_cpumask;
+       unsigned int verify_cpumask_set;
        unsigned int iolog;
        unsigned int rwmixcycle;
        unsigned int rwmix[2];
@@ -319,6 +322,17 @@ struct thread_data {
        struct flist_head io_u_freelist;
        struct flist_head io_u_busylist;
        struct flist_head io_u_requeues;
+       pthread_mutex_t io_u_lock;
+       pthread_cond_t free_cond;
+
+       /*
+        * async verify offload
+        */
+       struct flist_head verify_list;
+       pthread_t *verify_threads;
+       unsigned int nr_verify_threads;
+       pthread_cond_t verify_cond;
+       int verify_thread_exit;
 
        /*
         * Rate state
@@ -661,4 +675,26 @@ static inline int is_power_of_2(unsigned int val)
        return (val != 0 && ((val & (val - 1)) == 0));
 }
 
+/*
+ * We currently only need to do locking if we have verifier threads
+ * accessing our internal structures too
+ */
+static inline void td_io_u_lock(struct thread_data *td)
+{
+       if (td->o.verify_async)
+               pthread_mutex_lock(&td->io_u_lock);
+}
+
+static inline void td_io_u_unlock(struct thread_data *td)
+{
+       if (td->o.verify_async)
+               pthread_mutex_unlock(&td->io_u_lock);
+}
+
+static inline void td_io_u_free_notify(struct thread_data *td)
+{
+       if (td->o.verify_async)
+               pthread_cond_signal(&td->free_cond);
+}
+
 #endif
diff --git a/io_u.c b/io_u.c
index 2e9dac0db6b4aa0adae673a3b21d26173e48da3e..654980481c8f7fbd5450991684ed6a171fdf8931 100644 (file)
--- a/io_u.c
+++ b/io_u.c
@@ -401,7 +401,7 @@ static enum fio_ddir get_rw_ddir(struct thread_data *td)
        return td->rwmix_ddir;
 }
 
-static void put_file_log(struct thread_data *td, struct fio_file *f)
+void put_file_log(struct thread_data *td, struct fio_file *f)
 {
        int ret = put_file(td, f);
 
@@ -411,16 +411,21 @@ static void put_file_log(struct thread_data *td, struct fio_file *f)
 
 void put_io_u(struct thread_data *td, struct io_u *io_u)
 {
+       td_io_u_lock(td);
+
        assert((io_u->flags & IO_U_F_FREE) == 0);
        io_u->flags |= IO_U_F_FREE;
+       io_u->flags &= ~IO_U_F_FREE_DEF;
 
        if (io_u->file)
                put_file_log(td, io_u->file);
 
        io_u->file = NULL;
-       flist_del(&io_u->list);
+       flist_del_init(&io_u->list);
        flist_add(&io_u->list, &td->io_u_freelist);
        td->cur_depth--;
+       td_io_u_unlock(td);
+       td_io_u_free_notify(td);
 }
 
 void clear_io_u(struct thread_data *td, struct io_u *io_u)
@@ -435,6 +440,8 @@ void requeue_io_u(struct thread_data *td, struct io_u **io_u)
 
        dprint(FD_IO, "requeue %p\n", __io_u);
 
+       td_io_u_lock(td);
+
        __io_u->flags |= IO_U_F_FREE;
        if ((__io_u->flags & IO_U_F_FLIGHT) && !ddir_sync(__io_u->ddir))
                td->io_issues[__io_u->ddir]--;
@@ -444,6 +451,7 @@ void requeue_io_u(struct thread_data *td, struct io_u **io_u)
        flist_del(&__io_u->list);
        flist_add_tail(&__io_u->list, &td->io_u_requeues);
        td->cur_depth--;
+       td_io_u_unlock(td);
        *io_u = NULL;
 }
 
@@ -826,6 +834,9 @@ struct io_u *__get_io_u(struct thread_data *td)
 {
        struct io_u *io_u = NULL;
 
+       td_io_u_lock(td);
+
+again:
        if (!flist_empty(&td->io_u_requeues))
                io_u = flist_entry(td->io_u_requeues.next, struct io_u, list);
        else if (!queue_full(td)) {
@@ -837,9 +848,18 @@ struct io_u *__get_io_u(struct thread_data *td)
                io_u->end_io = NULL;
        }
 
+       /*
+        * We ran out, wait for async verify threads to finish and return one
+        */
+       if (!io_u && td->o.verify_async) {
+               pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+               goto again;
+       }
+
        if (io_u) {
                assert(io_u->flags & IO_U_F_FREE);
                io_u->flags &= ~IO_U_F_FREE;
+               io_u->flags &= ~IO_U_F_FREE_DEF;
 
                io_u->error = 0;
                flist_del(&io_u->list);
@@ -847,6 +867,7 @@ struct io_u *__get_io_u(struct thread_data *td)
                td->cur_depth++;
        }
 
+       td_io_u_unlock(td);
        return io_u;
 }
 
@@ -1042,7 +1063,9 @@ static void ios_completed(struct thread_data *td,
                io_u = td->io_ops->event(td, i);
 
                io_completed(td, io_u, icd);
-               put_io_u(td, io_u);
+
+               if (!(io_u->flags & IO_U_F_FREE_DEF))
+                       put_io_u(td, io_u);
        }
 }
 
@@ -1056,7 +1079,9 @@ int io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
 
        init_icd(td, &icd, 1);
        io_completed(td, io_u, &icd);
-       put_io_u(td, io_u);
+
+       if (!(io_u->flags & IO_U_F_FREE_DEF))
+               put_io_u(td, io_u);
 
        if (icd.error) {
                td_verror(td, icd.error, "io_u_sync_complete");
index f9777999d0e45aff2849683d1edde5b2b3edb1df..3df0944a30ed30cec9a54b9d4fa464506e311885 100644 (file)
@@ -6,6 +6,7 @@
 enum {
        IO_U_F_FREE     = 1 << 0,
        IO_U_F_FLIGHT   = 1 << 1,
+       IO_U_F_FREE_DEF = 1 << 2,
 };
 
 /*
index 87515ef13313da1d0cd0c2282306e75e6cf011ed..0954ccdb2bd9138b45ad36d9b7cd25e0d8a2bdd3 100644 (file)
--- a/options.c
+++ b/options.c
@@ -303,14 +303,14 @@ static int str_cpumask_cb(void *data, unsigned int *val)
        return 0;
 }
 
-static int str_cpus_allowed_cb(void *data, const char *input)
+static int set_cpus_allowed(struct thread_data *td, os_cpu_mask_t *mask,
+                           const char *input)
 {
-       struct thread_data *td = data;
        char *cpu, *str, *p;
        long max_cpu;
        int ret = 0;
 
-       ret = fio_cpuset_init(&td->o.cpumask);
+       ret = fio_cpuset_init(mask);
        if (ret < 0) {
                log_err("fio: cpuset_init failed\n");
                td_verror(td, ret, "fio_cpuset_init");
@@ -358,7 +358,7 @@ static int str_cpus_allowed_cb(void *data, const char *input)
                        }
        
                        dprint(FD_PARSE, "set cpu allowed %d\n", icpu);
-                       fio_cpu_set(&td->o.cpumask, icpu);
+                       fio_cpu_set(mask, icpu);
                        icpu++;
                }
                if (ret)
@@ -370,6 +370,30 @@ static int str_cpus_allowed_cb(void *data, const char *input)
                td->o.cpumask_set = 1;
        return ret;
 }
+
+static int str_cpus_allowed_cb(void *data, const char *input)
+{
+       struct thread_data *td = data;
+       int ret;
+
+       ret = set_cpus_allowed(td, &td->o.cpumask, input);
+       if (!ret)
+               td->o.cpumask_set = 1;
+
+       return ret;
+}
+
+static int str_verify_cpus_allowed_cb(void *data, const char *input)
+{
+       struct thread_data *td = data;
+       int ret;
+
+       ret = set_cpus_allowed(td, &td->o.verify_cpumask, input);
+       if (!ret)
+               td->o.verify_cpumask_set = 1;
+
+       return ret;
+}
 #endif
 
 static int str_fst_cb(void *data, const char *str)
@@ -1180,6 +1204,23 @@ static struct fio_option options[] = {
                .help   = "Exit on a single verify failure, don't continue",
                .parent = "verify",
        },
+       {
+               .name   = "verify_async",
+               .type   = FIO_OPT_INT,
+               .off1   = td_var_offset(verify_async),
+               .def    = "0",
+               .help   = "Number of async verifier threads to use",
+               .parent = "verify",
+       },
+#ifdef FIO_HAVE_CPU_AFFINITY
+       {
+               .name   = "verify_async_cpus",
+               .type   = FIO_OPT_STR,
+               .cb     = str_verify_cpus_allowed_cb,
+               .help   = "Set CPUs allowed for async verify threads",
+               .parent = "verify_async",
+       },
+#endif
        {
                .name   = "write_iolog",
                .type   = FIO_OPT_STR_STORE,
index b766cbf9503af73a144e5cddf1f542ed8e13e3b4..dd9c5aaf493ce6ff403e5b1ec92c5390d7b82bb1 100644 (file)
@@ -55,13 +55,13 @@ typedef struct drand48_data os_random_state_t;
  * the affinity helpers to work.
  */
 #ifndef GLIBC_2_3_2
-#define fio_setaffinity(td)            \
-       sched_setaffinity((td)->pid, sizeof((td)->o.cpumask), &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask)          \
+       sched_setaffinity((pid), sizeof(cpumask), &(cpumask))
 #define fio_getaffinity(pid, ptr)      \
        sched_getaffinity((pid), sizeof(cpu_set_t), (ptr))
 #else
-#define fio_setaffinity(td)            \
-       sched_setaffinity((td)->pid, &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask)  \
+       sched_setaffinity((pid), &(cpumask))
 #define fio_getaffinity(pid, ptr)      \
        sched_getaffinity((pid), (ptr))
 #endif
index b58d1308efc49e4bd1cf0d530256f90e9e076afd..56729565b127631b2061258c7a9e373e47da7f21 100644 (file)
@@ -69,8 +69,8 @@ static inline int fio_set_odirect(int fd)
 /*
  * pset binding hooks for fio
  */
-#define fio_setaffinity(td)            \
-       pset_bind((td)->o.cpumask, P_PID, (td)->pid, NULL)
+#define fio_setaffinity(pid, cpumask)          \
+       pset_bind(&(cpumask), P_PID, (pid), NULL)
 #define fio_getaffinity(pid, ptr)      ({ 0; })
 
 #define fio_cpu_clear(mask, cpu)       pset_assign(PS_NONE, (cpu), NULL)
diff --git a/os/os.h b/os/os.h
index dbf095711c08344e054966512dae3f41d8c75280..10e796fbcbe3313d1faee845bc26d751645aa011 100644 (file)
--- a/os/os.h
+++ b/os/os.h
@@ -39,7 +39,7 @@
 #endif /* FIO_HAVE_FADVISE */
 
 #ifndef FIO_HAVE_CPU_AFFINITY
-#define fio_setaffinity(td)            (0)
+#define fio_setaffinity(pid, mask)     (0)
 #define fio_getaffinity(pid, mask)     do { } while (0)
 #define fio_cpu_clear(mask, cpu)       do { } while (0)
 #define fio_cpuset_exit(mask)          (-1)
index 2ae74b93c28e7e6a2c7c71165a5dc98a76b876ea..5dd9ee339b3858ebe630c67ad4c11e80248b683f 100644 (file)
--- a/verify.c
+++ b/verify.c
@@ -5,9 +5,11 @@
 #include <fcntl.h>
 #include <string.h>
 #include <assert.h>
+#include <pthread.h>
 
 #include "fio.h"
 #include "verify.h"
+#include "smalloc.h"
 
 #include "crc/md5.h"
 #include "crc/crc64.h"
@@ -417,6 +419,26 @@ int verify_io_u_pattern(unsigned long pattern, unsigned long pattern_size,
        return 0;
 }
 
+/*
+ * Push IO verification to a separate thread
+ */
+int verify_io_u_async(struct thread_data *td, struct io_u *io_u)
+{
+       if (io_u->file)
+               put_file_log(td, io_u->file);
+
+       io_u->file = NULL;
+
+       pthread_mutex_lock(&td->io_u_lock);
+       flist_del(&io_u->list);
+       flist_add_tail(&io_u->list, &td->verify_list);
+       pthread_mutex_unlock(&td->io_u_lock);
+
+       pthread_cond_signal(&td->verify_cond);
+       io_u->flags |= IO_U_F_FREE_DEF;
+       return 0;
+}
+
 int verify_io_u(struct thread_data *td, struct io_u *io_u)
 {
        struct verify_header *hdr;
@@ -720,3 +742,103 @@ int get_next_verify(struct thread_data *td, struct io_u *io_u)
        dprint(FD_VERIFY, "get_next_verify: empty\n");
        return 1;
 }
+
+static void *verify_async_thread(void *data)
+{
+       struct thread_data *td = data;
+       struct io_u *io_u;
+       int ret = 0;
+
+       if (td->o.verify_cpumask_set &&
+           fio_setaffinity(td->pid, td->o.verify_cpumask)) {
+               log_err("fio: failed setting verify thread affinity\n");
+               goto done;
+       }
+
+       do {
+               read_barrier();
+               if (td->verify_thread_exit)
+                       break;
+
+               pthread_mutex_lock(&td->io_u_lock);
+
+               while (flist_empty(&td->verify_list) &&
+                      !td->verify_thread_exit) {
+                       ret = pthread_cond_wait(&td->verify_cond, &td->io_u_lock);
+                       if (ret) {
+                               pthread_mutex_unlock(&td->io_u_lock);
+                               break;
+                       }
+               }
+
+               if (flist_empty(&td->verify_list)) {
+                       pthread_mutex_unlock(&td->io_u_lock);
+                       continue;
+               }
+
+               io_u = flist_entry(td->verify_list.next, struct io_u, list);
+               flist_del_init(&io_u->list);
+               pthread_mutex_unlock(&td->io_u_lock);
+
+               ret = verify_io_u(td, io_u);
+               put_io_u(td, io_u);
+       } while (!ret);
+
+done:
+       pthread_mutex_lock(&td->io_u_lock);
+       td->nr_verify_threads--;
+       pthread_mutex_unlock(&td->io_u_lock);
+
+       pthread_cond_signal(&td->free_cond);
+       return NULL;
+}
+
+int verify_async_init(struct thread_data *td)
+{
+       int i, ret;
+
+       td->verify_thread_exit = 0;
+
+       td->verify_threads = malloc(sizeof(pthread_t) * td->o.verify_async);
+       for (i = 0; i < td->o.verify_async; i++) {
+               ret = pthread_create(&td->verify_threads[i], NULL,
+                                       verify_async_thread, td);
+               if (ret) {
+                       log_err("fio: async verify creation failed: %s\n",
+                                       strerror(ret));
+                       break;
+               }
+               ret = pthread_detach(td->verify_threads[i]);
+               if (ret) {
+                       log_err("fio: async verify thread detach failed: %s\n",
+                                       strerror(ret));
+                       break;
+               }
+               td->nr_verify_threads++;
+       }
+
+       if (i != td->o.verify_async) {
+               td->verify_thread_exit = 1;
+               write_barrier();
+               pthread_cond_broadcast(&td->verify_cond);
+               return 1;
+       }
+
+       return 0;
+}
+
+void verify_async_exit(struct thread_data *td)
+{
+       td->verify_thread_exit = 1;
+       write_barrier();
+       pthread_cond_broadcast(&td->verify_cond);
+
+       pthread_mutex_lock(&td->io_u_lock);
+
+       while (td->nr_verify_threads)
+               pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+
+       pthread_mutex_unlock(&td->io_u_lock);
+       free(td->verify_threads);
+       td->verify_threads = NULL;
+}
index 76d256d1d2b4d8fd7e42e445704ba746e0de538c..50c8e4328b44960b1056ed733589ad269fa656c7 100644 (file)
--- a/verify.h
+++ b/verify.h
@@ -64,5 +64,12 @@ struct vhdr_meta {
 extern void populate_verify_io_u(struct thread_data *, struct io_u *);
 extern int __must_check get_next_verify(struct thread_data *td, struct io_u *);
 extern int __must_check verify_io_u(struct thread_data *, struct io_u *);
+extern int verify_io_u_async(struct thread_data *, struct io_u *);
+
+/*
+ * Async verify offload
+ */
+extern int verify_async_init(struct thread_data *);
+extern void verify_async_exit(struct thread_data *);
 
 #endif