Change IO engine queuing
authorJens Axboe <jens.axboe@oracle.com>
Sun, 18 Feb 2007 04:41:31 +0000 (05:41 +0100)
committerJens Axboe <jens.axboe@oracle.com>
Sun, 18 Feb 2007 04:41:31 +0000 (05:41 +0100)
Instead of always pretending to be async, let the IO engines
return FIO_Q_COMPLETED or FIO_Q_QUEUED to signal async or
sync completions regardless of their nature. This cleans up
the queuing model quite a bit.

Also fixed a verification error spotted while doing this
transformation.

The main intent of this is to allow queuing more than 1 piece
of IO at the time, that will come in a later changeset.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
15 files changed:
engines/libaio.c
engines/mmap.c
engines/net.c
engines/null.c
engines/posixaio.c
engines/sg.c
engines/skeleton_external.c
engines/splice.c
engines/sync.c
engines/syslet-rw.c
fio.c
fio.h
io_u.c
ioengines.c
verify.c

index c2f47d8..ba8c49d 100644 (file)
@@ -18,7 +18,6 @@
 struct libaio_data {
        io_context_t aio_ctx;
        struct io_event *aio_events;
-       struct io_u *sync_io_u;
 };
 
 static int fio_libaio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
@@ -41,13 +40,6 @@ static struct io_u *fio_libaio_event(struct thread_data *td, int event)
 {
        struct libaio_data *ld = td->io_ops->data;
 
-       if (ld->sync_io_u) {
-               struct io_u *ret = ld->sync_io_u;
-
-               ld->sync_io_u = NULL;
-               return ret;
-       }
-
        return ev_to_iou(ld->aio_events + event);
 }
 
@@ -57,9 +49,6 @@ static int fio_libaio_getevents(struct thread_data *td, int min, int max,
        struct libaio_data *ld = td->io_ops->data;
        long r;
 
-       if (ld->sync_io_u)
-               return 1;
-
        do {
                r = io_getevents(ld->aio_ctx, min, max, ld->aio_events, t);
                if (r >= min)
@@ -88,7 +77,7 @@ static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
        do {
                ret = io_submit(ld->aio_ctx, 1, &iocb);
                if (ret == 1)
-                       break;
+                       return FIO_Q_QUEUED;
                else if (ret == -EAGAIN || !ret)
                        usleep(100);
                else if (ret == -EINTR)
@@ -103,10 +92,8 @@ static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
                         */
                        if (fsync(io_u->file->fd) < 0)
                                ret = errno;
-                       else {
-                               ret = 1;
-                               ld->sync_io_u = io_u;
-                       }
+                       else
+                               ret = FIO_Q_COMPLETED;
                        break;
                } else
                        break;
@@ -116,10 +103,10 @@ static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
                io_u->resid = io_u->xfer_buflen;
                io_u->error = -ret;
                td_verror(td, io_u->error);
-               return 1;
+               return FIO_Q_COMPLETED;
        }
 
-       return 0;
+       return ret;
 }
 
 static int fio_libaio_cancel(struct thread_data *td, struct io_u *io_u)
index b214319..d32fe96 100644 (file)
 #include "../fio.h"
 #include "../os.h"
 
-struct mmapio_data {
-       struct io_u *last_io_u;
-};
-
-static int fio_mmapio_getevents(struct thread_data *td, int fio_unused min,
-                               int max, struct timespec fio_unused *t)
-{
-       assert(max <= 1);
-
-       /*
-        * we can only have one finished io_u for sync io, since the depth
-        * is always 1
-        */
-       if (list_empty(&td->io_u_busylist))
-               return 0;
-
-       return 1;
-}
-
-static struct io_u *fio_mmapio_event(struct thread_data *td, int event)
-{
-       struct mmapio_data *sd = td->io_ops->data;
-
-       assert(event == 0);
-
-       return sd->last_io_u;
-}
-
-
 static int fio_mmapio_queue(struct thread_data *td, struct io_u *io_u)
 {
        struct fio_file *f = io_u->file;
        unsigned long long real_off = io_u->offset - f->file_offset;
-       struct mmapio_data *sd = td->io_ops->data;
 
        if (io_u->ddir == DDIR_READ)
                memcpy(io_u->xfer_buf, f->mmap + real_off, io_u->xfer_buflen);
@@ -66,39 +36,16 @@ static int fio_mmapio_queue(struct thread_data *td, struct io_u *io_u)
                        io_u->error = errno;
        }
 
-       if (!io_u->error)
-               sd->last_io_u = io_u;
-       else
+       if (io_u->error)
                td_verror(td, io_u->error);
 
