Add support for queuing > 1 command at the time
authorJens Axboe <jens.axboe@oracle.com>
Mon, 19 Feb 2007 12:08:12 +0000 (13:08 +0100)
committerJens Axboe <jens.axboe@oracle.com>
Mon, 19 Feb 2007 12:08:12 +0000 (13:08 +0100)
For the async engines, we currently do queuing by issuing one
command at the the time. Improve this by adding a ->commit()
hook to complement the ->queue() hook. When ->queue() returns
FIO_Q_BUSY, call ->commit() to actually send off the io to the
kernel.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
engines/libaio.c
fio.c
fio.h
io_u.c
ioengines.c

index cb488efb1b5acbbd524bbc25914f40b86ea772e7..510ecab417b43bf369bb3062d2238435b03e1a4f 100644 (file)
@@ -18,6 +18,8 @@
 struct libaio_data {
        io_context_t aio_ctx;
        struct io_event *aio_events;
 struct libaio_data {
        io_context_t aio_ctx;
        struct io_event *aio_events;
+       struct iocb **iocbs;
+       int iocbs_nr;
 };
 
 static int fio_libaio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
 };
 
 static int fio_libaio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
@@ -68,40 +70,60 @@ static int fio_libaio_getevents(struct thread_data *td, int min, int max,
 static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
 {
        struct libaio_data *ld = td->io_ops->data;
 static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
 {
        struct libaio_data *ld = td->io_ops->data;
-       struct iocb *iocb = &io_u->iocb;
-       long ret;
 
 
+       if (ld->iocbs_nr == (int) td->iodepth)
+               return FIO_Q_BUSY;
+
+       /*
+        * fsync is tricky, since it can fail and we need to do it
+        * serialized with other io. the reason is that linux doesn't
+        * support aio fsync yet. So return busy for the case where we
+        * have pending io, to let fio complete those first.
+        */
+       if (io_u->ddir == DDIR_SYNC) {
+               if (ld->iocbs_nr)
+                       return FIO_Q_BUSY;
+               if (fsync(io_u->file->fd) < 0)
+                       io_u->error = errno;
+
+               return FIO_Q_COMPLETED;
+       }
+
+       ld->iocbs[ld->iocbs_nr] = &io_u->iocb;
+       ld->iocbs_nr++;
+       return FIO_Q_QUEUED;
+}
+
+static int fio_libaio_commit(struct thread_data *td)
+{
+       struct libaio_data *ld = td->io_ops->data;
+       struct iocb **iocbs;
+       int ret, iocbs_nr;
+
+       if (!ld->iocbs_nr)
+               return 0;
+
+       iocbs_nr = ld->iocbs_nr;
+       iocbs = ld->iocbs;
        do {
        do {
-               ret = io_submit(ld->aio_ctx, 1, &iocb);
-               if (ret == 1)
-                       return FIO_Q_QUEUED;
-               else if (ret == -EAGAIN || !ret)
+               ret = io_submit(ld->aio_ctx, iocbs_nr, iocbs);
+               if (ret == iocbs_nr) {
+                       ret = 0;
+                       break;
+               } else if (ret > 0) {
+                       iocbs += ret;
+                       iocbs_nr -= ret;
+                       continue;
+               } else if (ret == -EAGAIN || !ret)
                        usleep(100);
                else if (ret == -EINTR)
                        continue;
                        usleep(100);
                else if (ret == -EINTR)
                        continue;
-               else if (ret == -EINVAL && io_u->ddir == DDIR_SYNC) {
-                       /*
-                        * the async fsync doesn't currently seem to be
-                        * supported, so just fsync if we fail with EINVAL
-                        * for a sync. since buffered io is also sync
-                        * with libaio (still), we don't have pending
-                        * requests to flush first.
-                        */
-                       if (fsync(io_u->file->fd) < 0)
-                               ret = -errno;
-                       else
-                               ret = FIO_Q_COMPLETED;
-                       break;
-               } else
+               else
                        break;
        } while (1);
 
                        break;
        } while (1);
 
-       if (ret <= 0) {
-               io_u->resid = io_u->xfer_buflen;
-               io_u->error = -ret;
-               td_verror(td, io_u->error);
-               return FIO_Q_COMPLETED;
-       }
+       if (!ret)
+               ld->iocbs_nr = 0;
 
        return ret;
 }
 
        return ret;
 }
@@ -121,6 +143,8 @@ static void fio_libaio_cleanup(struct thread_data *td)
                io_destroy(ld->aio_ctx);
                if (ld->aio_events)
                        free(ld->aio_events);
                io_destroy(ld->aio_ctx);
                if (ld->aio_events)
                        free(ld->aio_events);
+               if (ld->iocbs)
+                       free(ld->iocbs);
 
                free(ld);
                td->io_ops->data = NULL;
 
                free(ld);
                td->io_ops->data = NULL;
@@ -140,6 +164,10 @@ static int fio_libaio_init(struct thread_data *td)
 
        ld->aio_events = malloc(td->iodepth * sizeof(struct io_event));
        memset(ld->aio_events, 0, td->iodepth * sizeof(struct io_event));
 
        ld->aio_events = malloc(td->iodepth * sizeof(struct io_event));
        memset(ld->aio_events, 0, td->iodepth * sizeof(struct io_event));
+       ld->iocbs = malloc(td->iodepth * sizeof(struct iocb *));
+       memset(ld->iocbs, 0, sizeof(struct iocb *));
+       ld->iocbs_nr = 0;
+
        td->io_ops->data = ld;
        return 0;
 }
        td->io_ops->data = ld;
        return 0;
 }
