Add support for async IO verification offload
authorJens Axboe <jens.axboe@oracle.com>
Mon, 6 Jul 2009 10:59:04 +0000 (12:59 +0200)
committerJens Axboe <jens.axboe@oracle.com>
Mon, 6 Jul 2009 10:59:04 +0000 (12:59 +0200)
This adds support for setting up a number of IO verification offload
threads, instead of doing the offload inline. An option for controlling
the CPU affinity of those threads are always added.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
13 files changed:
HOWTO
file.h
fio.1
fio.c
fio.h
io_u.c
ioengine.h
options.c
os/os-linux.h
os/os-solaris.h
os/os.h
verify.c
verify.h

diff --git a/HOWTO b/HOWTO
index 708eca0037293e80cc681c17f20d6638dc5c4cbd..55662d34bae7d7cc2ed46a6b87d2a5fa971568ee 100644 (file)
--- a/HOWTO
+++ b/HOWTO
@@ -827,6 +827,16 @@ verify_fatal=bool  Normally fio will keep checking the entire contents
                before quitting on a block verification failure. If this
                option is set, fio will exit the job on the first observed
                failure.
                before quitting on a block verification failure. If this
                option is set, fio will exit the job on the first observed
                failure.
+
+verify_async=int       Fio will normally verify IO inline from the submitting
+               thread. This option takes an integer describing how many
+               async offload threads to create for IO verification instead,
+               causing fio to offload the duty of verifying IO contents
+               to one or more separate threads.
+
+verify_async_cpus=str  Tell fio to set the given CPU affinity on the
+               async IO verification threads. See cpus_allowed for the
+               format used.
                
 stonewall      Wait for preceeding jobs in the job file to exit, before
                starting this one. Can be used to insert serialization
                
 stonewall      Wait for preceeding jobs in the job file to exit, before
                starting this one. Can be used to insert serialization
diff --git a/file.h b/file.h
index 6aa5b502c99246d2293e1598616b382349aa5aa2..dc22d4e058a177f373131acb370c838d4441b88f 100644 (file)
--- a/file.h
+++ b/file.h
@@ -133,6 +133,7 @@ extern int __must_check pre_read_files(struct thread_data *);
 extern int add_file(struct thread_data *, const char *);
 extern void get_file(struct fio_file *);
 extern int __must_check put_file(struct thread_data *, struct fio_file *);
 extern int add_file(struct thread_data *, const char *);
 extern void get_file(struct fio_file *);
 extern int __must_check put_file(struct thread_data *, struct fio_file *);
+extern void put_file_log(struct thread_data *, struct fio_file *);
 extern void lock_file(struct thread_data *, struct fio_file *, enum fio_ddir);
 extern void unlock_file(struct thread_data *, struct fio_file *);
 extern void unlock_file_all(struct thread_data *, struct fio_file *);
 extern void lock_file(struct thread_data *, struct fio_file *, enum fio_ddir);
 extern void unlock_file(struct thread_data *, struct fio_file *);
 extern void unlock_file_all(struct thread_data *, struct fio_file *);
diff --git a/fio.1 b/fio.1
index 32993b61da4bccc9ba633da361bc1e47a68aaa08..7b1fc8029008e683b17cb4710ea586285e1c0cd8 100644 (file)
--- a/fio.1
+++ b/fio.1
@@ -598,6 +598,16 @@ Write the verification header for this number of bytes, which should divide
 If true, exit the job on the first observed verification failure.  Default:
 false.
 .TP
 If true, exit the job on the first observed verification failure.  Default:
 false.
 .TP
+.BI verify_async \fR=\fPint
+Fio will normally verify IO inline from the submitting thread. This option
+takes an integer describing how many async offload threads to create for IO
+verification instead, causing fio to offload the duty of verifying IO contents
+to one or more separate threads.
+.TP
+.BI verify_async_cpus \fR=\fPstr
+Tell fio to set the given CPU affinity on the async IO verification threads.
+See \fBcpus_allowed\fP for the format used.
+.TP
 .B stonewall
 Wait for preceeding jobs in the job file to exit before starting this one.
 \fBstonewall\fR implies \fBnew_group\fR.
 .B stonewall
 Wait for preceeding jobs in the job file to exit before starting this one.
 \fBstonewall\fR implies \fBnew_group\fR.