-       return io_u->error;
-}
-
-static void fio_mmapio_cleanup(struct thread_data *td)
-{
-       if (td->io_ops->data) {
-               free(td->io_ops->data);
-               td->io_ops->data = NULL;
-       }
-}
-
-static int fio_mmapio_init(struct thread_data *td)
-{
-       struct mmapio_data *sd = malloc(sizeof(*sd));
-
-       sd->last_io_u = NULL;
-       td->io_ops->data = sd;
-       return 0;
+       return FIO_Q_COMPLETED;
 }
 
 static struct ioengine_ops ioengine = {
        .name           = "mmap",
        .version        = FIO_IOOPS_VERSION,
-       .init           = fio_mmapio_init,
        .queue          = fio_mmapio_queue,
-       .getevents      = fio_mmapio_getevents,
-       .event          = fio_mmapio_event,
-       .cleanup        = fio_mmapio_cleanup,
        .flags          = FIO_SYNCIO | FIO_MMAPIO,
 };
 
index 0231451..4f070f9 100644 (file)
 #include "../fio.h"
 #include "../os.h"
 
-struct net_data {
-       int send_to_net;
-       struct io_u *last_io_u;
-};
-
-static int fio_netio_getevents(struct thread_data *td, int fio_unused min,
-                               int max, struct timespec fio_unused *t)
-{
-       assert(max <= 1);
-
-       /*
-        * we can only have one finished io_u for sync io, since the depth
-        * is always 1
-        */
-       if (list_empty(&td->io_u_busylist))
-               return 0;
-
-       return 1;
-}
-
-static struct io_u *fio_netio_event(struct thread_data *td, int event)
-{
-       struct net_data *nd = td->io_ops->data;
-
-       assert(event == 0);
-
-       return nd->last_io_u;
-}
+#define send_to_net(td)        ((td)->io_ops->priv)
 
 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
 {
-       struct net_data *nd = td->io_ops->data;
        struct fio_file *f = io_u->file;
 
        /*
         * Make sure we don't see spurious reads to a receiver, and vice versa
         */
-       if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
-           (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
-               printf("boo!\n");
+       if ((send_to_net(td) && io_u->ddir == DDIR_READ) ||
+           (!send_to_net(td) && io_u->ddir == DDIR_WRITE)) {
                td_verror(td, EINVAL);
                return 1;
        }
@@ -73,7 +44,6 @@ static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
 
 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
 {
-       struct net_data *nd = td->io_ops->data;
        struct fio_file *f = io_u->file;
        int ret, flags = 0;
 
@@ -96,17 +66,15 @@ static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
                if (ret > 0) {
                        io_u->resid = io_u->xfer_buflen - ret;
                        io_u->error = 0;
-                       return ret;
+                       return FIO_Q_COMPLETED;
                } else
                        io_u->error = errno;
        }
 
-       if (!io_u->error)
-               nd->last_io_u = io_u;
-       else
+       if (io_u->error)
                td_verror(td, io_u->error);
 
-       return io_u->error;
+       return FIO_Q_COMPLETED;
 }
 
 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
@@ -245,7 +213,6 @@ static int fio_netio_setup_listen(struct thread_data *td, unsigned short port)
 static int fio_netio_setup(struct thread_data *td)
 {
        char host[64], buf[128];
-       struct net_data *nd;
        unsigned short port;
        struct fio_file *f;
        char *sep;
@@ -256,14 +223,6 @@ static int fio_netio_setup(struct thread_data *td)
                return 1;
        }
 
-       /*
-        * work around for late init call
-        */
-       if (td->io_ops->init(td))
-               return 1;
-
-       nd = td->io_ops->data;
-
        if (td->iomix) {
                log_err("fio: network connections must be read OR write\n");
                return 1;
@@ -283,10 +242,10 @@ static int fio_netio_setup(struct thread_data *td)
        port = atoi(sep);
 
        if (td->ddir == DDIR_READ) {
-               nd->send_to_net = 0;
+               send_to_net(td) = 0;
                ret = fio_netio_setup_listen(td, port);
        } else {
-               nd->send_to_net = 1;
+               send_to_net(td) = 1;
                ret = fio_netio_setup_connect(td, host, port);
        }
 
@@ -304,40 +263,11 @@ static int fio_netio_setup(struct thread_data *td)
        return 0;
 }
 
-static void fio_netio_cleanup(struct thread_data *td)
-{
-       if (td->io_ops->data) {
-               free(td->io_ops->data);
-               td->io_ops->data = NULL;
-       }
-}
-
-static int fio_netio_init(struct thread_data *td)
-{
-       struct net_data *nd;
-
-       /*
-        * Hack to work-around the ->setup() function calling init on its
-        * own, since it needs ->io_ops->data to be set up.
-        */
-       if (td->io_ops->data)
-               return 0;
-
-       nd  = malloc(sizeof(*nd));
-       nd->last_io_u = NULL;
-       td->io_ops->data = nd;
-       return 0;
-}
-
 static struct ioengine_ops ioengine = {
        .name           = "net",
        .version        = FIO_IOOPS_VERSION,
-       .init           = fio_netio_init,
        .prep           = fio_netio_prep,
        .queue          = fio_netio_queue,
-       .getevents      = fio_netio_getevents,
-       .event          = fio_netio_event,
-       .cleanup        = fio_netio_cleanup,
        .setup          = fio_netio_setup,
        .flags          = FIO_SYNCIO | FIO_NETIO,
 };
index 7b4b217..6bd51ba 100644 (file)
 #include "../fio.h"
 #include "../os.h"
 
-struct null_data {
-       struct io_u *last_io_u;
-};
-
-static int fio_null_getevents(struct thread_data *td, int fio_unused min,
-                             int max, struct timespec fio_unused *t)
-{
-       assert(max <= 1);
-
-       if (list_empty(&td->io_u_busylist))
-               return 0;
-
-       return 1;
-}
-
-static struct io_u *fio_null_event(struct thread_data *td, int event)
-{
-       struct null_data *nd = td->io_ops->data;
-
-       assert(event == 0);
-
-       return nd->last_io_u;
-}
-
-static int fio_null_queue(struct thread_data *td, struct io_u *io_u)
+static int fio_null_queue(struct thread_data fio_unused *td, struct io_u *io_u)
 {
-       struct null_data *nd = td->io_ops->data;
-
        io_u->resid = 0;
        io_u->error = 0;
-       nd->last_io_u = io_u;
-       return 0;
-}
-
-static void fio_null_cleanup(struct thread_data *td)
-{
-       if (td->io_ops->data) {
-               free(td->io_ops->data);
-               td->io_ops->data = NULL;
-       }
-}
-
-static int fio_null_init(struct thread_data *td)
-{
-       struct null_data *nd = malloc(sizeof(*nd));
-
-       nd->last_io_u = NULL;
-       td->io_ops->data = nd;
-       return 0;
+       return FIO_Q_COMPLETED;
 }
 
 static struct ioengine_ops ioengine = {
        .name           = "null",
        .version        = FIO_IOOPS_VERSION,
-       .init           = fio_null_init,
        .queue          = fio_null_queue,
-       .getevents      = fio_null_getevents,
-       .event          = fio_null_event,
-       .cleanup        = fio_null_cleanup,
        .flags          = FIO_SYNCIO | FIO_NULLIO,
 };
 
index 2fc56cd..a56ab3a 100644 (file)
@@ -154,9 +154,10 @@ static int fio_posixaio_queue(struct thread_data fio_unused *td,
        if (ret) {
                io_u->error = errno;
                td_verror(td, io_u->error);
+               return FIO_Q_COMPLETED;
        }
-               
-       return io_u->error;
+
+       return FIO_Q_QUEUED;
 }
 
 static void fio_posixaio_cleanup(struct thread_data *td)
index 8d086bf..2713976 100644 (file)
@@ -48,21 +48,6 @@ static void sgio_hdr_init(struct sgio_data *sd, struct sg_io_hdr *hdr,
        }
 }
 
