backend: split queue io_u event handling into helper
[fio.git] / backend.c
index fdb7413d7719ff9ff4b46af8e611c072c6918435..f82b83f75ee4d7aa09e99079d2a427c531506a2f 100644 (file)
--- a/backend.c
+++ b/backend.c
@@ -229,16 +229,15 @@ static int __check_min_rate(struct thread_data *td, struct timeval *now,
        return 0;
 }
 
-static int check_min_rate(struct thread_data *td, struct timeval *now,
-                         uint64_t *bytes_done)
+static int check_min_rate(struct thread_data *td, struct timeval *now)
 {
        int ret = 0;
 
-       if (bytes_done[DDIR_READ])
+       if (td->bytes_done[DDIR_READ])
                ret |= __check_min_rate(td, now, DDIR_READ);
-       if (bytes_done[DDIR_WRITE])
+       if (td->bytes_done[DDIR_WRITE])
                ret |= __check_min_rate(td, now, DDIR_WRITE);
-       if (bytes_done[DDIR_TRIM])
+       if (td->bytes_done[DDIR_TRIM])
                ret |= __check_min_rate(td, now, DDIR_TRIM);
 
        return ret;
@@ -255,7 +254,7 @@ static void cleanup_pending_aio(struct thread_data *td)
        /*
         * get immediately available events, if any
         */
-       r = io_u_queued_complete(td, 0, NULL);
+       r = io_u_queued_complete(td, 0);
        if (r < 0)
                return;
 
@@ -276,7 +275,7 @@ static void cleanup_pending_aio(struct thread_data *td)
        }
 
        if (td->cur_depth)
