First cut at supporting IO offload
author Jens Axboe <axboe@fb.com>
Fri, 20 Mar 2015 04:50:06 +0000 (22:50 -0600)
committer Jens Axboe <axboe@fb.com>
Wed, 15 Apr 2015 15:50:11 +0000 (09:50 -0600)
rate_iops=x
io_submit_mode=offload
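
With io_submit_mode=offload, the job thread no longer submits and
completes IO inline. Instead it hands each io_u off to a pool of
submit workers (one per unit of iodepth). Every worker clones the
job's thread_data, issues and reaps the IO through its own ioengine
instance, and folds statistics and errors back into the parent job.
This keeps submission and completion latency out of the rated path,
so a rate limit like rate_iops=x can be enforced accurately even when
the device or engine is slow.

For example, the two options could be combined like this (a
hypothetical job file, for illustration only):

    [rated-randread]
    ioengine=libaio
    iodepth=32
    rw=randread
    bs=4k
    rate_iops=4000
    io_submit_mode=offload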

Signed-off-by: Jens Axboe <axboe@fb.com>
14 files changed:
Makefile
backend.c
cconv.c
fio.h
init.c
io_u.c
ioengine.h
ioengines.c
options.c
stat.c
thread_options.h
verify.c
workqueue.c [new file with mode: 0644]
workqueue.h [new file with mode: 0644]

diff --git a/Makefile b/Makefile
index 4202ed80f2acb7ab12da6b8d7007107bd299e7a8..9b7f27ab735978fc1faa3dda5c2a09556a438ca5 100644
--- a/Makefile
+++ b/Makefile
@@ -36,7 +36,7 @@ SOURCE := gettime.c ioengines.c init.c stat.c log.c time.c filesetup.c \
                lib/lfsr.c gettime-thread.c helpers.c lib/flist_sort.c \
                lib/hweight.c lib/getrusage.c idletime.c td_error.c \
                profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c \
-               lib/tp.c lib/bloom.c lib/gauss.c
+               lib/tp.c lib/bloom.c lib/gauss.c workqueue.c
 
 ifdef CONFIG_LIBHDFS
   HDFSFLAGS= -I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/linux -I $(FIO_LIBHDFS_INCLUDE)
diff --git a/backend.c b/backend.c
index f82b83f75ee4d7aa09e99079d2a427c531506a2f..65a3e184a6d77d7d5439f23c9a4c17813a405326 100644
--- a/backend.c
+++ b/backend.c
@@ -54,6 +54,7 @@
 #include "idletime.h"
 #include "err.h"
 #include "lib/tp.h"
+#include "workqueue.h"
 
 static pthread_t helper_thread;
 static pthread_mutex_t helper_lock;
@@ -623,7 +624,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes)
                                        continue;
                                } else if (io_u->ddir == DDIR_TRIM) {
                                        io_u->ddir = DDIR_READ;
-                                       io_u->flags |= IO_U_F_TRIMMED;
+                                       io_u_set(io_u, IO_U_F_TRIMMED);
                                        break;
                                } else if (io_u->ddir == DDIR_WRITE) {
                                        io_u->ddir = DDIR_READ;
@@ -868,21 +869,27 @@ static uint64_t do_io(struct thread_data *td)
                    !td->o.experimental_verify)
                        log_io_piece(td, io_u);
 
-               ret = td_io_queue(td, io_u);
+               if (td->o.io_submit_mode == IO_MODE_OFFLOAD) {
+                       if (td->error)
+                               break;
+                       ret = workqueue_enqueue(&td->io_wq, io_u);
+               } else {
+                       ret = td_io_queue(td, io_u);
 
-               if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 1, &comp_time))
-                       break;
+                       if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 1, &comp_time))
+                               break;
 
-               /*
-                * See if we need to complete some commands. Note that we
-                * can get BUSY even without IO queued, if the system is
-                * resource starved.
-                */
+                       /*
+                        * See if we need to complete some commands. Note that
+                        * we can get BUSY even without IO queued, if the
+                        * system is resource starved.
+                        */
 reap:
-               full = queue_full(td) ||
-                       (ret == FIO_Q_BUSY && td->cur_depth);
-               if (full || !td->o.iodepth_batch_complete)
-                       ret = wait_for_completions(td, &comp_time);
+                       full = queue_full(td) ||
+                               (ret == FIO_Q_BUSY && td->cur_depth);
+                       if (full || !td->o.iodepth_batch_complete)
+                               ret = wait_for_completions(td, &comp_time);
+               }
                if (ret < 0)
                        break;
                if (!ddir_rw_sum(td->bytes_done) &&
@@ -931,7 +938,12 @@ reap:
        if (!td->error) {
                struct fio_file *f;
 
-               i = td->cur_depth;
+               if (td->o.io_submit_mode == IO_MODE_OFFLOAD) {
+                       workqueue_flush(&td->io_wq);
+                       i = 0;
+               } else
+                       i = td->cur_depth;
+
                if (i) {
                        ret = io_u_queued_complete(td, i);
                        if (td->o.fill_device && td->error == ENOSPC)
@@ -1242,7 +1254,7 @@ static uint64_t do_dry_run(struct thread_data *td)
                if (!io_u)
                        break;
 
-               io_u->flags |= IO_U_F_FLIGHT;
+               io_u_set(io_u, IO_U_F_FLIGHT);
                io_u->error = 0;
                io_u->resid = 0;
                if (ddir_rw(acct_ddir(io_u)))
@@ -1265,6 +1277,29 @@ static uint64_t do_dry_run(struct thread_data *td)
        return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
 }
 
+static void io_workqueue_fn(struct thread_data *td, struct io_u *io_u)
+{
+       const enum fio_ddir ddir = io_u->ddir;
+       int ret;
+
+       dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());
+
+       io_u_set(io_u, IO_U_F_NO_FILE_PUT);
+
+       td->cur_depth++;
+
+       ret = td_io_queue(td, io_u);
+
+       dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());
+
+       io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);
+
+       if (ret == FIO_Q_QUEUED)
+               ret = io_u_queued_complete(td, 1);
+
+       td->cur_depth--;
+}
+
 /*
  * Entry point for the thread based jobs. The process based jobs end up
  * here as well, after a little setup.
@@ -1462,6 +1497,10 @@ static void *thread_main(void *data)
 
        fio_verify_init(td);
 
+       if ((o->io_submit_mode == IO_MODE_OFFLOAD) &&
+           workqueue_init(td, &td->io_wq, io_workqueue_fn, td->o.iodepth))
+               goto err;
+
        fio_gettime(&td->epoch, NULL);
        fio_getrusage(&td->ru_start);
        clear_state = 0;
@@ -1570,6 +1609,9 @@ static void *thread_main(void *data)
 
        fio_writeout_logs(td);
 
+       if (o->io_submit_mode == IO_MODE_OFFLOAD)
+               workqueue_exit(&td->io_wq);
+
        if (td->flags & TD_F_COMPRESS_LOG)
                tp_exit(&td->tp_data);
 
diff --git a/cconv.c b/cconv.c
index 68f119f18c2f4312a5dfca7d9b850bebb80c8b9b..976059cde61ab1c298f6ca41949b054c686c2d5f 100644
--- a/cconv.c
+++ b/cconv.c
@@ -116,6 +116,7 @@ void convert_thread_options_to_cpu(struct thread_options *o,
        }
 
        o->ratecycle = le32_to_cpu(top->ratecycle);
+       o->io_submit_mode = le32_to_cpu(top->io_submit_mode);
        o->nr_files = le32_to_cpu(top->nr_files);
        o->open_files = le32_to_cpu(top->open_files);
        o->file_lock_mode = le32_to_cpu(top->file_lock_mode);
@@ -295,6 +296,7 @@ void convert_thread_options_to_net(struct thread_options_pack *top,
        top->fill_device = cpu_to_le32(o->fill_device);
        top->file_append = cpu_to_le32(o->file_append);
        top->ratecycle = cpu_to_le32(o->ratecycle);
+       top->io_submit_mode = cpu_to_le32(o->io_submit_mode);
        top->nr_files = cpu_to_le32(o->nr_files);
        top->open_files = cpu_to_le32(o->open_files);
        top->file_lock_mode = cpu_to_le32(o->file_lock_mode);
diff --git a/fio.h b/fio.h
index 446482354fd909125c911defe6f27b9ceb83fee9..a4637bb9101ca5cdaaf905687006a714804c29f9 100644
--- a/fio.h
+++ b/fio.h
@@ -40,6 +40,7 @@
 #include "stat.h"
 #include "flow.h"
 #include "io_u_queue.h"
+#include "workqueue.h"
 
 #ifdef CONFIG_SOLARISAIO
 #include <sys/asynch.h>
@@ -75,6 +76,8 @@ enum {
        TD_F_NOIO               = 256,
        TD_F_COMPRESS_LOG       = 512,
        TD_F_VSTATE_SAVED       = 1024,
+       TD_F_NEED_LOCK          = 2048,
+       TD_F_CHILD              = 4096,
 };
 
 enum {
@@ -94,6 +97,11 @@ enum {
        FIO_RAND_NR_OFFS,
 };
 
+enum {
+       IO_MODE_INLINE = 0,
+       IO_MODE_OFFLOAD,
+};
+
 /*
  * This describes a single thread/process executing a fio job.
  */
@@ -118,6 +126,8 @@ struct thread_data {
 
        struct tp_data *tp_data;
 
+       struct thread_data *parent;
+
        uint64_t stat_io_bytes[DDIR_RWDIR_CNT];
        struct timeval bw_sample_time;
 
@@ -232,6 +242,11 @@ struct thread_data {
        unsigned long rate_blocks[DDIR_RWDIR_CNT];
        struct timeval lastrate[DDIR_RWDIR_CNT];
 
+       /*
+        * Enforced rate submission/completion workqueue
+        */
+       struct workqueue io_wq;
+
        uint64_t total_io_size;
        uint64_t fill_device_size;
 
@@ -365,12 +380,23 @@ enum {
        } while (0)
 
 
-#define td_clear_error(td)             \
-       (td)->error = 0;
-#define td_verror(td, err, func)       \
-       __td_verror((td), (err), strerror((err)), (func))
-#define td_vmsg(td, err, msg, func)    \
-       __td_verror((td), (err), (msg), (func))
+#define td_clear_error(td)             do {            \
+       (td)->error = 0;                                \
+       if ((td)->parent)                               \
+               (td)->parent->error = 0;                \
+} while (0)
+
+#define td_verror(td, err, func)       do {                    \
+       __td_verror((td), (err), strerror((err)), (func));      \
+       if ((td)->parent)                                       \
+               __td_verror((td)->parent, (err), strerror((err)), (func)); \
+} while (0)
+
+#define td_vmsg(td, err, msg, func)    do {                    \
+       __td_verror((td), (err), (msg), (func));                \
+       if ((td)->parent)                                       \
+               __td_verror((td)->parent, (err), (msg), (func));        \
+} while (0)
 
 #define __fio_stringify_1(x)   #x
 #define __fio_stringify(x)     __fio_stringify_1(x)
@@ -610,25 +636,30 @@ static inline int is_power_of_2(uint64_t val)
        return (val != 0 && ((val & (val - 1)) == 0));
 }
 
+static inline int td_async_processing(struct thread_data *td)
+{
+       return (td->flags & TD_F_NEED_LOCK) != 0;
+}
+
 /*
  * We currently only need to do locking if we have verifier threads
  * accessing our internal structures too
  */
 static inline void td_io_u_lock(struct thread_data *td)
 {
-       if (td->o.verify_async)
+       if (td_async_processing(td))
                pthread_mutex_lock(&td->io_u_lock);
 }
 
 static inline void td_io_u_unlock(struct thread_data *td)
 {
-       if (td->o.verify_async)
+       if (td_async_processing(td))
                pthread_mutex_unlock(&td->io_u_lock);
 }
 
 static inline void td_io_u_free_notify(struct thread_data *td)
 {
-       if (td->o.verify_async)
+       if (td_async_processing(td))
                pthread_cond_signal(&td->free_cond);
 }
 
diff --git a/init.c b/init.c
index a126f79f1b9c1a4b20bd388f71013d35179e2b86..1a5d4c9ed0919648f6f5c79c009c599b4f3386a6 100644
--- a/init.c
+++ b/init.c
@@ -956,6 +956,9 @@ static void init_flags(struct thread_data *td)
                td->flags |= TD_F_SCRAMBLE_BUFFERS;
        if (o->verify != VERIFY_NONE)
                td->flags |= TD_F_VER_NONE;
+
+       if (o->verify_async || o->io_submit_mode == IO_MODE_OFFLOAD)
+               td->flags |= TD_F_NEED_LOCK;
 }
 
 static int setup_random_seeds(struct thread_data *td)
diff --git a/io_u.c b/io_u.c
index e4fcfd8332570e549da2d0475e63ca8cb4d4df64..ba3f7ca00fcd1db4f657f5dcf7321fc8c9f7ca0e 100644
--- a/io_u.c
+++ b/io_u.c
@@ -326,7 +326,7 @@ static int get_next_block(struct thread_data *td, struct io_u *io_u,
                                *is_random = 1;
                        } else {
                                *is_random = 0;
-                               io_u->flags |= IO_U_F_BUSY_OK;
+                               io_u_set(io_u, IO_U_F_BUSY_OK);
                                ret = get_next_seq_offset(td, f, ddir, &offset);
                                if (ret)
                                        ret = get_next_rand_block(td, f, ddir, &b);
@@ -336,7 +336,7 @@ static int get_next_block(struct thread_data *td, struct io_u *io_u,
                        ret = get_next_seq_offset(td, f, ddir, &offset);
                }
        } else {
-               io_u->flags |= IO_U_F_BUSY_OK;
+               io_u_set(io_u, IO_U_F_BUSY_OK);
                *is_random = 0;
 
                if (td->o.rw_seq == RW_SEQ_SEQ) {
@@ -591,7 +591,8 @@ static enum fio_ddir rate_ddir(struct thread_data *td, enum fio_ddir ddir)
        } else
                usec = td->rate_pending_usleep[ddir];
 
-       io_u_quiesce(td);
+       if (td->o.io_submit_mode == IO_MODE_INLINE)
+               io_u_quiesce(td);
 
        usec = usec_sleep(td, usec);
 
@@ -684,7 +685,7 @@ static void set_rw_ddir(struct thread_data *td, struct io_u *io_u)
            td->o.barrier_blocks &&
           !(td->io_issues[DDIR_WRITE] % td->o.barrier_blocks) &&
             td->io_issues[DDIR_WRITE])
-               io_u->flags |= IO_U_F_BARRIER;
+               io_u_set(io_u, IO_U_F_BARRIER);
 }
 
 void put_file_log(struct thread_data *td, struct fio_file *f)
@@ -697,16 +698,21 @@ void put_file_log(struct thread_data *td, struct fio_file *f)
 
 void put_io_u(struct thread_data *td, struct io_u *io_u)
 {
+       if (td->parent)
+               td = td->parent;
+
        td_io_u_lock(td);
 
        if (io_u->file && !(io_u->flags & IO_U_F_NO_FILE_PUT))
                put_file_log(td, io_u->file);
 
        io_u->file = NULL;
-       io_u->flags |= IO_U_F_FREE;
+       io_u_set(io_u, IO_U_F_FREE);
 
-       if (io_u->flags & IO_U_F_IN_CUR_DEPTH)
+       if (io_u->flags & IO_U_F_IN_CUR_DEPTH) {
                td->cur_depth--;
+               assert(!(td->flags & TD_F_CHILD));
+       }
        io_u_qpush(&td->io_u_freelist, io_u);
        td_io_u_unlock(td);
        td_io_u_free_notify(td);
@@ -714,7 +720,7 @@ void put_io_u(struct thread_data *td, struct io_u *io_u)
 
 void clear_io_u(struct thread_data *td, struct io_u *io_u)
 {
-       io_u->flags &= ~IO_U_F_FLIGHT;
+       io_u_clear(io_u, IO_U_F_FLIGHT);
        put_io_u(td, io_u);
 }
 
@@ -725,18 +731,24 @@ void requeue_io_u(struct thread_data *td, struct io_u **io_u)
 
        dprint(FD_IO, "requeue %p\n", __io_u);
 
+       if (td->parent)
+               td = td->parent;
+
        td_io_u_lock(td);
 
-       __io_u->flags |= IO_U_F_FREE;
+       io_u_set(__io_u, IO_U_F_FREE);
        if ((__io_u->flags & IO_U_F_FLIGHT) && ddir_rw(ddir))
                td->io_issues[ddir]--;
 
-       __io_u->flags &= ~IO_U_F_FLIGHT;
-       if (__io_u->flags & IO_U_F_IN_CUR_DEPTH)
+       io_u_clear(__io_u, IO_U_F_FLIGHT);
+       if (__io_u->flags & IO_U_F_IN_CUR_DEPTH) {
                td->cur_depth--;
+               assert(!(td->flags & TD_F_CHILD));
+       }
 
        io_u_rpush(&td->io_u_requeues, __io_u);
        td_io_u_unlock(td);
+       td_io_u_free_notify(td);
        *io_u = NULL;
 }
 
@@ -1329,21 +1341,23 @@ again:
 
        if (io_u) {
                assert(io_u->flags & IO_U_F_FREE);
-               io_u->flags &= ~(IO_U_F_FREE | IO_U_F_NO_FILE_PUT |
+               io_u_clear(io_u, IO_U_F_FREE | IO_U_F_NO_FILE_PUT |
                                 IO_U_F_TRIMMED | IO_U_F_BARRIER |
                                 IO_U_F_VER_LIST);
 
                io_u->error = 0;
                io_u->acct_ddir = -1;
                td->cur_depth++;
-               io_u->flags |= IO_U_F_IN_CUR_DEPTH;
+               assert(!(td->flags & TD_F_CHILD));
+               io_u_set(io_u, IO_U_F_IN_CUR_DEPTH);
                io_u->ipo = NULL;
-       } else if (td->o.verify_async) {
+       } else if (td_async_processing(td)) {
                /*
                 * We ran out, wait for async verify threads to finish and
                 * return one
                 */
-               pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+               assert(!(td->flags & TD_F_CHILD));
+               assert(!pthread_cond_wait(&td->free_cond, &td->io_u_lock));
                goto again;
        }
 
@@ -1542,7 +1556,7 @@ err_put:
        return ERR_PTR(ret);
 }
 
-void io_u_log_error(struct thread_data *td, struct io_u *io_u)
+static void __io_u_log_error(struct thread_data *td, struct io_u *io_u)
 {
        enum error_type_bit eb = td_error_type(io_u->ddir, io_u->error);
 
@@ -1560,6 +1574,13 @@ void io_u_log_error(struct thread_data *td, struct io_u *io_u)
                td_verror(td, io_u->error, "io_u error");
 }
 
+void io_u_log_error(struct thread_data *td, struct io_u *io_u)
+{
+       __io_u_log_error(td, io_u);
+       if (td->parent)
+               __io_u_log_error(td->parent, io_u);
+}
+
 static inline int gtod_reduce(struct thread_data *td)
 {
        return td->o.disable_clat && td->o.disable_lat && td->o.disable_slat
@@ -1570,9 +1591,10 @@ static void account_io_completion(struct thread_data *td, struct io_u *io_u,
                                  struct io_completion_data *icd,
                                  const enum fio_ddir idx, unsigned int bytes)
 {
+       const int no_reduce = !gtod_reduce(td);
        unsigned long lusec = 0;
 
-       if (!gtod_reduce(td))
+       if (no_reduce)
                lusec = utime_since(&io_u->issue_time, &icd->time);
 
        if (!td->o.disable_lat) {
@@ -1601,10 +1623,13 @@ static void account_io_completion(struct thread_data *td, struct io_u *io_u,
                io_u_mark_latency(td, lusec);
        }
 
+       if (td->parent)
+               td = td->parent;
+
        if (!td->o.disable_bw)
                add_bw_sample(td, idx, bytes, &icd->time);
 
-       if (!gtod_reduce(td))
+       if (no_reduce)
                add_iops_sample(td, idx, bytes, &icd->time);
 
        if (td->ts.nr_block_infos && io_u->ddir == DDIR_TRIM) {
@@ -1625,6 +1650,7 @@ static long long usec_for_io(struct thread_data *td, enum fio_ddir ddir)
 {
        uint64_t secs, remainder, bps, bytes;
 
+       assert(!(td->flags & TD_F_CHILD));
        bytes = td->this_io_bytes[ddir];
        bps = td->rate_bps[ddir];
        secs = bytes / bps;
@@ -1641,9 +1667,8 @@ static void io_completed(struct thread_data *td, struct io_u **io_u_ptr,
 
        dprint_io_u(io_u, "io complete");
 
-       td_io_u_lock(td);
        assert(io_u->flags & IO_U_F_FLIGHT);
-       io_u->flags &= ~(IO_U_F_FLIGHT | IO_U_F_BUSY_OK);
+       io_u_clear(io_u, IO_U_F_FLIGHT | IO_U_F_BUSY_OK);
 
        /*
         * Mark IO ok to verify
@@ -1660,8 +1685,6 @@ static void io_completed(struct thread_data *td, struct io_u **io_u_ptr,
                }
        }
 
-       td_io_u_unlock(td);
-
        if (ddir_sync(ddir)) {
                td->last_was_sync = 1;
                if (f) {
@@ -1706,18 +1729,23 @@ static void io_completed(struct thread_data *td, struct io_u **io_u_ptr,
 
                if (ramp_time_over(td) && (td->runstate == TD_RUNNING ||
                                           td->runstate == TD_VERIFYING)) {
+                       struct thread_data *__td = td;
+
                        account_io_completion(td, io_u, icd, ddir, bytes);
 
-                       if (__should_check_rate(td, ddir)) {
-                               td->rate_pending_usleep[ddir] =
-                                       (usec_for_io(td, ddir) -
-                                        utime_since_now(&td->start));
+                       if (td->parent)
+                               __td = td->parent;
+
+                       if (__should_check_rate(__td, ddir)) {
+                               __td->rate_pending_usleep[ddir] =
+                                       (usec_for_io(__td, ddir) -
+                                        utime_since_now(&__td->start));
                        }
                        if (ddir != DDIR_TRIM &&
-                           __should_check_rate(td, oddir)) {
-                               td->rate_pending_usleep[oddir] =
-                                       (usec_for_io(td, oddir) -
-                                        utime_since_now(&td->start));
+                           __should_check_rate(__td, oddir)) {
+                               __td->rate_pending_usleep[oddir] =
+                                       (usec_for_io(__td, oddir) -
+                                        utime_since_now(&__td->start));
                        }
                }
 
diff --git a/ioengine.h b/ioengine.h
index a55290d47f55eaac7d034e4246ee5ac44ad7b924..3d49993699294c2148f8601e884169ab7ab0a0fe 100644
--- a/ioengine.h
+++ b/ioengine.h
@@ -119,6 +119,7 @@ struct io_u {
                struct ibv_mr *mr;
 #endif
                void *mmap_data;
+               uint64_t null;
        };
 };
 
@@ -251,4 +252,14 @@ static inline enum fio_ddir acct_ddir(struct io_u *io_u)
        return io_u->ddir;
 }
 
+static inline void io_u_clear(struct io_u *io_u, unsigned int flags)
+{
+       __sync_fetch_and_and(&io_u->flags, ~flags);
+}
+
+static inline void io_u_set(struct io_u *io_u, unsigned int flags)
+{
+       __sync_fetch_and_or(&io_u->flags, flags);
+}
+
 #endif
diff --git a/ioengines.c b/ioengines.c
index b42e2c48c7a5d4fe9f660b49a136fc88b69aa06f..b724e0e90e089668ef0a32acd1eb95c7133b4e09 100644
--- a/ioengines.c
+++ b/ioengines.c
@@ -264,13 +264,15 @@ out:
 
 int td_io_queue(struct thread_data *td, struct io_u *io_u)
 {
+       const enum fio_ddir ddir = acct_ddir(io_u);
+       unsigned long buflen = io_u->xfer_buflen;
        int ret;
 
        dprint_io_u(io_u, "queue");
        fio_ro_check(td, io_u);
 
        assert((io_u->flags & IO_U_F_FLIGHT) == 0);
-       io_u->flags |= IO_U_F_FLIGHT;
+       io_u_set(io_u, IO_U_F_FLIGHT);
 
        assert(fio_file_open(io_u->file));
 
@@ -294,18 +296,18 @@ int td_io_queue(struct thread_data *td, struct io_u *io_u)
                                        sizeof(struct timeval));
        }
 
-       if (ddir_rw(acct_ddir(io_u))) {
-               td->io_issues[acct_ddir(io_u)]++;
-               td->io_issue_bytes[acct_ddir(io_u)] += io_u->xfer_buflen;
+       if (ddir_rw(ddir)) {
+               td->io_issues[ddir]++;
+               td->io_issue_bytes[ddir] += buflen;
        }
 
        ret = td->io_ops->queue(td, io_u);
 
        unlock_file(td, io_u->file);
 
-       if (ret == FIO_Q_BUSY && ddir_rw(acct_ddir(io_u))) {
-               td->io_issues[acct_ddir(io_u)]--;
-               td->io_issue_bytes[acct_ddir(io_u)] -= io_u->xfer_buflen;
+       if (ret == FIO_Q_BUSY && ddir_rw(ddir)) {
+               td->io_issues[ddir]--;
+               td->io_issue_bytes[ddir] -= buflen;
        }
 
        /*
diff --git a/options.c b/options.c
index 017920e14031357e9ae2aae6dca977b64ced62fc..b34e2c72dc3a91e6fe1b15216b0d708d9badb57c 100644
--- a/options.c
+++ b/options.c
@@ -1622,6 +1622,26 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
                .category = FIO_OPT_C_IO,
                .group  = FIO_OPT_G_IO_BASIC,
        },
+       {
+               .name   = "io_submit_mode",
+               .lname  = "IO submit mode",
+               .type   = FIO_OPT_STR,
+               .off1   = td_var_offset(io_submit_mode),
+               .help   = "How IO submissions and completions are done",
+               .def    = "inline",
+               .category = FIO_OPT_C_IO,
+               .group  = FIO_OPT_G_IO_BASIC,
+               .posval = {
+                         { .ival = "inline",
+                           .oval = IO_MODE_INLINE,
+                           .help = "Submit and complete IO inline",
+                         },
+                         { .ival = "offload",
+                           .oval = IO_MODE_OFFLOAD,
+                           .help = "Offload submit and complete to threads",
+                         },
+               },
+       },
        {
                .name   = "size",
                .lname  = "Size",
diff --git a/stat.c b/stat.c
index 34e3792fd8f1d5ca8bbcf2f13c74a4320012ca4c..533981fcc89ac20d9297808d8c88ec2b69617202 100644
--- a/stat.c
+++ b/stat.c
@@ -2009,6 +2009,8 @@ void add_bw_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs,
        if (spent < td->o.bw_avg_time)
                return;
 
+       td_io_u_lock(td);
+
        /*
         * Compute both read and write rates for the interval.
         */
@@ -2033,6 +2035,7 @@ void add_bw_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs,
        }
 
        fio_gettime(&td->bw_sample_time, NULL);
+       td_io_u_unlock(td);
 }
 
 void add_iops_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs,
@@ -2048,6 +2051,8 @@ void add_iops_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs
        if (spent < td->o.iops_avg_time)
                return;
 
+       td_io_u_lock(td);
+
        /*
         * Compute both read and write rates for the interval.
         */
@@ -2072,6 +2077,7 @@ void add_iops_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs
        }
 
        fio_gettime(&td->iops_sample_time, NULL);
+       td_io_u_unlock(td);
 }
 
 void stat_init(void)
diff --git a/thread_options.h b/thread_options.h
index 026b85b444587372656371475b5b231cd19123ac..aa7f3f2657c8c985e94ec1449e01a34b5fed1ce5 100644
--- a/thread_options.h
+++ b/thread_options.h
@@ -223,6 +223,7 @@ struct thread_options {
        unsigned int rate[DDIR_RWDIR_CNT];
        unsigned int ratemin[DDIR_RWDIR_CNT];
        unsigned int ratecycle;
+       unsigned int io_submit_mode;
        unsigned int rate_iops[DDIR_RWDIR_CNT];
        unsigned int rate_iops_min[DDIR_RWDIR_CNT];
 
@@ -452,6 +453,7 @@ struct thread_options_pack {
        uint32_t rate[DDIR_RWDIR_CNT];
        uint32_t ratemin[DDIR_RWDIR_CNT];
        uint32_t ratecycle;
+       uint32_t io_submit_mode;
        uint32_t rate_iops[DDIR_RWDIR_CNT];
        uint32_t rate_iops_min[DDIR_RWDIR_CNT];
 
@@ -489,7 +491,6 @@ struct thread_options_pack {
 
        uint64_t latency_target;
        uint64_t latency_window;
-       uint32_t pad3;
        fio_fp64_t latency_percentile;
 
        uint32_t block_error_hist;
diff --git a/verify.c b/verify.c
index b6793d7da76a7f98d68df2847de727ee359cd3dc..aa178e950b83bf43757b707c6e4b681ff08d015d 100644
--- a/verify.c
+++ b/verify.c
@@ -656,7 +656,7 @@ int verify_io_u_async(struct thread_data *td, struct io_u **io_u_ptr)
 
        if (io_u->flags & IO_U_F_IN_CUR_DEPTH) {
                td->cur_depth--;
-               io_u->flags &= ~IO_U_F_IN_CUR_DEPTH;
+               io_u_clear(io_u, IO_U_F_IN_CUR_DEPTH);
        }
        flist_add_tail(&io_u->verify_list, &td->verify_list);
        *io_u_ptr = NULL;
@@ -1105,10 +1105,10 @@ int get_next_verify(struct thread_data *td, struct io_u *io_u)
                io_u->buflen = ipo->len;
                io_u->numberio = ipo->numberio;
                io_u->file = ipo->file;
-               io_u->flags |= IO_U_F_VER_LIST;
+               io_u_set(io_u, IO_U_F_VER_LIST);
 
                if (ipo->flags & IP_F_TRIMMED)
-                       io_u->flags |= IO_U_F_TRIMMED;
+                       io_u_set(io_u, IO_U_F_TRIMMED);
 
                if (!fio_file_open(io_u->file)) {
                        int r = td_io_open_file(td, io_u->file);
@@ -1192,7 +1192,7 @@ static void *verify_async_thread(void *data)
                        io_u = flist_first_entry(&list, struct io_u, verify_list);
                        flist_del_init(&io_u->verify_list);
 
-                       io_u->flags |= IO_U_F_NO_FILE_PUT;
+                       io_u_set(io_u, IO_U_F_NO_FILE_PUT);
                        ret = verify_io_u(td, &io_u);
 
                        put_io_u(td, io_u);
diff --git a/workqueue.c b/workqueue.c
new file mode 100644
index 0000000..92088ba
--- /dev/null
+++ b/workqueue.c
@@ -0,0 +1,444 @@
+/*
+ * Rated submission helpers
+ *
+ * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
+ *
+ */
+#include <unistd.h>
+
+#include "fio.h"
+#include "ioengine.h"
+#include "flist.h"
+#include "workqueue.h"
+#include "lib/getrusage.h"
+
+struct submit_worker {
+       pthread_t thread;
+       pthread_mutex_t lock;
+       pthread_cond_t cond;
+       struct flist_head work_list;
+       unsigned int flags;
+       unsigned int index;
+       uint64_t seq;
+       struct workqueue *wq;
+       struct thread_data td;
+};
+
+enum {
+       SW_F_IDLE       = 1 << 0,
+       SW_F_RUNNING    = 1 << 1,
+       SW_F_EXIT       = 1 << 2,
+       SW_F_EXITED     = 1 << 3,
+       SW_F_ACCOUNTED  = 1 << 4,
+       SW_F_ERROR      = 1 << 5,
+};
+
+static struct submit_worker *__get_submit_worker(struct workqueue *wq,
+                                                unsigned int start,
+                                                unsigned int end,
+                                                struct submit_worker **best)
+{
+       struct submit_worker *sw = NULL;
+
+       while (start <= end) {
+               sw = &wq->workers[start];
+               if (sw->flags & SW_F_IDLE)
+                       return sw;
+               if (!(*best) || sw->seq < (*best)->seq)
+                       *best = sw;
+               start++;
+       }
+
+       return NULL;
+}
+
+static struct submit_worker *get_submit_worker(struct workqueue *wq)
+{
+       unsigned int next = wq->next_free_worker;
+       struct submit_worker *sw, *best = NULL;
+
+       assert(next < wq->max_workers);
+
+       sw = __get_submit_worker(wq, next, wq->max_workers - 1, &best);
+       if (!sw && next)
+               sw = __get_submit_worker(wq, 0, next - 1, &best);
+
+       /*
+        * No truly idle found, use best match
+        */
+       if (!sw)
+               sw = best;
+
+       if (sw->index == wq->next_free_worker) {
+               if (sw->index + 1 < wq->max_workers)
+                       wq->next_free_worker = sw->index + 1;
+               else
+                       wq->next_free_worker = 0;
+       }
+
+       return sw;
+}
+
+static int all_sw_idle(struct workqueue *wq)
+{
+       int i;
+
+       for (i = 0; i < wq->max_workers; i++) {
+               struct submit_worker *sw = &wq->workers[i];
+
+               if (!(sw->flags & SW_F_IDLE))
+                       return 0;
+       }
+
+       return 1;
+}
+
+/*
+ * Must be serialized wrt workqueue_enqueue() by caller
+ */
+void workqueue_flush(struct workqueue *wq)
+{
+       wq->wake_idle = 1;
+
+       while (!all_sw_idle(wq)) {
+               pthread_mutex_lock(&wq->flush_lock);
+               pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
+               pthread_mutex_unlock(&wq->flush_lock);
+       }
+
+       wq->wake_idle = 0;
+}
+
+/*
+ * Must be serialized by caller.
+ */
+int workqueue_enqueue(struct workqueue *wq, struct io_u *io_u)
+{
+       struct submit_worker *sw;
+
+       sw = get_submit_worker(wq);
+       if (sw) {
+               const enum fio_ddir ddir = acct_ddir(io_u);
+               struct thread_data *parent = wq->td;
+
+               if (ddir_rw(ddir)) {
+                       parent->io_issues[ddir]++;
+                       parent->io_issue_bytes[ddir] += io_u->xfer_buflen;
+               }
+
+               pthread_mutex_lock(&sw->lock);
+               flist_add_tail(&io_u->verify_list, &sw->work_list);
+               sw->seq = ++wq->work_seq;
+               sw->flags &= ~SW_F_IDLE;
+               pthread_mutex_unlock(&sw->lock);
+
+               pthread_cond_signal(&sw->cond);
+               return FIO_Q_QUEUED;
+       }
+
+       return FIO_Q_BUSY;
+}
+
+static void handle_list(struct submit_worker *sw, struct flist_head *list)
+{
+       struct workqueue *wq = sw->wq;
+       struct io_u *io_u;
+
+       while (!flist_empty(list)) {
+               io_u = flist_first_entry(list, struct io_u, verify_list);
+               flist_del_init(&io_u->verify_list);
+               wq->fn(&sw->td, io_u);
+       }
+}
+
+static int init_submit_worker(struct submit_worker *sw)
+{
+       struct thread_data *parent = sw->wq->td;
+       struct thread_data *td = &sw->td;
+       int fio_unused ret;
+
+       memcpy(&td->o, &parent->o, sizeof(td->o));
+       memcpy(&td->ts, &parent->ts, sizeof(td->ts));
+       td->o.uid = td->o.gid = -1U;
+       dup_files(td, parent);
+       fio_options_mem_dupe(td);
+
+       if (ioengine_load(td))
+               goto err;
+
+       if (td->o.odirect)
+               td->io_ops->flags |= FIO_RAWIO;
+
+       td->pid = gettid();
+
+       INIT_FLIST_HEAD(&td->io_log_list);
+       INIT_FLIST_HEAD(&td->io_hist_list);
+       INIT_FLIST_HEAD(&td->verify_list);
+       INIT_FLIST_HEAD(&td->trim_list);
+       INIT_FLIST_HEAD(&td->next_rand_list);
+       td->io_hist_tree = RB_ROOT;
+
+       td->o.iodepth = 1;
+       if (td_io_init(td))
+               goto err_io_init;
+
+       fio_gettime(&td->epoch, NULL);
+       fio_getrusage(&td->ru_start);
+       clear_io_state(td);
+
+       td_set_runstate(td, TD_RUNNING);
+       td->flags |= TD_F_CHILD;
+       td->parent = parent;
+       return 0;
+
+err_io_init:
+       close_ioengine(td);
+err:
+       return 1;
+}
+
+static void sum_val(uint64_t *dst, uint64_t *src)
+{
+       if (*src) {
+               __sync_fetch_and_add(dst, *src);
+               *src = 0;
+       }
+}
+
+static void sum_ddir(struct thread_data *dst, struct thread_data *src,
+                    enum fio_ddir ddir)
+{
+       sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
+       sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
+       sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
+       sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
+       sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);
+}
+
+static void update_accounting(struct submit_worker *sw)
+{
+       struct thread_data *src = &sw->td;
+       struct thread_data *dst = sw->wq->td;
+
+       if (td_read(src))
+               sum_ddir(dst, src, DDIR_READ);
+       if (td_write(src))
+               sum_ddir(dst, src, DDIR_WRITE);
+       if (td_trim(src))
+               sum_ddir(dst, src, DDIR_TRIM);
+}
+
+static void *worker_thread(void *data)
+{
+       struct submit_worker *sw = data;
+       struct workqueue *wq = sw->wq;
+       struct thread_data *td = &sw->td;
+       unsigned int eflags = 0, ret;
+       FLIST_HEAD(local_list);
+
+       ret = init_submit_worker(sw);
+       pthread_mutex_lock(&sw->lock);
+       sw->flags |= SW_F_RUNNING;
+       if (ret)
+               sw->flags |= SW_F_ERROR;
+       pthread_mutex_unlock(&sw->lock);
+
+       pthread_mutex_lock(&wq->flush_lock);
+       pthread_cond_signal(&wq->flush_cond);
+       pthread_mutex_unlock(&wq->flush_lock);
+
+       if (sw->flags & SW_F_ERROR)
+               goto done;
+
+       while (1) {
+               pthread_mutex_lock(&sw->lock);
+
+               if (flist_empty(&sw->work_list)) {
+                       if (sw->flags & SW_F_EXIT) {
+                               pthread_mutex_unlock(&sw->lock);
+                               break;
+                       }
+
+                       if (td->io_u_queued || td->cur_depth ||
+                           td->io_u_in_flight) {
+                               pthread_mutex_unlock(&sw->lock);
+                               io_u_quiesce(td);
+                               pthread_mutex_lock(&sw->lock);
+                       }
+
+                       /*
+                        * We dropped and reacquired the lock, check
+                        * state again.
+                        */
+                       if (!flist_empty(&sw->work_list))
+                               goto handle_work;
+
+                       if (sw->flags & SW_F_EXIT) {
+                               pthread_mutex_unlock(&sw->lock);
+                               break;
+                       } else if (!(sw->flags & SW_F_IDLE)) {
+                               sw->flags |= SW_F_IDLE;
+                               wq->next_free_worker = sw->index;
+                               if (wq->wake_idle)
+                                       pthread_cond_signal(&wq->flush_cond);
+                       }
+                       update_accounting(sw);
+                       pthread_cond_wait(&sw->cond, &sw->lock);
+               } else {
+handle_work:
+                       flist_splice_init(&sw->work_list, &local_list);
+               }
+               pthread_mutex_unlock(&sw->lock);
+               handle_list(sw, &local_list);
+       }
+
+       update_accounting(sw);
+
+done:
+       pthread_mutex_lock(&sw->lock);
+       sw->flags |= (SW_F_EXITED | eflags);
+       pthread_mutex_unlock(&sw->lock);
+       return NULL;
+}
+
+static void free_worker(struct submit_worker *sw)
+{
+       struct thread_data *td = &sw->td;
+
+       fio_options_free(td);
+       close_and_free_files(td);
+       if (td->io_ops)
+               close_ioengine(td);
+       td_set_runstate(td, TD_EXITED);
+
+       pthread_cond_destroy(&sw->cond);
+       pthread_mutex_destroy(&sw->lock);
+}
+
+static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt)
+{
+       struct thread_data *parent = sw->wq->td;
+
+       pthread_join(sw->thread, NULL);
+       (*sum_cnt)++;
+       sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt);
+       free_worker(sw);
+}
+
+void workqueue_exit(struct workqueue *wq)
+{
+       unsigned int shutdown, sum_cnt = 0;
+       struct submit_worker *sw;
+       int i;
+
+       for (i = 0; i < wq->max_workers; i++) {
+               sw = &wq->workers[i];
+
+               pthread_mutex_lock(&sw->lock);
+               sw->flags |= SW_F_EXIT;
+               pthread_cond_signal(&sw->cond);
+               pthread_mutex_unlock(&sw->lock);
+       }
+
+       do {
+               shutdown = 0;
+               for (i = 0; i < wq->max_workers; i++) {
+                       sw = &wq->workers[i];
+                       if (sw->flags & SW_F_ACCOUNTED)
+                               continue;
+                       sw->flags |= SW_F_ACCOUNTED;
+                       shutdown_worker(sw, &sum_cnt);
+                       shutdown++;
+               }
+       } while (shutdown && shutdown != wq->max_workers);
+
+       free(wq->workers);
+       pthread_mutex_destroy(&wq->flush_lock);
+       pthread_cond_destroy(&wq->flush_cond);
+}
+
+static int start_worker(struct workqueue *wq, unsigned int index)
+{
+       struct submit_worker *sw = &wq->workers[index];
+       int ret;
+
+       INIT_FLIST_HEAD(&sw->work_list);
+       pthread_cond_init(&sw->cond, NULL);
+       pthread_mutex_init(&sw->lock, NULL);
+       sw->wq = wq;
+       sw->index = index;
+
+       ret = pthread_create(&sw->thread, NULL, worker_thread, sw);
+       if (!ret) {
+               pthread_mutex_lock(&sw->lock);
+               sw->flags = SW_F_IDLE;
+               pthread_mutex_unlock(&sw->lock);
+               return 0;
+       }
+
+       free_worker(sw);
+       return 1;
+}
+
+int workqueue_init(struct thread_data *td, struct workqueue *wq,
+                  workqueue_fn *fn, unsigned max_pending)
+{
+       unsigned int running;
+       int i, error;
+
+       wq->max_workers = max_pending;
+       wq->td = td;
+       wq->fn = fn;
+       wq->work_seq = 0;
+       wq->next_free_worker = 0;
+       pthread_cond_init(&wq->flush_cond, NULL);
+       pthread_mutex_init(&wq->flush_lock, NULL);
+
+       wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker));
+
+       for (i = 0; i < wq->max_workers; i++)
+               if (start_worker(wq, i))
+                       break;
+
+       wq->max_workers = i;
+       if (!wq->max_workers) {
+err:
+               log_err("Can't create rate workqueue\n");
+               td_verror(td, ESRCH, "workqueue_init");
+               workqueue_exit(wq);
+               return 1;
+       }
+
+       /*
+        * Wait for them all to be started and initialized
+        */
+       error = 0;
+       do {
+               struct submit_worker *sw;
+
+               running = 0;
+               pthread_mutex_lock(&wq->flush_lock);
+               for (i = 0; i < wq->max_workers; i++) {
+                       sw = &wq->workers[i];
+                       pthread_mutex_lock(&sw->lock);
+                       if (sw->flags & SW_F_RUNNING)
+                               running++;
+                       if (sw->flags & SW_F_ERROR)
+                               error++;
+                       pthread_mutex_unlock(&sw->lock);
+               }
+
+               if (error || running == wq->max_workers) {
+                       pthread_mutex_unlock(&wq->flush_lock);
+                       break;
+               }
+
+               pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
+               pthread_mutex_unlock(&wq->flush_lock);
+       } while (1);
+
+       if (error)
+               goto err;
+
+       return 0;
+}
diff --git a/workqueue.h b/workqueue.h
new file mode 100644
index 0000000..5d47a5e
--- /dev/null
+++ b/workqueue.h
@@ -0,0 +1,29 @@
+#ifndef FIO_RATE_H
+#define FIO_RATE_H
+
+#include "flist.h"
+
+typedef void (workqueue_fn)(struct thread_data *, struct io_u *);
+
+struct workqueue {
+       unsigned int max_workers;
+
+       struct thread_data *td;
+       workqueue_fn *fn;
+
+       uint64_t work_seq;
+       struct submit_worker *workers;
+       unsigned int next_free_worker;
+
+       pthread_cond_t flush_cond;
+       pthread_mutex_t flush_lock;
+       volatile int wake_idle;
+};
+
+int workqueue_init(struct thread_data *td, struct workqueue *wq, workqueue_fn *fn, unsigned int max_workers);
+void workqueue_exit(struct workqueue *wq);
+
+int workqueue_enqueue(struct workqueue *wq, struct io_u *io_u);
+void workqueue_flush(struct workqueue *wq);
+
+#endif
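
For reference, a condensed sketch of how backend.c drives this API in
offload mode, assembled from the hunks above (error handling and the
surrounding loop logic elided):

    /* thread_main(): bring up one submit worker per unit of queue depth */
    if (o->io_submit_mode == IO_MODE_OFFLOAD &&
        workqueue_init(td, &td->io_wq, io_workqueue_fn, td->o.iodepth))
            goto err;

    /* do_io(): hand the io_u to a worker instead of td_io_queue() */
    ret = workqueue_enqueue(&td->io_wq, io_u); /* FIO_Q_QUEUED or FIO_Q_BUSY */

    /* end of do_io(): wait for every worker to go idle */
    workqueue_flush(&td->io_wq);

    /* thread_main() teardown: join the workers and merge their stats */
    workqueue_exit(&td->io_wq);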