-static int fio_sgio_ioctl_getevents(struct thread_data *td, int fio_unused min,
-                                   int max, struct timespec fio_unused *t)
-{
-       assert(max <= 1);
-
-       /*
-        * we can only have one finished io_u for sync io, since the depth
-        * is always 1
-        */
-       if (list_empty(&td->io_u_busylist))
-               return 0;
-
-       return 1;
-}
-
 static int pollin_events(struct pollfd *pfds, int fds)
 {
        int i;
@@ -171,10 +156,15 @@ static int fio_sgio_ioctl_doio(struct thread_data *td,
 {
        struct sgio_data *sd = td->io_ops->data;
        struct sg_io_hdr *hdr = &io_u->hdr;
+       int ret;
 
        sd->events[0] = io_u;
 
-       return ioctl(f->fd, SG_IO, hdr);
+       ret = ioctl(f->fd, SG_IO, hdr);
+       if (ret < 0)
+               return ret;
+
+       return FIO_Q_COMPLETED;
 }
 
 static int fio_sgio_rw_doio(struct fio_file *f, struct io_u *io_u, int sync)
@@ -190,9 +180,10 @@ static int fio_sgio_rw_doio(struct fio_file *f, struct io_u *io_u, int sync)
                ret = read(f->fd, hdr, sizeof(*hdr));
                if (ret < 0)
                        return errno;
+               return FIO_Q_COMPLETED;
        }
 
-       return 0;
+       return FIO_Q_QUEUED;
 }
 
 static int fio_sgio_doio(struct thread_data *td, struct io_u *io_u, int sync)
