Fully parallellize io_u verification
[fio.git] / fio.c
diff --git a/fio.c b/fio.c
index 5358af2c8a8dc7bea5234c329bdf39087679a15f..eb857afc690f4d794dfcbfa74f42b61d2986e0b9 100644 (file)
--- a/fio.c
+++ b/fio.c
@@ -174,7 +174,7 @@ static void cleanup_pending_aio(struct thread_data *td)
         */
        r = td_io_getevents(td, 0, td->cur_depth, &ts);
        if (r > 0) {
-               icd.nr = r;
+               init_icd(&icd, NULL, r);
                ios_completed(td, &icd);
        }
 
@@ -194,7 +194,7 @@ static void cleanup_pending_aio(struct thread_data *td)
        if (td->cur_depth) {
                r = td_io_getevents(td, td->cur_depth, td->cur_depth, NULL);
                if (r > 0) {
-                       icd.nr = r;
+                       init_icd(&icd, NULL, r);
                        ios_completed(td, &icd);
                }
        }
@@ -233,7 +233,7 @@ static int fio_io_sync(struct thread_data *td, struct fio_file *f)
                        return 1;
                }
 
-               icd.nr = ret;
+               init_icd(&icd, NULL, ret);
                ios_completed(td, &icd);
                if (icd.error) {
                        td_verror(td, icd.error);
@@ -245,7 +245,7 @@ static int fio_io_sync(struct thread_data *td, struct fio_file *f)
                        return 1;
                }
 
-               init_icd(&icd);
+               init_icd(&icd, NULL, 1);
                io_completed(td, io_u, &icd);
                put_io_u(td, io_u);
        }
@@ -261,7 +261,7 @@ static void do_verify(struct thread_data *td)
 {
        struct fio_file *f;
        struct io_u *io_u;
-       int ret, i;
+       int ret, i, min_events;
 
        /*
         * sync io first and invalidate cache, to make sure we really
@@ -276,6 +276,9 @@ static void do_verify(struct thread_data *td)
 
        io_u = NULL;
        while (!td->terminate) {
+               struct io_completion_data icd;
+               struct timespec *timeout;
+
                io_u = __get_io_u(td);
                if (!io_u)
                        break;
@@ -303,10 +306,13 @@ requeue:
                                io_u->xfer_buf += bytes;
                                goto requeue;
                        }
-                       if (do_io_u_verify(td, &io_u)) {
-                               ret = -EIO;
+                       init_icd(&icd, verify_io_u, 1);
+                       io_completed(td, io_u, &icd);
+                       if (icd.error) {
+                               ret = icd.error;
                                break;
                        }
+                       put_io_u(td, io_u);
                        continue;
                case FIO_Q_QUEUED:
                        break;
@@ -316,20 +322,42 @@ requeue:
                        break;
                }
 
+               if (ret < 0)
+                       break;
+
                /*
-                * We get here for a queued request, in the future we
-                * want to later make this take full advantage of
-                * keeping IO in flight while verifying others.
+                * if we can queue more, do so. but check if there are
+                * completed io_u's first.
                 */
-               ret = td_io_getevents(td, 1, 1, NULL);
+               if (queue_full(td)) {
+                       timeout = NULL;
+                       min_events = 1;
+               } else {
+                       struct timespec ts;
+
+                       ts.tv_sec = 0;
+                       ts.tv_nsec = 0;
+                       timeout = &ts;
+                       min_events = 0;
+               }
+
+               /*
+                * Reap required number of io units, if any, and do the
+                * verification on them through the callback handler
+                */
+               ret = td_io_getevents(td, min_events, td->cur_depth, timeout);
                if (ret < 0)
                        break;
+               else if (!ret)
+                       continue;
 
-               assert(ret == 1);
-               io_u = td->io_ops->event(td, 0);
+               init_icd(&icd, verify_io_u, ret);
+               ios_completed(td, &icd);
 
-               if (do_io_u_verify(td, &io_u))
+               if (icd.error) {
+                       td_verror(td, icd.error);
                        break;
+               }
        }
 
        if (io_u)
@@ -413,7 +441,7 @@ requeue:
                                io_u->xfer_buf += bytes;
                                goto requeue;
                        }
-                       init_icd(&icd);
+                       init_icd(&icd, NULL, 1);
                        io_completed(td, io_u, &icd);
                        put_io_u(td, io_u);
                        break;
@@ -450,7 +478,7 @@ requeue:
                        } else if (!ret)
                                continue;
 
-                       icd.nr = ret;
+                       init_icd(&icd, NULL, ret);
                        ios_completed(td, &icd);
                        if (icd.error) {
                                td_verror(td, icd.error);