diff --git a/fio.c b/fio.c
index aabfee6544c2cda3c7228927500e77548d7440a6..fc6dd8aecbc0e11561dc23f8a0bbaec9a73565cc 100644 (file)
--- a/fio.c
+++ b/fio.c
@@ -472,7 +472,10 @@ static void do_verify(struct thread_data *td)
                        break;
                }
 
                        break;
                }
 
-               io_u->end_io = verify_io_u;
+               if (td->o.verify_async)
+                       io_u->end_io = verify_io_u_async;
+               else
+                       io_u->end_io = verify_io_u;
 
                ret = td_io_queue(td, io_u);
                switch (ret) {
 
                ret = td_io_queue(td, io_u);
                switch (ret) {
@@ -604,7 +607,10 @@ static void do_io(struct thread_data *td)
                 * a previously written file.
                 */
                if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ) {
                 * a previously written file.
                 */
                if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ) {
-                       io_u->end_io = verify_io_u;
+                       if (td->o.verify_async)
+                               io_u->end_io = verify_io_u_async;
+                       else
+                               io_u->end_io = verify_io_u;
                        td_set_runstate(td, TD_VERIFYING);
                } else if (in_ramp_time(td))
                        td_set_runstate(td, TD_RAMP);
                        td_set_runstate(td, TD_VERIFYING);
                } else if (in_ramp_time(td))
                        td_set_runstate(td, TD_RAMP);