@@ -263,10 +254,10 @@ static int fio_sgio_queue(struct thread_data *td, struct io_u *io_u)
 
        if (io_u->error) {
                td_verror(td, io_u->error);
-               return io_u->error;
+               return FIO_Q_COMPLETED;
        }
 
-       return 0;
+       return ret;
 }
 
 static struct io_u *fio_sgio_event(struct thread_data *td, int event)
@@ -369,10 +360,10 @@ static int fio_sgio_init(struct thread_data *td)
 
        sd->bs = bs;
 
-       if (td->filetype == FIO_TYPE_BD)
-               td->io_ops->getevents = fio_sgio_ioctl_getevents;
-       else
-               td->io_ops->getevents = fio_sgio_getevents;
+       if (td->filetype == FIO_TYPE_BD) {
+               td->io_ops->getevents = NULL;
+               td->io_ops->event = NULL;
+       }
 
        /*
         * we want to do it, regardless of whether odirect is set or not
index 785b9a6..0937d68 100644 (file)
@@ -61,12 +61,17 @@ static int fio_skeleton_cancel(struct thread_data *td, struct io_u *io_u)
  * The io engine must transfer in the direction noted by io_u->ddir
  * to the buffer pointed to by io_u->xfer_buf for as many bytes as
  * io_u->xfer_buflen. Residual data count may be set in io_u->residual
- * for a short read/write. Should return 0 for io_u complete, < 0 for
- * an error, and > 0 for the number of bytes transferred.
+ * for a short read/write.
  */
 static int fio_skeleton_queue(struct thread_data *td, struct io_u *io_u)
 {
-       return 0;
+       /*
+        * Could return FIO_Q_QUEUED for a queued request,
+        * FIO_Q_COMPLETED for a completed request, and FIO_Q_BUSY
+        * if we could queue no more at this point (you'd have to
+        * define ->commit() to handle that.
+        */
+       return FIO_Q_COMPLETED;
 }
 
 /*
index 432ba79..f55e5c0 100644 (file)
 #ifdef FIO_HAVE_SPLICE
 
 struct spliceio_data {
-       struct io_u *last_io_u;
        int pipe[2];
 };
 
-static int fio_spliceio_getevents(struct thread_data *td, int fio_unused min,
-                                 int max, struct timespec fio_unused *t)
-{
-       assert(max <= 1);
-
-       /*
-        * we can only have one finished io_u for sync io, since the depth
-        * is always 1
-        */
-       if (list_empty(&td->io_u_busylist))
-               return 0;
-
-       return 1;
-}
-
-static struct io_u *fio_spliceio_event(struct thread_data *td, int event)
-{
-       struct spliceio_data *sd = td->io_ops->data;
-
-       assert(event == 0);
-
-       return sd->last_io_u;
-}
-
 /*
  * For splice reading, we unfortunately cannot (yet) vmsplice the other way.
  * So just splice the data from the file into the pipe, and use regular
@@ -131,7 +106,6 @@ static int fio_splice_write(struct thread_data *td, struct io_u *io_u)
 
 static int fio_spliceio_queue(struct thread_data *td, struct io_u *io_u)
 {
-       struct spliceio_data *sd = td->io_ops->data;
        int ret;
 
        if (io_u->ddir == DDIR_READ)
@@ -145,17 +119,15 @@ static int fio_spliceio_queue(struct thread_data *td, struct io_u *io_u)
                if (ret > 0) {
                        io_u->resid = io_u->xfer_buflen - ret;
                        io_u->error = 0;
-                       return ret;
+                       return FIO_Q_COMPLETED;
                } else
                        io_u->error = errno;
        }
 
-       if (!io_u->error)
-               sd->last_io_u = io_u;
-       else
+       if (io_u->error)
                td_verror(td, io_u->error);
 
-       return io_u->error;
+       return FIO_Q_COMPLETED;
 }
 
 static void fio_spliceio_cleanup(struct thread_data *td)
@@ -174,7 +146,6 @@ static int fio_spliceio_init(struct thread_data *td)
 {
        struct spliceio_data *sd = malloc(sizeof(*sd));
 
-       sd->last_io_u = NULL;
        if (pipe(sd->pipe) < 0) {
                td_verror(td, errno);
                free(sd);
@@ -190,8 +161,6 @@ static struct ioengine_ops ioengine = {
        .version        = FIO_IOOPS_VERSION,
        .init           = fio_spliceio_init,
        .queue          = fio_spliceio_queue,
-       .getevents      = fio_spliceio_getevents,
-       .event          = fio_spliceio_event,
        .cleanup        = fio_spliceio_cleanup,
        .flags          = FIO_SYNCIO,
 };
index f689cbe..6a5b7d3 100644 (file)
 #include "../fio.h"
 #include "../os.h"
 
-struct syncio_data {
-       struct io_u *last_io_u;
-};
-
-static int fio_syncio_getevents(struct thread_data *td, int fio_unused min,
-                               int max, struct timespec fio_unused *t)
-{
-       assert(max <= 1);
-
-       /*
-        * we can only have one finished io_u for sync io, since the depth
-        * is always 1
-        */
-       if (list_empty(&td->io_u_busylist))
-               return 0;
-
-       return 1;
-}
-
-static struct io_u *fio_syncio_event(struct thread_data *td, int event)
-{
-       struct syncio_data *sd = td->io_ops->data;
-
-       assert(event == 0);
-
-       return sd->last_io_u;
-}
-
 static int fio_syncio_prep(struct thread_data *td, struct io_u *io_u)
 {
        struct fio_file *f = io_u->file;
@@ -58,7 +30,6 @@ static int fio_syncio_prep(struct thread_data *td, struct io_u *io_u)
 
 static int fio_syncio_queue(struct thread_data *td, struct io_u *io_u)
 {
-       struct syncio_data *sd = td->io_ops->data;
        struct fio_file *f = io_u->file;
        int ret;
 
@@ -73,45 +44,22 @@ static int fio_syncio_queue(struct thread_data *td, struct io_u *io_u)
                if (ret > 0) {
                        io_u->resid = io_u->xfer_buflen - ret;
                        io_u->error = 0;
-                       return ret;
+                       return FIO_Q_COMPLETED;
                } else
                        io_u->error = errno;
        }
 
-       if (!io_u->error)
-               sd->last_io_u = io_u;
-       else
+       if (io_u->error)
                td_verror(td, io_u->error);
 
-       return io_u->error;
-}
-
-static void fio_syncio_cleanup(struct thread_data *td)
-{
-       if (td->io_ops->data) {
-               free(td->io_ops->data);
-               td->io_ops->data = NULL;
-       }
-}
-
-static int fio_syncio_init(struct thread_data *td)
-{
-       struct syncio_data *sd = malloc(sizeof(*sd));
-
-       sd->last_io_u = NULL;
-       td->io_ops->data = sd;
-       return 0;
+       return FIO_Q_COMPLETED;
 }
 
 static struct ioengine_ops ioengine = {
        .name           = "sync",
        .version        = FIO_IOOPS_VERSION,
-       .init           = fio_syncio_init,
        .prep           = fio_syncio_prep,
        .queue          = fio_syncio_queue,
-       .getevents      = fio_syncio_getevents,
-       .event          = fio_syncio_event,
-       .cleanup        = fio_syncio_cleanup,
        .flags          = FIO_SYNCIO,
 };
 