-               r = io_u_queued_complete(td, td->cur_depth, NULL);
+               r = io_u_queued_complete(td, td->cur_depth);
 }
 
 /*
@@ -306,7 +305,7 @@ requeue:
                put_io_u(td, io_u);
                return 1;
        } else if (ret == FIO_Q_QUEUED) {
-               if (io_u_queued_complete(td, 1, NULL) < 0)
+               if (io_u_queued_complete(td, 1) < 0)
                        return 1;
        } else if (ret == FIO_Q_COMPLETED) {
                if (io_u->error) {
@@ -314,7 +313,7 @@ requeue:
                        return 1;
                }
 
-               if (io_u_sync_complete(td, io_u, NULL) < 0)
+               if (io_u_sync_complete(td, io_u) < 0)
                        return 1;
        } else if (ret == FIO_Q_BUSY) {
                if (td_io_commit(td))
@@ -418,8 +417,7 @@ static void check_update_rusage(struct thread_data *td)
        }
 }
 
-static int wait_for_completions(struct thread_data *td, struct timeval *time,
-                               uint64_t *bytes_done)
+static int wait_for_completions(struct thread_data *td, struct timeval *time)
 {
        const int full = queue_full(td);
        int min_evts = 0;
@@ -438,7 +436,7 @@ static int wait_for_completions(struct thread_data *td, struct timeval *time,
                fio_gettime(time, NULL);
 
        do {
-               ret = io_u_queued_complete(td, min_evts, bytes_done);
+               ret = io_u_queued_complete(td, min_evts);
                if (ret < 0)
                        break;
        } while (full && (td->cur_depth > td->o.iodepth_low));
@@ -446,13 +444,99 @@ static int wait_for_completions(struct thread_data *td, struct timeval *time,
        return ret;
 }
 
+int io_queue_event(struct thread_data *td, struct io_u *io_u, int *ret,
+                  enum fio_ddir ddir, uint64_t *bytes_issued, int from_verify,
+                  struct timeval *comp_time)
+{
+       int ret2;
+
+       switch (*ret) {
+       case FIO_Q_COMPLETED:
+               if (io_u->error) {
+                       *ret = -io_u->error;
+                       clear_io_u(td, io_u);
+               } else if (io_u->resid) {
+                       int bytes = io_u->xfer_buflen - io_u->resid;
+                       struct fio_file *f = io_u->file;
+
+                       if (bytes_issued)
+                               *bytes_issued += bytes;
+
+                       if (!from_verify)
+                               trim_io_piece(td, io_u);
+
+                       /*
+                        * zero read, fail
+                        */
+                       if (!bytes) {
+                               if (!from_verify)
+                                       unlog_io_piece(td, io_u);
+                               td_verror(td, EIO, "full resid");
+                               put_io_u(td, io_u);
+                               break;
+                       }
+
+                       io_u->xfer_buflen = io_u->resid;
+                       io_u->xfer_buf += bytes;
+                       io_u->offset += bytes;
+
+                       if (ddir_rw(io_u->ddir))
+                               td->ts.short_io_u[io_u->ddir]++;
+
+                       f = io_u->file;
+                       if (io_u->offset == f->real_file_size)
+                               goto sync_done;
+
+                       requeue_io_u(td, &io_u);
+               } else {
+sync_done:
+                       if (comp_time && (__should_check_rate(td, DDIR_READ) ||
+                           __should_check_rate(td, DDIR_WRITE) ||
+                           __should_check_rate(td, DDIR_TRIM)))
+                               fio_gettime(comp_time, NULL);
+
+                       *ret = io_u_sync_complete(td, io_u);
+                       if (*ret < 0)
+                               break;
+               }
+               return 0;
+       case FIO_Q_QUEUED:
+               /*
+                * if the engine doesn't have a commit hook,
+                * the io_u is really queued. if it does have such
+                * a hook, it has to call io_u_queued() itself.
+                */
+               if (td->io_ops->commit == NULL)
+                       io_u_queued(td, io_u);
+               if (bytes_issued)
+                       *bytes_issued += io_u->xfer_buflen;
+               break;
+       case FIO_Q_BUSY:
+               if (!from_verify)
+                       unlog_io_piece(td, io_u);
+               requeue_io_u(td, &io_u);
+               ret2 = td_io_commit(td);
+               if (ret2 < 0)
+                       *ret = ret2;
+               break;
+       default:
+               assert(ret < 0);
+               td_verror(td, -(*ret), "td_io_queue");
+               break;
+       }
+
+       if (break_on_this_error(td, ddir, ret))
+               return 1;
+
+       return 0;
+}
+
 /*
  * The main verify engine. Runs over the writes we previously submitted,
  * reads the blocks back in, and checks the crc/md5 of the data.
  */
 static void do_verify(struct thread_data *td, uint64_t verify_bytes)
 {
-       uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
        struct fio_file *f;
        struct io_u *io_u;
        int ret, min_events;
@@ -483,7 +567,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes)
        io_u = NULL;
        while (!td->terminate) {
                enum fio_ddir ddir;
-               int ret2, full;
+               int full;
 
                update_tv_cache(td);
                check_update_rusage(td);
@@ -514,7 +598,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes)
                                break;
                        }
                } else {
-                       if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes)
+                       if (ddir_rw_sum(td->bytes_done) + td->o.rw_min_bs > verify_bytes)
                                break;
 
                        while ((io_u = get_io_u(td)) != NULL) {
@@ -569,57 +653,8 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes)
                        fio_gettime(&io_u->start_time, NULL);
 
                ret = td_io_queue(td, io_u);