@@ -150,6 +178,7 @@ static struct ioengine_ops ioengine = {
        .init           = fio_libaio_init,
        .prep           = fio_libaio_prep,
        .queue          = fio_libaio_queue,
        .init           = fio_libaio_init,
        .prep           = fio_libaio_prep,
        .queue          = fio_libaio_queue,
+       .commit         = fio_libaio_commit,
        .cancel         = fio_libaio_cancel,
        .getevents      = fio_libaio_getevents,
        .event          = fio_libaio_event,
        .cancel         = fio_libaio_cancel,
        .getevents      = fio_libaio_getevents,
        .event          = fio_libaio_event,
diff --git a/fio.c b/fio.c
index 6e78949b52fd7771b94d31bc0b14c0575e18c84c..858d6b81e6c5df199e17ae33f09d802105b1011b 100644 (file)
--- a/fio.c
+++ b/fio.c
@@ -209,6 +209,7 @@ static int fio_io_sync(struct thread_data *td, struct fio_file *f)
                return 1;
        }
 
                return 1;
        }
 
+requeue:
        ret = td_io_queue(td, io_u);
        if (ret < 0) {
                td_verror(td, io_u->error);
        ret = td_io_queue(td, io_u);
        if (ret < 0) {
                td_verror(td, io_u->error);
@@ -224,6 +225,10 @@ static int fio_io_sync(struct thread_data *td, struct fio_file *f)
                }
 
                io_u_sync_complete(td, io_u, NULL);
                }
 
                io_u_sync_complete(td, io_u, NULL);
+       } else if (ret == FIO_Q_BUSY) {
+               if (td_io_commit(td))
+                       return 1;
+               goto requeue;
        }
 
        return 0;
        }
 
        return 0;
@@ -285,6 +290,10 @@ requeue:
                        continue;
                case FIO_Q_QUEUED:
                        break;
                        continue;
                case FIO_Q_QUEUED:
                        break;
+               case FIO_Q_BUSY:
+                       requeue_io_u(td, &io_u);
+                       ret = td_io_commit(td);
+                       break;
                default:
                        assert(ret < 0);
                        td_verror(td, -ret);
                default:
                        assert(ret < 0);
                        td_verror(td, -ret);
@@ -299,7 +308,7 @@ requeue:
                 * completed io_u's first.
                 */
                min_events = 0;
                 * completed io_u's first.
                 */
                min_events = 0;