index 4b65b2d..bfb6021 100644 (file)
@@ -154,7 +154,7 @@ static int fio_syslet_queue(struct thread_data *td, struct io_u *io_u)
         * it's queued asynchronously.
         */
        if (!async_exec(&io_u->req.atom))
-               return 0;
+               return FIO_Q_QUEUED;
 
        /*
         * completed sync
@@ -164,7 +164,7 @@ static int fio_syslet_queue(struct thread_data *td, struct io_u *io_u)
                if (ret > 0) {
                        io_u->resid = io_u->xfer_buflen - ret;
                        io_u->error = 0;
-                       return ret;
+                       return FIO_Q_COMPLETED;
                } else
                        io_u->error = errno;
        }
@@ -174,7 +174,7 @@ static int fio_syslet_queue(struct thread_data *td, struct io_u *io_u)
        else
                td_verror(td, io_u->error);
 
-       return io_u->error;
+       return FIO_Q_COMPLETED;
 }
 
 static int async_head_init(struct syslet_data *sd, unsigned int depth)
diff --git a/fio.c b/fio.c
index ff169f8..5358af2 100644 (file)
--- a/fio.c
+++ b/fio.c
@@ -27,6 +27,7 @@
 #include <signal.h>
 #include <time.h>
 #include <locale.h>
+#include <assert.h>
 #include <sys/stat.h>
 #include <sys/wait.h>
 #include <sys/ipc.h>
@@ -221,23 +222,32 @@ static int fio_io_sync(struct thread_data *td, struct fio_file *f)
        }
 
        ret = td_io_queue(td, io_u);
-       if (ret) {
+       if (ret < 0) {
                td_verror(td, io_u->error);
                put_io_u(td, io_u);
                return 1;
-       }
+       } else if (ret == FIO_Q_QUEUED) {
+               ret = td_io_getevents(td, 1, td->cur_depth, NULL);
+               if (ret < 0) {
+                       td_verror(td, ret);
+                       return 1;
+               }
 
-       ret = td_io_getevents(td, 1, td->cur_depth, NULL);
-       if (ret < 0) {
-               td_verror(td, ret);
-               return 1;
-       }
+               icd.nr = ret;
+               ios_completed(td, &icd);
+               if (icd.error) {
+                       td_verror(td, icd.error);
+                       return 1;
+               }
+       } else if (ret == FIO_Q_COMPLETED) {
+               if (io_u->error) {
+                       td_verror(td, io_u->error);
+                       return 1;
+               }
 
-       icd.nr = ret;
-       ios_completed(td, &icd);
-       if (icd.error) {
-               td_verror(td, icd.error);
-               return 1;
+               init_icd(&icd);
+               io_completed(td, io_u, &icd);
+               put_io_u(td, io_u);
        }
 
        return 0;
@@ -249,9 +259,8 @@ static int fio_io_sync(struct thread_data *td, struct fio_file *f)
  */
 static void do_verify(struct thread_data *td)
 {
-       struct io_u *io_u, *v_io_u = NULL;
-       struct io_completion_data icd;
        struct fio_file *f;
+       struct io_u *io_u;
        int ret, i;
 
        /*
@@ -265,78 +274,66 @@ static void do_verify(struct thread_data *td)
 
        td_set_runstate(td, TD_VERIFYING);
 
-       do {
-               if (td->terminate)
-                       break;
-
+       io_u = NULL;
+       while (!td->terminate) {
                io_u = __get_io_u(td);
                if (!io_u)
                        break;
 
-               if (runtime_exceeded(td, &io_u->start_time)) {
-                       put_io_u(td, io_u);
+               if (runtime_exceeded(td, &io_u->start_time))
                        break;
-               }
 
-               if (get_next_verify(td, io_u)) {
-                       put_io_u(td, io_u);
+               if (get_next_verify(td, io_u))
                        break;
-               }
 
-               f = get_next_file(td);
-               if (!f)
+               if (td_io_prep(td, io_u))
                        break;
 
-               io_u->file = f;
+requeue:
+               ret = td_io_queue(td, io_u);
 
-               if (td_io_prep(td, io_u)) {
-                       put_io_u(td, io_u);
-                       break;
-               }
+               switch (ret) {
+               case FIO_Q_COMPLETED:
+                       if (io_u->error)
+                               ret = io_u->error;
+                       if (io_u->xfer_buflen != io_u->resid && io_u->resid) {
+                               int bytes = io_u->xfer_buflen - io_u->resid;
 
-               ret = td_io_queue(td, io_u);
-               if (ret) {
-                       td_verror(td, io_u->error);
-                       put_io_u(td, io_u);
+                               io_u->xfer_buflen = io_u->resid;
+                               io_u->xfer_buf += bytes;
+                               goto requeue;
+                       }
+                       if (do_io_u_verify(td, &io_u)) {
+                               ret = -EIO;
+                               break;
+                       }
+                       continue;
+               case FIO_Q_QUEUED:
+                       break;
+               default:
+                       assert(ret < 0);
+                       td_verror(td, ret);
                        break;
                }
 
                /*
-                * we have one pending to verify, do that while
-                * we are doing io on the next one
+                * We get here for a queued request, in the future we
+                * want to later make this take full advantage of
+                * keeping IO in flight while verifying others.
                 */
-               if (do_io_u_verify(td, &v_io_u))
-                       break;
-
                ret = td_io_getevents(td, 1, 1, NULL);
-               if (ret != 1) {
-                       if (ret < 0)
-                               td_verror(td, ret);
+               if (ret < 0)
                        break;
-               }
 
-               v_io_u = td->io_ops->event(td, 0);
-               icd.nr = 1;
-               icd.error = 0;
-               fio_gettime(&icd.time, NULL);
-               io_completed(td, v_io_u, &icd);
+               assert(ret == 1);
+               io_u = td->io_ops->event(td, 0);
 
-               if (icd.error) {
-                       td_verror(td, icd.error);
-                       put_io_u(td, v_io_u);
-                       v_io_u = NULL;
+               if (do_io_u_verify(td, &io_u))
                        break;
-               }
-
-               /*
-                * if we can't submit more io, we need to verify now
-                */
-               if (queue_full(td) && do_io_u_verify(td, &v_io_u))
-                       break;
-
-       } while (1);
+       }
 