-               switch (ret) {
-               case FIO_Q_COMPLETED:
-                       if (io_u->error) {
-                               ret = -io_u->error;
-                               clear_io_u(td, io_u);
-                       } else if (io_u->resid) {
-                               int bytes = io_u->xfer_buflen - io_u->resid;
 
-                               /*
-                                * zero read, fail
-                                */
-                               if (!bytes) {
-                                       td_verror(td, EIO, "full resid");
-                                       put_io_u(td, io_u);
-                                       break;
-                               }
-
-                               io_u->xfer_buflen = io_u->resid;
-                               io_u->xfer_buf += bytes;
-                               io_u->offset += bytes;
-
-                               if (ddir_rw(io_u->ddir))
-                                       td->ts.short_io_u[io_u->ddir]++;
-
-                               f = io_u->file;
-                               if (io_u->offset == f->real_file_size)
-                                       goto sync_done;
-
-                               requeue_io_u(td, &io_u);
-                       } else {
-sync_done:
-                               ret = io_u_sync_complete(td, io_u, bytes_done);
-                               if (ret < 0)
-                                       break;
-                       }
-                       continue;
-               case FIO_Q_QUEUED:
-                       break;
-               case FIO_Q_BUSY:
-                       requeue_io_u(td, &io_u);
-                       ret2 = td_io_commit(td);
-                       if (ret2 < 0)
-                               ret = ret2;
-                       break;
-               default:
-                       assert(ret < 0);
-                       td_verror(td, -ret, "td_io_queue");
-                       break;
-               }
-
-               if (break_on_this_error(td, ddir, &ret))
+               if (io_queue_event(td, io_u, &ret, ddir, NULL, 1, NULL))
                        break;
 
                /*
@@ -630,7 +665,7 @@ sync_done:
 reap:
                full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
                if (full || !td->o.iodepth_batch_complete)
-                       ret = wait_for_completions(td, NULL, bytes_done);
+                       ret = wait_for_completions(td, NULL);
 
                if (ret < 0)
                        break;
@@ -642,7 +677,7 @@ reap:
                min_events = td->cur_depth;
 
                if (min_events)
-                       ret = io_u_queued_complete(td, min_events, NULL);
+                       ret = io_u_queued_complete(td, min_events);
        } else
                cleanup_pending_aio(td);
 
@@ -716,7 +751,6 @@ static int io_complete_bytes_exceeded(struct thread_data *td)
  */
 static uint64_t do_io(struct thread_data *td)
 {
-       uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
        unsigned int i;
        int ret = 0;
        uint64_t total_bytes, bytes_issued = 0;
@@ -744,12 +778,17 @@ static uint64_t do_io(struct thread_data *td)
           (td_write(td) && td->o.verify_backlog))
                total_bytes += td->o.size;
 
+       /* In trimwrite mode, each byte is trimmed and then written, so
+        * allow total_bytes to be twice as big */
+       if (td_trimwrite(td))
+               total_bytes += td->total_io_size;
+
        while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
                (!flist_empty(&td->trim_list)) || !io_issue_bytes_exceeded(td) ||
                td->o.time_based) {
                struct timeval comp_time;
                struct io_u *io_u;
-               int ret2, full;
+               int full;
                enum fio_ddir ddir;
 
                check_update_rusage(td);
@@ -830,78 +869,8 @@ static uint64_t do_io(struct thread_data *td)
                        log_io_piece(td, io_u);
 
                ret = td_io_queue(td, io_u);
-               switch (ret) {
-               case FIO_Q_COMPLETED:
-                       if (io_u->error) {
-                               ret = -io_u->error;
-                               unlog_io_piece(td, io_u);
-                               clear_io_u(td, io_u);
-                       } else if (io_u->resid) {
-                               int bytes = io_u->xfer_buflen - io_u->resid;
-                               struct fio_file *f = io_u->file;
-
-                               bytes_issued += bytes;
-
-                               trim_io_piece(td, io_u);
-
-                               /*
-                                * zero read, fail
-                                */
-                               if (!bytes) {
-                                       unlog_io_piece(td, io_u);
-                                       td_verror(td, EIO, "full resid");
-                                       put_io_u(td, io_u);
-                                       break;
-                               }
-
-                               io_u->xfer_buflen = io_u->resid;
-                               io_u->xfer_buf += bytes;
-                               io_u->offset += bytes;
-
-                               if (ddir_rw(io_u->ddir))
-                                       td->ts.short_io_u[io_u->ddir]++;
-
-                               if (io_u->offset == f->real_file_size)
-                                       goto sync_done;
-
-                               requeue_io_u(td, &io_u);
-                       } else {
-sync_done:
-                               if (__should_check_rate(td, DDIR_READ) ||
-                                   __should_check_rate(td, DDIR_WRITE) ||
-                                   __should_check_rate(td, DDIR_TRIM))
-                                       fio_gettime(&comp_time, NULL);
 
-                               ret = io_u_sync_complete(td, io_u, bytes_done);
-                               if (ret < 0)
-                                       break;
-                               bytes_issued += io_u->xfer_buflen;
-                       }
-                       break;
-               case FIO_Q_QUEUED:
-                       /*
-                        * if the engine doesn't have a commit hook,
-                        * the io_u is really queued. if it does have such
-                        * a hook, it has to call io_u_queued() itself.
-                        */
-                       if (td->io_ops->commit == NULL)
-                               io_u_queued(td, io_u);
-                       bytes_issued += io_u->xfer_buflen;
-                       break;
-               case FIO_Q_BUSY:
-                       unlog_io_piece(td, io_u);
-                       requeue_io_u(td, &io_u);
-                       ret2 = td_io_commit(td);
-                       if (ret2 < 0)
-                               ret = ret2;
-                       break;
-               default:
-                       assert(ret < 0);
-                       put_io_u(td, io_u);
-                       break;
-               }
-
-               if (break_on_this_error(td, ddir, &ret))
+               if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 1, &comp_time))
                        break;
 
                /*
@@ -910,16 +879,18 @@ sync_done:
                 * resource starved.
                 */
 reap:
-               full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
+               full = queue_full(td) ||
+                       (ret == FIO_Q_BUSY && td->cur_depth);
                if (full || !td->o.iodepth_batch_complete)
-                       ret = wait_for_completions(td, &comp_time, bytes_done);
+                       ret = wait_for_completions(td, &comp_time);
                if (ret < 0)
                        break;
-               if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
+               if (!ddir_rw_sum(td->bytes_done) &&
+                   !(td->io_ops->flags & FIO_NOIO))
                        continue;
 
-               if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
-                       if (check_min_rate(td, &comp_time, bytes_done)) {
+               if (!in_ramp_time(td) && should_check_rate(td)) {
+                       if (check_min_rate(td, &comp_time)) {
                                if (exitall_on_terminate)
                                        fio_terminate_threads(td->groupid);
                                td_verror(td, EIO, "check_min_rate");
@@ -962,7 +933,7 @@ reap:
 
                i = td->cur_depth;
                if (i) {
-                       ret = io_u_queued_complete(td, i, bytes_done);
+                       ret = io_u_queued_complete(td, i);
                        if (td->o.fill_device && td->error == ENOSPC)
                                td->error = 0;
                }
@@ -987,7 +958,7 @@ reap:
        if (!ddir_rw_sum(td->this_io_bytes))
                td->done = 1;
 
-       return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
+       return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
 }
 
 static void cleanup_io_u(struct thread_data *td)
@@ -1257,8 +1228,6 @@ static int exec_string(struct thread_options *o, const char *string, const char
  */
 static uint64_t do_dry_run(struct thread_data *td)
 {
-       uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
-
        td_set_runstate(td, TD_RUNNING);
 
        while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
@@ -1289,11 +1258,11 @@ static uint64_t do_dry_run(struct thread_data *td)
                    !td->o.experimental_verify)
                        log_io_piece(td, io_u);
 
-               ret = io_u_sync_complete(td, io_u, bytes_done);
+               ret = io_u_sync_complete(td, io_u);
                (void) ret;
        }
 
-       return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
+       return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
 }
 
 /*
@@ -1526,6 +1495,15 @@ static void *thread_main(void *data)
 
                clear_state = 1;
 
+               /*
+                * Make sure we've successfully updated the rusage stats
+                * before waiting on the stat mutex. Otherwise we could have
+                * the stat thread holding stat mutex and waiting for
+                * the rusage_sem, which would never get upped because
+                * this thread is waiting for the stat mutex.
+                */
+               check_update_rusage(td);
+
                fio_mutex_down(stat_mutex);
                if (td_read(td) && td->io_bytes[DDIR_READ]) {
                        elapsed = mtime_since_now(&td->start);
@@ -1556,6 +1534,11 @@ static void *thread_main(void *data)
 
                do_verify(td, verify_bytes);
 
+               /*
+                * See comment further up for why this is done here.
+                */
+               check_update_rusage(td);
+
                fio_mutex_down(stat_mutex);
                td->ts.runtime[DDIR_READ] += mtime_since_now(&td->start);
                fio_gettime(&td->start, NULL);