-               if (queue_full(td))
+               if (queue_full(td) || ret == FIO_Q_BUSY)
                        min_events = 1;
 
                /*
                        min_events = 1;
 
                /*
@@ -403,6 +412,10 @@ requeue:
                        break;
                case FIO_Q_QUEUED:
                        break;
                        break;
                case FIO_Q_QUEUED:
                        break;
+               case FIO_Q_BUSY:
+                       requeue_io_u(td, &io_u);
+                       ret = td_io_commit(td);
+                       break;
                default:
                        assert(ret < 0);
                        put_io_u(td, io_u);
                default:
                        assert(ret < 0);
                        put_io_u(td, io_u);
@@ -412,14 +425,15 @@ requeue:
                if (ret < 0 || td->error)
                        break;
 
                if (ret < 0 || td->error)
                        break;
 
-               add_slat_sample(td, io_u->ddir, mtime_since(&io_u->start_time, &io_u->issue_time));
+               if (io_u)
+                       add_slat_sample(td, io_u->ddir, mtime_since(&io_u->start_time, &io_u->issue_time));
 
                /*
                 * See if we need to complete some commands
                 */
 
                /*
                 * See if we need to complete some commands
                 */
-               if (ret == FIO_Q_QUEUED) {
+               if (ret == FIO_Q_QUEUED || ret == FIO_Q_BUSY) {
                        min_evts = 0;
                        min_evts = 0;
-                       if (queue_full(td))
+                       if (queue_full(td) || ret == FIO_Q_BUSY)
                                min_evts = 1;
 
                        fio_gettime(&comp_time, NULL);
                                min_evts = 1;
 
                        fio_gettime(&comp_time, NULL);
@@ -633,6 +647,7 @@ static void *thread_main(void *data)
 
        INIT_LIST_HEAD(&td->io_u_freelist);
        INIT_LIST_HEAD(&td->io_u_busylist);
 
        INIT_LIST_HEAD(&td->io_u_freelist);
        INIT_LIST_HEAD(&td->io_u_busylist);
+       INIT_LIST_HEAD(&td->io_u_requeues);
        INIT_LIST_HEAD(&td->io_hist_list);
        INIT_LIST_HEAD(&td->io_log_list);
 
        INIT_LIST_HEAD(&td->io_hist_list);
        INIT_LIST_HEAD(&td->io_log_list);
 
diff --git a/fio.h b/fio.h
index aa66ecd4e06bce3dbe7591c8fad859e05f7571b9..baaa9a8d51f57512d2a5ed062e12fa0d4cc658be 100644 (file)
--- a/fio.h
+++ b/fio.h
@@ -135,6 +135,7 @@ struct io_u {
 enum {
        FIO_Q_COMPLETED = 0,            /* completed sync */
        FIO_Q_QUEUED    = 1,            /* queued, will complete async */
 enum {
        FIO_Q_COMPLETED = 0,            /* completed sync */
        FIO_Q_QUEUED    = 1,            /* queued, will complete async */
+       FIO_Q_BUSY      = 2,            /* no more room, call ->commit() */
 };
 
 #define FIO_HDR_MAGIC  0xf00baaef
 };
 
 #define FIO_HDR_MAGIC  0xf00baaef
@@ -334,6 +335,7 @@ struct thread_data {
        unsigned long total_io_u;
        struct list_head io_u_freelist;
        struct list_head io_u_busylist;
        unsigned long total_io_u;
        struct list_head io_u_freelist;
        struct list_head io_u_busylist;
+       struct list_head io_u_requeues;
 
        /*
         * Rate state
 
        /*
         * Rate state
@@ -352,6 +354,7 @@ struct thread_data {
        unsigned long long start_offset;
        unsigned long long total_io_size;
 
        unsigned long long start_offset;
        unsigned long long total_io_size;
 
+       unsigned long io_issues[2];
        unsigned long long io_blocks[2];
        unsigned long long io_bytes[2];
        unsigned long long zone_bytes;
        unsigned long long io_blocks[2];
        unsigned long long io_bytes[2];
        unsigned long long zone_bytes;
@@ -608,6 +611,7 @@ extern void free_io_mem(struct thread_data *);
 extern struct io_u *__get_io_u(struct thread_data *);
 extern struct io_u *get_io_u(struct thread_data *, struct fio_file *);
 extern void put_io_u(struct thread_data *, struct io_u *);
 extern struct io_u *__get_io_u(struct thread_data *);
 extern struct io_u *get_io_u(struct thread_data *, struct fio_file *);
 extern void put_io_u(struct thread_data *, struct io_u *);
+extern void requeue_io_u(struct thread_data *, struct io_u **);
 extern long io_u_sync_complete(struct thread_data *, struct io_u *, endio_handler *);
 extern long io_u_queued_complete(struct thread_data *, int, endio_handler *);
 
 extern long io_u_sync_complete(struct thread_data *, struct io_u *, endio_handler *);
 extern long io_u_queued_complete(struct thread_data *, int, endio_handler *);
 
@@ -619,6 +623,7 @@ extern int td_io_prep(struct thread_data *, struct io_u *);
 extern int td_io_queue(struct thread_data *, struct io_u *);
 extern int td_io_sync(struct thread_data *, struct fio_file *);
 extern int td_io_getevents(struct thread_data *, int, int, struct timespec *);
 extern int td_io_queue(struct thread_data *, struct io_u *);
 extern int td_io_sync(struct thread_data *, struct fio_file *);
 extern int td_io_getevents(struct thread_data *, int, int, struct timespec *);
+extern int td_io_commit(struct thread_data *);
 
 /*
  * This is a pretty crappy semaphore implementation, but with the use that fio
 
 /*
  * This is a pretty crappy semaphore implementation, but with the use that fio
@@ -662,6 +667,7 @@ struct ioengine_ops {
        int (*init)(struct thread_data *);
        int (*prep)(struct thread_data *, struct io_u *);
        int (*queue)(struct thread_data *, struct io_u *);
        int (*init)(struct thread_data *);
        int (*prep)(struct thread_data *, struct io_u *);
        int (*queue)(struct thread_data *, struct io_u *);
+       int (*commit)(struct thread_data *);
        int (*getevents)(struct thread_data *, int, int, struct timespec *);
        struct io_u *(*event)(struct thread_data *, int);
        int (*cancel)(struct thread_data *, struct io_u *);
        int (*getevents)(struct thread_data *, int, int, struct timespec *);
        struct io_u *(*event)(struct thread_data *, int);
        int (*cancel)(struct thread_data *, struct io_u *);
@@ -671,7 +677,7 @@ struct ioengine_ops {
        unsigned long priv;
 };
 
        unsigned long priv;
 };
 
-#define FIO_IOOPS_VERSION      4
+#define FIO_IOOPS_VERSION      5
 
 extern struct ioengine_ops *load_ioengine(struct thread_data *, const char *);
 extern int register_ioengine(struct ioengine_ops *);
 
 extern struct ioengine_ops *load_ioengine(struct thread_data *, const char *);
 extern int register_ioengine(struct ioengine_ops *);
diff --git a/io_u.c b/io_u.c
index c04c9de06572f4cbddc476d36f6282eb1913fa80..781599f48724c14530e81a0e6020c100fffd1a31 100644 (file)
--- a/io_u.c
+++ b/io_u.c
@@ -199,6 +199,16 @@ void put_io_u(struct thread_data *td, struct io_u *io_u)
        td->cur_depth--;
 }
 
        td->cur_depth--;
 }
 
+void requeue_io_u(struct thread_data *td, struct io_u **io_u)
+{
+       struct io_u *__io_u = *io_u;
+
+       list_del(&__io_u->list);
+       list_add_tail(&__io_u->list, &td->io_u_requeues);
+       td->cur_depth--;
+       *io_u = NULL;
+}
+
 static int fill_io_u(struct thread_data *td, struct fio_file *f,
                     struct io_u *io_u)
 {
 static int fill_io_u(struct thread_data *td, struct fio_file *f,
                     struct io_u *io_u)
 {
@@ -211,8 +221,8 @@ static int fill_io_u(struct thread_data *td, struct fio_file *f,
        /*
         * see if it's time to sync
         */
        /*
         * see if it's time to sync
         */
-       if (td->fsync_blocks && !(td->io_blocks[DDIR_WRITE] % td->fsync_blocks)
-           && should_fsync(td)) {
+       if (td->fsync_blocks && !(td->io_issues[DDIR_WRITE] % td->fsync_blocks)
+           && td->io_issues[DDIR_WRITE] && should_fsync(td)) {
                io_u->ddir = DDIR_SYNC;
                io_u->file = f;
                return 0;
                io_u->ddir = DDIR_SYNC;
                io_u->file = f;
                return 0;
@@ -310,12 +320,18 @@ struct io_u *__get_io_u(struct thread_data *td)
 {
        struct io_u *io_u = NULL;
 
 {
        struct io_u *io_u = NULL;
 
-       if (!queue_full(td)) {
+       if (!list_empty(&td->io_u_requeues))
+               io_u = list_entry(td->io_u_requeues.next, struct io_u, list);
+       else if (!queue_full(td)) {
                io_u = list_entry(td->io_u_freelist.next, struct io_u, list);
 
                io_u->buflen = 0;
                io_u = list_entry(td->io_u_freelist.next, struct io_u, list);
 
                io_u->buflen = 0;
-               io_u->error = 0;
                io_u->resid = 0;
                io_u->resid = 0;
+               io_u->file = NULL;
+       }
+
+       if (io_u) {
+               io_u->error = 0;
                list_del(&io_u->list);
                list_add(&io_u->list, &td->io_u_busylist);
                td->cur_depth++;
                list_del(&io_u->list);
                list_add(&io_u->list, &td->io_u_busylist);
                td->cur_depth++;
@@ -337,6 +353,12 @@ struct io_u *get_io_u(struct thread_data *td, struct fio_file *f)
        if (!io_u)
                return NULL;
 
        if (!io_u)
                return NULL;
 
+       /*
+        * from a requeue, io_u already setup
+        */
+       if (io_u->file)
+               return io_u;
+
        if (td->zone_bytes >= td->zone_size) {
                td->zone_bytes = 0;
                f->last_pos += td->zone_skip;
        if (td->zone_bytes >= td->zone_size) {
                td->zone_bytes = 0;
                f->last_pos += td->zone_skip;
@@ -477,8 +499,14 @@ long io_u_queued_complete(struct thread_data *td, int min_events,
        struct io_completion_data icd;
        int ret;
 
        struct io_completion_data icd;
        int ret;
 
-       if (min_events > 0)
+       if (min_events > 0) {
                tsp = &ts;
                tsp = &ts;
+               ret = td_io_commit(td);
+               if (ret < 0) {
+                       td_verror(td, -ret);
+                       return ret;
+               }
+       }
 
        ret = td_io_getevents(td, min_events, td->cur_depth, tsp);
        if (ret < 0) {
 
        ret = td_io_getevents(td, min_events, td->cur_depth, tsp);
        if (ret < 0) {
index 1b510dfe1daef46cd2be9c17255fff438a8d9928..16ea928fe727c094f651e75922f52308eb36c70a 100644 (file)
@@ -189,6 +189,9 @@ int td_io_queue(struct thread_data *td, struct io_u *io_u)
 {
        fio_gettime(&io_u->issue_time, NULL);
 
 {
        fio_gettime(&io_u->issue_time, NULL);
 
+       if (io_u->ddir != DDIR_SYNC)
+               td->io_issues[io_u->ddir]++;
+
        return td->io_ops->queue(td, io_u);
 }
 
        return td->io_ops->queue(td, io_u);
 }
 
@@ -199,3 +202,11 @@ int td_io_init(struct thread_data *td)
 
        return 0;
 }
 
        return 0;
 }
+
+int td_io_commit(struct thread_data *td)
+{
+       if (td->io_ops->commit)
+               return td->io_ops->commit(td);
+
+       return 0;
+}