@@ -992,6 +998,7 @@ static void *thread_main(void *data)
 {
        unsigned long long runtime[2], elapsed;
        struct thread_data *td = data;
 {
        unsigned long long runtime[2], elapsed;
        struct thread_data *td = data;
+       pthread_condattr_t attr;
        int clear_state;
 
        if (!td->o.use_thread)
        int clear_state;
 
        if (!td->o.use_thread)
@@ -1006,8 +1013,14 @@ static void *thread_main(void *data)
        INIT_FLIST_HEAD(&td->io_u_requeues);
        INIT_FLIST_HEAD(&td->io_log_list);
        INIT_FLIST_HEAD(&td->io_hist_list);
        INIT_FLIST_HEAD(&td->io_u_requeues);
        INIT_FLIST_HEAD(&td->io_log_list);
        INIT_FLIST_HEAD(&td->io_hist_list);
+       INIT_FLIST_HEAD(&td->verify_list);
+       pthread_mutex_init(&td->io_u_lock, NULL);
        td->io_hist_tree = RB_ROOT;
 
        td->io_hist_tree = RB_ROOT;
 
+       pthread_condattr_init(&attr);
+       pthread_cond_init(&td->verify_cond, &attr);
+       pthread_cond_init(&td->free_cond, &attr);
+
        td_set_runstate(td, TD_INITIALIZED);
        dprint(FD_MUTEX, "up startup_mutex\n");
        fio_mutex_up(startup_mutex);
        td_set_runstate(td, TD_INITIALIZED);
        dprint(FD_MUTEX, "up startup_mutex\n");
        fio_mutex_up(startup_mutex);
@@ -1031,7 +1044,10 @@ static void *thread_main(void *data)
        if (init_io_u(td))
                goto err;
 
        if (init_io_u(td))
                goto err;
 
-       if (td->o.cpumask_set && fio_setaffinity(td) == -1) {
+       if (td->o.verify_async && verify_async_init(td))
+               goto err;
+
+       if (td->o.cpumask_set && fio_setaffinity(td->pid, td->o.cpumask) == -1) {
                td_verror(td, errno, "cpu_set_affinity");
                goto err;
        }
                td_verror(td, errno, "cpu_set_affinity");
                goto err;
        }
@@ -1042,7 +1058,7 @@ static void *thread_main(void *data)
         */
        if (td->o.gtod_cpu) {
                fio_cpu_clear(&td->o.cpumask, td->o.gtod_cpu);
         */
        if (td->o.gtod_cpu) {
                fio_cpu_clear(&td->o.cpumask, td->o.gtod_cpu);
-               if (fio_setaffinity(td) == -1) {
+               if (fio_setaffinity(td->pid, td->o.cpumask) == -1) {
                        td_verror(td, errno, "cpu_set_affinity");
                        goto err;
                }
                        td_verror(td, errno, "cpu_set_affinity");
                        goto err;
                }
@@ -1184,6 +1200,9 @@ err:
                td_verror(td, ret, "fio_cpuset_exit");
        }
 
                td_verror(td, ret, "fio_cpuset_exit");
        }
 
+       if (td->o.verify_async)
+               verify_async_exit(td);
+
        /*
         * do this very late, it will log file closing as well
         */
        /*
         * do this very late, it will log file closing as well
         */
diff --git a/fio.h b/fio.h
index b19101c20a4924892610bd78b0f6d645f3b93e9b..fb70b465bec4e1da83a9359b8d2dbe39d84d5c24 100644 (file)
--- a/fio.h
+++ b/fio.h
@@ -174,6 +174,7 @@ struct thread_options {
        unsigned int verify_pattern;
        unsigned int verify_pattern_bytes;
        unsigned int verify_fatal;
        unsigned int verify_pattern;
        unsigned int verify_pattern_bytes;
        unsigned int verify_fatal;
+       unsigned int verify_async;
        unsigned int use_thread;
        unsigned int unlink;
        unsigned int do_disk_util;
        unsigned int use_thread;
        unsigned int unlink;
        unsigned int do_disk_util;
@@ -209,6 +210,8 @@ struct thread_options {
        unsigned int numjobs;
        os_cpu_mask_t cpumask;
        unsigned int cpumask_set;
        unsigned int numjobs;
        os_cpu_mask_t cpumask;
        unsigned int cpumask_set;
+       os_cpu_mask_t verify_cpumask;
+       unsigned int verify_cpumask_set;
        unsigned int iolog;
        unsigned int rwmixcycle;
        unsigned int rwmix[2];
        unsigned int iolog;
        unsigned int rwmixcycle;
        unsigned int rwmix[2];
@@ -319,6 +322,17 @@ struct thread_data {
        struct flist_head io_u_freelist;
        struct flist_head io_u_busylist;
        struct flist_head io_u_requeues;
        struct flist_head io_u_freelist;
        struct flist_head io_u_busylist;
        struct flist_head io_u_requeues;
+       pthread_mutex_t io_u_lock;
+       pthread_cond_t free_cond;
+
+       /*
+        * async verify offload
+        */
+       struct flist_head verify_list;
+       pthread_t *verify_threads;
+       unsigned int nr_verify_threads;
+       pthread_cond_t verify_cond;
+       int verify_thread_exit;
 
        /*
         * Rate state
 
        /*
         * Rate state
@@ -661,4 +675,26 @@ static inline int is_power_of_2(unsigned int val)
        return (val != 0 && ((val & (val - 1)) == 0));
 }
 
        return (val != 0 && ((val & (val - 1)) == 0));
 }
 
+/*
+ * We currently only need to do locking if we have verifier threads
+ * accessing our internal structures too
+ */
+static inline void td_io_u_lock(struct thread_data *td)
+{
+       if (td->o.verify_async)
+               pthread_mutex_lock(&td->io_u_lock);
+}
+
+static inline void td_io_u_unlock(struct thread_data *td)
+{
+       if (td->o.verify_async)
+               pthread_mutex_unlock(&td->io_u_lock);
+}
+
+static inline void td_io_u_free_notify(struct thread_data *td)
+{
+       if (td->o.verify_async)
+               pthread_cond_signal(&td->free_cond);
+}
+
 #endif
 #endif
diff --git a/io_u.c b/io_u.c
index 2e9dac0db6b4aa0adae673a3b21d26173e48da3e..654980481c8f7fbd5450991684ed6a171fdf8931 100644 (file)
--- a/io_u.c
+++ b/io_u.c
@@ -401,7 +401,7 @@ static enum fio_ddir get_rw_ddir(struct thread_data *td)
        return td->rwmix_ddir;
 }
 
        return td->rwmix_ddir;
 }
 
-static void put_file_log(struct thread_data *td, struct fio_file *f)
+void put_file_log(struct thread_data *td, struct fio_file *f)
 {
        int ret = put_file(td, f);
 
 {
        int ret = put_file(td, f);
 
@@ -411,16 +411,21 @@ static void put_file_log(struct thread_data *td, struct fio_file *f)
 
 void put_io_u(struct thread_data *td, struct io_u *io_u)
 {
 
 void put_io_u(struct thread_data *td, struct io_u *io_u)
 {
+       td_io_u_lock(td);
+
        assert((io_u->flags & IO_U_F_FREE) == 0);
        io_u->flags |= IO_U_F_FREE;
        assert((io_u->flags & IO_U_F_FREE) == 0);
        io_u->flags |= IO_U_F_FREE;
+       io_u->flags &= ~IO_U_F_FREE_DEF;
 
        if (io_u->file)
                put_file_log(td, io_u->file);
 
        io_u->file = NULL;
 
        if (io_u->file)
                put_file_log(td, io_u->file);
 
        io_u->file = NULL;
-       flist_del(&io_u->list);
+       flist_del_init(&io_u->list);
        flist_add(&io_u->list, &td->io_u_freelist);
        td->cur_depth--;
        flist_add(&io_u->list, &td->io_u_freelist);
        td->cur_depth--;
+       td_io_u_unlock(td);
+       td_io_u_free_notify(td);
 }
 
 void clear_io_u(struct thread_data *td, struct io_u *io_u)
 }
 
 void clear_io_u(struct thread_data *td, struct io_u *io_u)
@@ -435,6 +440,8 @@ void requeue_io_u(struct thread_data *td, struct io_u **io_u)
 
        dprint(FD_IO, "requeue %p\n", __io_u);
 
 
        dprint(FD_IO, "requeue %p\n", __io_u);
 
+       td_io_u_lock(td);
+
        __io_u->flags |= IO_U_F_FREE;
        if ((__io_u->flags & IO_U_F_FLIGHT) && !ddir_sync(__io_u->ddir))
                td->io_issues[__io_u->ddir]--;
        __io_u->flags |= IO_U_F_FREE;
        if ((__io_u->flags & IO_U_F_FLIGHT) && !ddir_sync(__io_u->ddir))
                td->io_issues[__io_u->ddir]--;
@@ -444,6 +451,7 @@ void requeue_io_u(struct thread_data *td, struct io_u **io_u)
        flist_del(&__io_u->list);
        flist_add_tail(&__io_u->list, &td->io_u_requeues);
        td->cur_depth--;
        flist_del(&__io_u->list);
        flist_add_tail(&__io_u->list, &td->io_u_requeues);
        td->cur_depth--;
+       td_io_u_unlock(td);
        *io_u = NULL;
 }
 
        *io_u = NULL;
 }
 
@@ -826,6 +834,9 @@ struct io_u *__get_io_u(struct thread_data *td)
 {
        struct io_u *io_u = NULL;
 
 {
        struct io_u *io_u = NULL;
 
+       td_io_u_lock(td);
+
+again:
        if (!flist_empty(&td->io_u_requeues))
                io_u = flist_entry(td->io_u_requeues.next, struct io_u, list);
        else if (!queue_full(td)) {
        if (!flist_empty(&td->io_u_requeues))
                io_u = flist_entry(td->io_u_requeues.next, struct io_u, list);
        else if (!queue_full(td)) {
@@ -837,9 +848,18 @@ struct io_u *__get_io_u(struct thread_data *td)
                io_u->end_io = NULL;
        }
 
                io_u->end_io = NULL;
        }
 
+       /*
+        * We ran out, wait for async verify threads to finish and return one
+        */
+       if (!io_u && td->o.verify_async) {
+               pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+               goto again;
+       }
+
        if (io_u) {
                assert(io_u->flags & IO_U_F_FREE);
                io_u->flags &= ~IO_U_F_FREE;
        if (io_u) {
                assert(io_u->flags & IO_U_F_FREE);
                io_u->flags &= ~IO_U_F_FREE;
+               io_u->flags &= ~IO_U_F_FREE_DEF;
 
                io_u->error = 0;
                flist_del(&io_u->list);
 
                io_u->error = 0;
                flist_del(&io_u->list);
@@ -847,6 +867,7 @@ struct io_u *__get_io_u(struct thread_data *td)
                td->cur_depth++;
        }
 
                td->cur_depth++;
        }
 
+       td_io_u_unlock(td);
        return io_u;
 }
 
        return io_u;
 }
 
@@ -1042,7 +1063,9 @@ static void ios_completed(struct thread_data *td,
                io_u = td->io_ops->event(td, i);
 
                io_completed(td, io_u, icd);
                io_u = td->io_ops->event(td, i);
 
                io_completed(td, io_u, icd);
-               put_io_u(td, io_u);
+
+               if (!(io_u->flags & IO_U_F_FREE_DEF))
+                       put_io_u(td, io_u);
        }
 }
 
        }
 }
 