-       do_io_u_verify(td, &v_io_u);
+       if (io_u)
+               put_io_u(td, io_u);
 
        if (td->cur_depth)
                cleanup_pending_aio(td);
@@ -402,45 +399,63 @@ static void do_io(struct thread_data *td)
                memcpy(&s, &io_u->start_time, sizeof(s));
 requeue:
                ret = td_io_queue(td, io_u);
-               if (ret) {
-                       if (ret > 0 && (io_u->xfer_buflen != io_u->resid) &&
-                           io_u->resid) {
-                               /*
-                                * short read/write. requeue.
-                                */
+
+               switch (ret) {
+               case FIO_Q_COMPLETED:
+                       if (io_u->error) {
+                               ret = io_u->error;
+                               break;
+                       }
+                       if (io_u->xfer_buflen != io_u->resid && io_u->resid) {
+                               int bytes = io_u->xfer_buflen - io_u->resid;
+
                                io_u->xfer_buflen = io_u->resid;
-                               io_u->xfer_buf += ret;
+                               io_u->xfer_buf += bytes;
                                goto requeue;
-                       } else {
-                               put_io_u(td, io_u);
-                               break;
                        }
+                       init_icd(&icd);
+                       io_completed(td, io_u, &icd);
+                       put_io_u(td, io_u);
+                       break;
+               case FIO_Q_QUEUED:
+                       break;
+               default:
+                       assert(ret < 0);
+                       put_io_u(td, io_u);
+                       break;
                }
 
+               if (ret < 0)
+                       break;
+
                add_slat_sample(td, io_u->ddir, mtime_since(&io_u->start_time, &io_u->issue_time));
 
-               if (td->cur_depth < td->iodepth) {
-                       struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
+               if (ret == FIO_Q_QUEUED) {
+                       if (td->cur_depth < td->iodepth) {
+                               struct timespec ts;
 
-                       timeout = &ts;
-                       min_evts = 0;
-               } else {
-                       timeout = NULL;
-                       min_evts = 1;
-               }
+                               ts.tv_sec = 0;
+                               ts.tv_nsec = 0;
+                               timeout = &ts;
+                               min_evts = 0;
+                       } else {
+                               timeout = NULL;
+                               min_evts = 1;
+                       }
 
-               ret = td_io_getevents(td, min_evts, td->cur_depth, timeout);
-               if (ret < 0) {
-                       td_verror(td, ret);
-                       break;
-               } else if (!ret)
-                       continue;
+                       ret = td_io_getevents(td, min_evts, td->cur_depth, timeout);
+                       if (ret < 0) {
+                               td_verror(td, ret);
+                               break;
+                       } else if (!ret)
+                               continue;
 
-               icd.nr = ret;
-               ios_completed(td, &icd);
-               if (icd.error) {
-                       td_verror(td, icd.error);
-                       break;
+                       icd.nr = ret;
+                       ios_completed(td, &icd);
+                       if (icd.error) {
+                               td_verror(td, icd.error);
+                               break;
+                       }
                }
 
                /*
diff --git a/fio.h b/fio.h
index 934d897..b087f97 100644 (file)
--- a/fio.h
+++ b/fio.h
@@ -129,6 +129,14 @@ struct io_u {
        struct list_head list;
 };
 
+/*
+ * io_ops->queue() return values
+ */
+enum {
+       FIO_Q_COMPLETED = 0,            /* completed sync */
+       FIO_Q_QUEUED    = 1,            /* queued, will complete async */
+};
+
 #define FIO_HDR_MAGIC  0xf00baaef
 
 enum {
@@ -608,6 +616,7 @@ extern struct io_u *get_io_u(struct thread_data *, struct fio_file *);
 extern void put_io_u(struct thread_data *, struct io_u *);
 extern void ios_completed(struct thread_data *, struct io_completion_data *);
 extern void io_completed(struct thread_data *, struct io_u *, struct io_completion_data *);
+extern void init_icd(struct io_completion_data *);
 
 /*
  * io engine entry points
@@ -666,9 +675,10 @@ struct ioengine_ops {
        void (*cleanup)(struct thread_data *);
        void *data;
        void *dlhandle;
+       unsigned long priv;
 };
 
-#define FIO_IOOPS_VERSION      3
+#define FIO_IOOPS_VERSION      4
 
 extern struct ioengine_ops *load_ioengine(struct thread_data *, const char *);
 extern int register_ioengine(struct ioengine_ops *);
diff --git a/io_u.c b/io_u.c
index 132d897..6439979 100644 (file)
--- a/io_u.c
+++ b/io_u.c
@@ -353,23 +353,23 @@ struct io_u *get_io_u(struct thread_data *td, struct fio_file *f)
                        return NULL;
                }
 
-               f->last_pos += io_u->buflen;
+               f->last_pos = io_u->offset + io_u->buflen;
 
                if (td->verify != VERIFY_NONE)
                        populate_verify_io_u(td, io_u);
        }
 
-       if (td_io_prep(td, io_u)) {
-               put_io_u(td, io_u);
-               return NULL;
-       }
-
        /*
         * Set io data pointers.
         */
        io_u->xfer_buf = io_u->buf;
        io_u->xfer_buflen = io_u->buflen;
 
+       if (td_io_prep(td, io_u)) {
+               put_io_u(td, io_u);
+               return NULL;
+       }
+
        fio_gettime(&io_u->start_time, NULL);
        return io_u;
 }
@@ -411,15 +411,20 @@ void io_completed(struct thread_data *td, struct io_u *io_u,
                icd->error = io_u->error;
 }
 
-void ios_completed(struct thread_data *td, struct io_completion_data *icd)
+void init_icd(struct io_completion_data *icd)
 {
-       struct io_u *io_u;
-       int i;
-
        fio_gettime(&icd->time, NULL);
 
        icd->error = 0;
        icd->bytes_done[0] = icd->bytes_done[1] = 0;
+}
+
+void ios_completed(struct thread_data *td, struct io_completion_data *icd)
+{
+       struct io_u *io_u;
+       int i;
+
+       init_icd(icd);
 
        for (i = 0; i < icd->nr; i++) {
                io_u = td->io_ops->event(td, i);
index 2ed2749..1b510df 100644 (file)
@@ -33,16 +33,27 @@ static int check_engine_ops(struct ioengine_ops *ops)
        if (ops->flags & FIO_CPUIO)
                return 0;
 
+       if (!ops->queue) {
+               log_err("%s: no queue handler\n", ops->name);
+               return 1;
+       }
+
+       /*
+        * sync engines only need a ->queue()
+        */
+       if (ops->flags & FIO_SYNCIO)
+               return 0;
+       
        if (!ops->event) {
-               log_err("%s: no event handler)\n", ops->name);
+               log_err("%s: no event handler\n", ops->name);
                return 1;
        }
        if (!ops->getevents) {
-               log_err("%s: no getevents handler)\n", ops->name);
+               log_err("%s: no getevents handler\n", ops->name);
                return 1;
        }
        if (!ops->queue) {
-               log_err("%s: no queue handler)\n", ops->name);
+               log_err("%s: no queue handler\n", ops->name);
                return 1;
        }
                
@@ -159,8 +170,8 @@ void close_ioengine(struct thread_data *td)
 
 int td_io_prep(struct thread_data *td, struct io_u *io_u)
 {
-       if (td->io_ops->prep && td->io_ops->prep(td, io_u))
-               return 1;
+       if (td->io_ops->prep)
+               return td->io_ops->prep(td, io_u);
 
        return 0;
 }
@@ -168,7 +179,10 @@ int td_io_prep(struct thread_data *td, struct io_u *io_u)
 int td_io_getevents(struct thread_data *td, int min, int max,
                    struct timespec *t)
 {
-       return td->io_ops->getevents(td, min, max, t);
+       if (td->io_ops->getevents)
+               return td->io_ops->getevents(td, min, max, t);
+
+       return 0;
 }
 
 int td_io_queue(struct thread_data *td, struct io_u *io_u)
index 4440d44..32cfdd7 100644 (file)
--- a/verify.c
+++ b/verify.c
@@ -83,8 +83,10 @@ static int verify_io_u(struct io_u *io_u)
        struct verify_header *hdr = (struct verify_header *) io_u->buf;
        int ret;
 
-       if (hdr->fio_magic != FIO_HDR_MAGIC)
+       if (hdr->fio_magic != FIO_HDR_MAGIC) {
+               log_err("Bad verify header %x\n", hdr->fio_magic);
                return 1;
+       }
 
        if (hdr->verify_type == VERIFY_MD5)
                ret = verify_io_u_md5(hdr, io_u);
@@ -148,7 +150,10 @@ int get_next_verify(struct thread_data *td, struct io_u *io_u)
 
                io_u->offset = ipo->offset;
                io_u->buflen = ipo->len;
+               io_u->file = ipo->file;
                io_u->ddir = DDIR_READ;
+               io_u->xfer_buf = io_u->buf;
+               io_u->xfer_buflen = io_u->buflen;
                free(ipo);
                return 0;
        }
@@ -162,7 +167,11 @@ int do_io_u_verify(struct thread_data *td, struct io_u **io_u)
        int ret = 0;
 
        if (v_io_u) {
+               struct io_completion_data icd;
+
                ret = verify_io_u(v_io_u);
+               init_icd(&icd);
+               io_completed(td, v_io_u, &icd);
                put_io_u(td, v_io_u);
                *io_u = NULL;
        }