@@ -1056,7 +1079,9 @@ int io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
 
        init_icd(td, &icd, 1);
        io_completed(td, io_u, &icd);
 
        init_icd(td, &icd, 1);
        io_completed(td, io_u, &icd);
-       put_io_u(td, io_u);
+
+       if (!(io_u->flags & IO_U_F_FREE_DEF))
+               put_io_u(td, io_u);
 
        if (icd.error) {
                td_verror(td, icd.error, "io_u_sync_complete");
 
        if (icd.error) {
                td_verror(td, icd.error, "io_u_sync_complete");
index f9777999d0e45aff2849683d1edde5b2b3edb1df..3df0944a30ed30cec9a54b9d4fa464506e311885 100644 (file)
@@ -6,6 +6,7 @@
 enum {
        IO_U_F_FREE     = 1 << 0,
        IO_U_F_FLIGHT   = 1 << 1,
 enum {
        IO_U_F_FREE     = 1 << 0,
        IO_U_F_FLIGHT   = 1 << 1,
+       IO_U_F_FREE_DEF = 1 << 2,
 };
 
 /*
 };
 
 /*
index 87515ef13313da1d0cd0c2282306e75e6cf011ed..0954ccdb2bd9138b45ad36d9b7cd25e0d8a2bdd3 100644 (file)
--- a/options.c
+++ b/options.c
@@ -303,14 +303,14 @@ static int str_cpumask_cb(void *data, unsigned int *val)
        return 0;
 }
 
        return 0;
 }
 
-static int str_cpus_allowed_cb(void *data, const char *input)
+static int set_cpus_allowed(struct thread_data *td, os_cpu_mask_t *mask,
+                           const char *input)
 {
 {
-       struct thread_data *td = data;
        char *cpu, *str, *p;
        long max_cpu;
        int ret = 0;
 
        char *cpu, *str, *p;
        long max_cpu;
        int ret = 0;
 
-       ret = fio_cpuset_init(&td->o.cpumask);
+       ret = fio_cpuset_init(mask);
        if (ret < 0) {
                log_err("fio: cpuset_init failed\n");
                td_verror(td, ret, "fio_cpuset_init");
        if (ret < 0) {
                log_err("fio: cpuset_init failed\n");
                td_verror(td, ret, "fio_cpuset_init");
@@ -358,7 +358,7 @@ static int str_cpus_allowed_cb(void *data, const char *input)
                        }
        
                        dprint(FD_PARSE, "set cpu allowed %d\n", icpu);
                        }
        
                        dprint(FD_PARSE, "set cpu allowed %d\n", icpu);
-                       fio_cpu_set(&td->o.cpumask, icpu);
+                       fio_cpu_set(mask, icpu);
                        icpu++;
                }
                if (ret)
                        icpu++;
                }
                if (ret)
@@ -370,6 +370,30 @@ static int str_cpus_allowed_cb(void *data, const char *input)
                td->o.cpumask_set = 1;
        return ret;
 }
                td->o.cpumask_set = 1;
        return ret;
 }
+
+static int str_cpus_allowed_cb(void *data, const char *input)
+{
+       struct thread_data *td = data;
+       int ret;
+
+       ret = set_cpus_allowed(td, &td->o.cpumask, input);
+       if (!ret)
+               td->o.cpumask_set = 1;
+
+       return ret;
+}
+
+static int str_verify_cpus_allowed_cb(void *data, const char *input)
+{
+       struct thread_data *td = data;
+       int ret;
+
+       ret = set_cpus_allowed(td, &td->o.verify_cpumask, input);
+       if (!ret)
+               td->o.verify_cpumask_set = 1;
+
+       return ret;
+}
 #endif
 
 static int str_fst_cb(void *data, const char *str)
 #endif
 
 static int str_fst_cb(void *data, const char *str)
@@ -1180,6 +1204,23 @@ static struct fio_option options[] = {
                .help   = "Exit on a single verify failure, don't continue",
                .parent = "verify",
        },
                .help   = "Exit on a single verify failure, don't continue",
                .parent = "verify",
        },
+       {
+               .name   = "verify_async",
+               .type   = FIO_OPT_INT,
+               .off1   = td_var_offset(verify_async),
+               .def    = "0",
+               .help   = "Number of async verifier threads to use",
+               .parent = "verify",
+       },
+#ifdef FIO_HAVE_CPU_AFFINITY
+       {
+               .name   = "verify_async_cpus",
+               .type   = FIO_OPT_STR,
+               .cb     = str_verify_cpus_allowed_cb,
+               .help   = "Set CPUs allowed for async verify threads",
+               .parent = "verify_async",
+       },
+#endif
        {
                .name   = "write_iolog",
                .type   = FIO_OPT_STR_STORE,
        {
                .name   = "write_iolog",
                .type   = FIO_OPT_STR_STORE,
index b766cbf9503af73a144e5cddf1f542ed8e13e3b4..dd9c5aaf493ce6ff403e5b1ec92c5390d7b82bb1 100644 (file)
@@ -55,13 +55,13 @@ typedef struct drand48_data os_random_state_t;
  * the affinity helpers to work.
  */
 #ifndef GLIBC_2_3_2
  * the affinity helpers to work.
  */
 #ifndef GLIBC_2_3_2
-#define fio_setaffinity(td)            \
-       sched_setaffinity((td)->pid, sizeof((td)->o.cpumask), &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask)          \
+       sched_setaffinity((pid), sizeof(cpumask), &(cpumask))
 #define fio_getaffinity(pid, ptr)      \
        sched_getaffinity((pid), sizeof(cpu_set_t), (ptr))
 #else
 #define fio_getaffinity(pid, ptr)      \
        sched_getaffinity((pid), sizeof(cpu_set_t), (ptr))
 #else
-#define fio_setaffinity(td)            \
-       sched_setaffinity((td)->pid, &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask)  \
+       sched_setaffinity((pid), &(cpumask))
 #define fio_getaffinity(pid, ptr)      \
        sched_getaffinity((pid), (ptr))
 #endif
 #define fio_getaffinity(pid, ptr)      \
        sched_getaffinity((pid), (ptr))
 #endif
index b58d1308efc49e4bd1cf0d530256f90e9e076afd..56729565b127631b2061258c7a9e373e47da7f21 100644 (file)
@@ -69,8 +69,8 @@ static inline int fio_set_odirect(int fd)
 /*
  * pset binding hooks for fio
  */
 /*
  * pset binding hooks for fio
  */
-#define fio_setaffinity(td)            \
-       pset_bind((td)->o.cpumask, P_PID, (td)->pid, NULL)
+#define fio_setaffinity(pid, cpumask)          \
+       pset_bind(&(cpumask), P_PID, (pid), NULL)
 #define fio_getaffinity(pid, ptr)      ({ 0; })
 
 #define fio_cpu_clear(mask, cpu)       pset_assign(PS_NONE, (cpu), NULL)
 #define fio_getaffinity(pid, ptr)      ({ 0; })
 
 #define fio_cpu_clear(mask, cpu)       pset_assign(PS_NONE, (cpu), NULL)
diff --git a/os/os.h b/os/os.h
index dbf095711c08344e054966512dae3f41d8c75280..10e796fbcbe3313d1faee845bc26d751645aa011 100644 (file)
--- a/os/os.h
+++ b/os/os.h
@@ -39,7 +39,7 @@
 #endif /* FIO_HAVE_FADVISE */
 
 #ifndef FIO_HAVE_CPU_AFFINITY
 #endif /* FIO_HAVE_FADVISE */
 
 #ifndef FIO_HAVE_CPU_AFFINITY
-#define fio_setaffinity(td)            (0)
+#define fio_setaffinity(pid, mask)     (0)
 #define fio_getaffinity(pid, mask)     do { } while (0)
 #define fio_cpu_clear(mask, cpu)       do { } while (0)
 #define fio_cpuset_exit(mask)          (-1)
 #define fio_getaffinity(pid, mask)     do { } while (0)
 #define fio_cpu_clear(mask, cpu)       do { } while (0)
 #define fio_cpuset_exit(mask)          (-1)
index 2ae74b93c28e7e6a2c7c71165a5dc98a76b876ea..5dd9ee339b3858ebe630c67ad4c11e80248b683f 100644 (file)
--- a/verify.c
+++ b/verify.c
@@ -5,9 +5,11 @@
 #include <fcntl.h>
 #include <string.h>
 #include <assert.h>
 #include <fcntl.h>
 #include <string.h>
 #include <assert.h>
+#include <pthread.h>
 
 #include "fio.h"
 #include "verify.h"
 
 #include "fio.h"
 #include "verify.h"
+#include "smalloc.h"
 
 #include "crc/md5.h"
 #include "crc/crc64.h"
 
 #include "crc/md5.h"
 #include "crc/crc64.h"
@@ -417,6 +419,26 @@ int verify_io_u_pattern(unsigned long pattern, unsigned long pattern_size,
        return 0;
 }
 
        return 0;
 }
 
+/*
+ * Push IO verification to a separate thread
+ */
+int verify_io_u_async(struct thread_data *td, struct io_u *io_u)
+{
+       if (io_u->file)
+               put_file_log(td, io_u->file);
+
+       io_u->file = NULL;
+
+       pthread_mutex_lock(&td->io_u_lock);
+       flist_del(&io_u->list);
+       flist_add_tail(&io_u->list, &td->verify_list);
+       pthread_mutex_unlock(&td->io_u_lock);
+
+       pthread_cond_signal(&td->verify_cond);
+       io_u->flags |= IO_U_F_FREE_DEF;
+       return 0;
+}
+
 int verify_io_u(struct thread_data *td, struct io_u *io_u)
 {
        struct verify_header *hdr;
 int verify_io_u(struct thread_data *td, struct io_u *io_u)
 {
        struct verify_header *hdr;
@@ -720,3 +742,103 @@ int get_next_verify(struct thread_data *td, struct io_u *io_u)
        dprint(FD_VERIFY, "get_next_verify: empty\n");
        return 1;
 }
        dprint(FD_VERIFY, "get_next_verify: empty\n");
        return 1;
 }
+
+static void *verify_async_thread(void *data)
+{
+       struct thread_data *td = data;
+       struct io_u *io_u;
+       int ret = 0;
+
+       if (td->o.verify_cpumask_set &&
+           fio_setaffinity(td->pid, td->o.verify_cpumask)) {
+               log_err("fio: failed setting verify thread affinity\n");
+               goto done;
+       }
+
+       do {
+               read_barrier();
+               if (td->verify_thread_exit)
+                       break;
+
+               pthread_mutex_lock(&td->io_u_lock);
+
+               while (flist_empty(&td->verify_list) &&
+                      !td->verify_thread_exit) {
+                       ret = pthread_cond_wait(&td->verify_cond, &td->io_u_lock);
+                       if (ret) {
+                               pthread_mutex_unlock(&td->io_u_lock);
+                               break;
+                       }
+               }
+
+               if (flist_empty(&td->verify_list)) {
+                       pthread_mutex_unlock(&td->io_u_lock);
+                       continue;
+               }
+
+               io_u = flist_entry(td->verify_list.next, struct io_u, list);
+               flist_del_init(&io_u->list);
+               pthread_mutex_unlock(&td->io_u_lock);
+
+               ret = verify_io_u(td, io_u);
+               put_io_u(td, io_u);
+       } while (!ret);
+
+done:
+       pthread_mutex_lock(&td->io_u_lock);
+       td->nr_verify_threads--;
+       pthread_mutex_unlock(&td->io_u_lock);
+
+       pthread_cond_signal(&td->free_cond);
+       return NULL;
+}
+
+int verify_async_init(struct thread_data *td)
+{
+       int i, ret;
+
+       td->verify_thread_exit = 0;
+
+       td->verify_threads = malloc(sizeof(pthread_t) * td->o.verify_async);
+       for (i = 0; i < td->o.verify_async; i++) {
+               ret = pthread_create(&td->verify_threads[i], NULL,
+                                       verify_async_thread, td);
+               if (ret) {
+                       log_err("fio: async verify creation failed: %s\n",
+                                       strerror(ret));
+                       break;
+               }
+               ret = pthread_detach(td->verify_threads[i]);
+               if (ret) {
+                       log_err("fio: async verify thread detach failed: %s\n",
+                                       strerror(ret));
+                       break;
+               }
+               td->nr_verify_threads++;
+       }
+
+       if (i != td->o.verify_async) {
+               td->verify_thread_exit = 1;
+               write_barrier();
+               pthread_cond_broadcast(&td->verify_cond);
+               return 1;
+       }
+
+       return 0;
+}
+
+void verify_async_exit(struct thread_data *td)
+{
+       td->verify_thread_exit = 1;
+       write_barrier();
+       pthread_cond_broadcast(&td->verify_cond);
+
+       pthread_mutex_lock(&td->io_u_lock);
+
+       while (td->nr_verify_threads)
+               pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+
+       pthread_mutex_unlock(&td->io_u_lock);
+       free(td->verify_threads);
+       td->verify_threads = NULL;
+}
index 76d256d1d2b4d8fd7e42e445704ba746e0de538c..50c8e4328b44960b1056ed733589ad269fa656c7 100644 (file)
--- a/verify.h
+++ b/verify.h
@@ -64,5 +64,12 @@ struct vhdr_meta {
 extern void populate_verify_io_u(struct thread_data *, struct io_u *);
 extern int __must_check get_next_verify(struct thread_data *td, struct io_u *);
 extern int __must_check verify_io_u(struct thread_data *, struct io_u *);
 extern void populate_verify_io_u(struct thread_data *, struct io_u *);
 extern int __must_check get_next_verify(struct thread_data *td, struct io_u *);
 extern int __must_check verify_io_u(struct thread_data *, struct io_u *);
+extern int verify_io_u_async(struct thread_data *, struct io_u *);
+
+/*
+ * Async verify offload
+ */
+extern int verify_async_init(struct thread_data *);
+extern void verify_async_exit(struct thread_data *);
 
 #endif
 
 #endif