workqueue: move private accounting to caller
[fio.git] / backend.c
index 224736f4c963dcef68db671b39b83ec9ed763758..c208b227846895c63d432c62531689a074dda66c 100644 (file)
--- a/backend.c
+++ b/backend.c
@@ -49,7 +49,7 @@
 #include "cgroup.h"
 #include "profile.h"
 #include "lib/rand.h"
-#include "memalign.h"
+#include "lib/memalign.h"
 #include "server.h"
 #include "lib/getrusage.h"
 #include "idletime.h"
@@ -144,8 +144,8 @@ static void set_sig_handlers(void)
 /*
  * Check if we are above the minimum rate given.
  */
-static int __check_min_rate(struct thread_data *td, struct timeval *now,
-                           enum fio_ddir ddir)
+static bool __check_min_rate(struct thread_data *td, struct timeval *now,
+                            enum fio_ddir ddir)
 {
        unsigned long long bytes = 0;
        unsigned long iops = 0;
@@ -158,13 +158,13 @@ static int __check_min_rate(struct thread_data *td, struct timeval *now,
        assert(ddir_rw(ddir));
 
        if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir])
-               return 0;
+               return false;
 
        /*
         * allow a 2 second settle period in the beginning
         */
        if (mtime_since(&td->start, now) < 2000)
-               return 0;
+               return false;
 
        iops += td->this_io_blocks[ddir];
        bytes += td->this_io_bytes[ddir];
@@ -178,7 +178,7 @@ static int __check_min_rate(struct thread_data *td, struct timeval *now,
        if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
                spent = mtime_since(&td->lastrate[ddir], now);
                if (spent < td->o.ratecycle)
-                       return 0;
+                       return false;
 
                if (td->o.rate[ddir] || td->o.ratemin[ddir]) {
                        /*
@@ -187,7 +187,7 @@ static int __check_min_rate(struct thread_data *td, struct timeval *now,
                        if (bytes < td->rate_bytes[ddir]) {
                                log_err("%s: min rate %u not met\n", td->o.name,
                                                                ratemin);
-                               return 1;
+                               return true;
                        } else {
                                if (spent)
                                        rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
@@ -199,7 +199,7 @@ static int __check_min_rate(struct thread_data *td, struct timeval *now,
                                        log_err("%s: min rate %u not met, got"
                                                " %luKB/sec\n", td->o.name,
                                                        ratemin, rate);
-                                       return 1;
+                                       return true;
                                }
                        }
                } else {
@@ -209,7 +209,7 @@ static int __check_min_rate(struct thread_data *td, struct timeval *now,
                        if (iops < rate_iops) {
                                log_err("%s: min iops rate %u not met\n",
                                                td->o.name, rate_iops);
-                               return 1;
+                               return true;
                        } else {
                                if (spent)
                                        rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
@@ -221,7 +221,7 @@ static int __check_min_rate(struct thread_data *td, struct timeval *now,
                                        log_err("%s: min iops rate %u not met,"
                                                " got %lu\n", td->o.name,
                                                        rate_iops_min, rate);
-                                       return 1;
+                                       return true;
                                }
                        }
                }
@@ -230,12 +230,12 @@ static int __check_min_rate(struct thread_data *td, struct timeval *now,
        td->rate_bytes[ddir] = bytes;
        td->rate_blocks[ddir] = iops;
        memcpy(&td->lastrate[ddir], now, sizeof(*now));
-       return 0;
+       return false;
 }
 
-static int check_min_rate(struct thread_data *td, struct timeval *now)
+static bool check_min_rate(struct thread_data *td, struct timeval *now)
 {
-       int ret = 0;
+       bool ret = false;
 
        if (td->bytes_done[DDIR_READ])
                ret |= __check_min_rate(td, now, DDIR_READ);
@@ -286,20 +286,20 @@ static void cleanup_pending_aio(struct thread_data *td)
  * Helper to handle the final sync of a file. Works just like the normal
  * io path, just does everything sync.
  */
-static int fio_io_sync(struct thread_data *td, struct fio_file *f)
+static bool fio_io_sync(struct thread_data *td, struct fio_file *f)
 {
        struct io_u *io_u = __get_io_u(td);
        int ret;
 
        if (!io_u)
-               return 1;
+               return true;
 
        io_u->ddir = DDIR_SYNC;
        io_u->file = f;
 
        if (td_io_prep(td, io_u)) {
                put_io_u(td, io_u);
-               return 1;
+               return true;
        }
 
 requeue:
@@ -307,25 +307,25 @@ requeue:
        if (ret < 0) {
                td_verror(td, io_u->error, "td_io_queue");
                put_io_u(td, io_u);
-               return 1;
+               return true;
        } else if (ret == FIO_Q_QUEUED) {
                if (io_u_queued_complete(td, 1) < 0)
-                       return 1;
+                       return true;
        } else if (ret == FIO_Q_COMPLETED) {
                if (io_u->error) {
                        td_verror(td, io_u->error, "td_io_queue");
-                       return 1;
+                       return true;
                }
 
                if (io_u_sync_complete(td, io_u) < 0)
-                       return 1;
+                       return true;
        } else if (ret == FIO_Q_BUSY) {
                if (td_io_commit(td))
-                       return 1;
+                       return true;
                goto requeue;
        }
 
-       return 0;
+       return false;
 }
 
 static int fio_file_fsync(struct thread_data *td, struct fio_file *f)
@@ -354,16 +354,16 @@ static inline void update_tv_cache(struct thread_data *td)
                __update_tv_cache(td);
 }
 
-static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
+static inline bool runtime_exceeded(struct thread_data *td, struct timeval *t)
 {
        if (in_ramp_time(td))
-               return 0;
+               return false;
        if (!td->o.timeout)
-               return 0;
+               return false;
        if (utime_since(&td->epoch, t) >= td->o.timeout)
-               return 1;
+               return true;
 
-       return 0;
+       return false;
 }
 
 /*
@@ -383,8 +383,8 @@ static inline void update_runtime(struct thread_data *td,
        td->ts.runtime[ddir] += (elapsed_us[ddir] + 999) / 1000;
 }
 
-static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
-                              int *retptr)
+static bool break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
+                               int *retptr)
 {
        int ret = *retptr;
 
@@ -397,7 +397,7 @@ static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
 
                eb = td_error_type(ddir, err);
                if (!(td->o.continue_on_error & (1 << eb)))
-                       return 1;
+                       return true;
 
                if (td_non_fatal_error(td, eb, err)) {
                        /*
@@ -407,7 +407,7 @@ static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
                        update_error_count(td, err);
                        td_clear_error(td);
                        *retptr = 0;
-                       return 0;
+                       return false;
                } else if (td->o.fill_device && err == ENOSPC) {
                        /*
                         * We expect to hit this error if
@@ -415,18 +415,18 @@ static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
                         */
                        td_clear_error(td);
                        fio_mark_td_terminate(td);
-                       return 1;
+                       return true;
                } else {
                        /*
                         * Stop the I/O in case of a fatal
                         * error.
                         */
                        update_error_count(td, err);
-                       return 1;
+                       return true;
                }
        }
 
-       return 0;
+       return false;
 }
 
 static void check_update_rusage(struct thread_data *td)
@@ -552,7 +552,7 @@ sync_done:
        return 0;
 }
 
-static inline int io_in_polling(struct thread_data *td)
+static inline bool io_in_polling(struct thread_data *td)
 {
        return !td->o.iodepth_batch_complete_min &&
                   !td->o.iodepth_batch_complete_max;
@@ -713,12 +713,12 @@ reap:
        dprint(FD_VERIFY, "exiting loop\n");
 }
 
-static unsigned int exceeds_number_ios(struct thread_data *td)
+static bool exceeds_number_ios(struct thread_data *td)
 {
        unsigned long long number_ios;
 
        if (!td->o.number_ios)
-               return 0;
+               return false;
 
        number_ios = ddir_rw_sum(td->io_blocks);
        number_ios += td->io_u_queued + td->io_u_in_flight;
@@ -726,7 +726,7 @@ static unsigned int exceeds_number_ios(struct thread_data *td)
        return number_ios >= (td->o.number_ios * td->loops);
 }
 
-static int io_issue_bytes_exceeded(struct thread_data *td)
+static bool io_issue_bytes_exceeded(struct thread_data *td)
 {
        unsigned long long bytes, limit;
 
@@ -748,7 +748,7 @@ static int io_issue_bytes_exceeded(struct thread_data *td)
        return bytes >= limit || exceeds_number_ios(td);
 }
 
-static int io_complete_bytes_exceeded(struct thread_data *td)
+static bool io_complete_bytes_exceeded(struct thread_data *td)
 {
        unsigned long long bytes, limit;
 
@@ -928,9 +928,23 @@ static uint64_t do_io(struct thread_data *td)
                        log_io_piece(td, io_u);
 
                if (td->o.io_submit_mode == IO_MODE_OFFLOAD) {
+                       const unsigned long blen = io_u->xfer_buflen;
+                       const enum fio_ddir ddir = acct_ddir(io_u);
+
                        if (td->error)
                                break;
-                       ret = workqueue_enqueue(&td->io_wq, io_u);
+
+                       ret = workqueue_enqueue(&td->io_wq, &io_u->work);
+                       if (ret)
+                               ret = FIO_Q_QUEUED;
+                       else
+                               ret = FIO_Q_BUSY;
+
+                       if (ret == FIO_Q_QUEUED && ddir_rw(ddir)) {
+                               td->io_issues[ddir]++;
+                               td->io_issue_bytes[ddir] += blen;
+                               td->rate_io_issue_bytes[ddir] += blen;
+                       }
 
                        if (should_check_rate(td))
                                td->rate_next_io_time[ddir] = usec_for_io(td, ddir);
@@ -1247,20 +1261,20 @@ static int switch_ioscheduler(struct thread_data *td)
        return 0;
 }
 
-static int keep_running(struct thread_data *td)
+static bool keep_running(struct thread_data *td)
 {
        unsigned long long limit;
 
        if (td->done)
-               return 0;
+               return false;
        if (td->o.time_based)
-               return 1;
+               return true;
        if (td->o.loops) {
                td->o.loops--;
-               return 1;
+               return true;
        }
        if (exceeds_number_ios(td))
-               return 0;
+               return false;
 
        if (td->o.io_limit)
                limit = td->o.io_limit;
@@ -1276,15 +1290,15 @@ static int keep_running(struct thread_data *td)
                 */
                diff = limit - ddir_rw_sum(td->io_bytes);
                if (diff < td_max_bs(td))
-                       return 0;
+                       return false;
 
                if (fio_files_done(td))
-                       return 0;
+                       return false;
 
-               return 1;
+               return true;
        }
 
-       return 0;
+       return false;
 }
 
 static int exec_string(struct thread_options *o, const char *string, const char *mode)
@@ -1347,9 +1361,12 @@ static uint64_t do_dry_run(struct thread_data *td)
        return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
 }
 
-static void io_workqueue_fn(struct thread_data *td, struct io_u *io_u)
+static void io_workqueue_fn(struct submit_worker *sw,
+                           struct workqueue_work *work)
 {
+       struct io_u *io_u = container_of(work, struct io_u, work);
        const enum fio_ddir ddir = io_u->ddir;
+       struct thread_data *td = sw->private;
        int ret;
 
        dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());
@@ -1358,18 +1375,213 @@ static void io_workqueue_fn(struct thread_data *td, struct io_u *io_u)
 
        td->cur_depth++;
 
-       ret = td_io_queue(td, io_u);
+       do {
+               ret = td_io_queue(td, io_u);
+               if (ret != FIO_Q_BUSY)
+                       break;
+               ret = io_u_queued_complete(td, 1);
+               if (ret > 0)
+                       td->cur_depth -= ret;
+               io_u_clear(io_u, IO_U_F_FLIGHT);
+       } while (1);
 
        dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());
 
        io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);
 
-       if (ret == FIO_Q_QUEUED)
-               ret = io_u_queued_complete(td, 1);
+       if (ret == FIO_Q_COMPLETED)
+               td->cur_depth--;
+       else if (ret == FIO_Q_QUEUED) {
+               unsigned int min_evts;
+
+               if (td->o.iodepth == 1)
+                       min_evts = 1;
+               else
+                       min_evts = 0;
+
+               ret = io_u_queued_complete(td, min_evts);
+               if (ret > 0)
+                       td->cur_depth -= ret;
+       } else if (ret == FIO_Q_BUSY) {
+               ret = io_u_queued_complete(td, td->cur_depth);
+               if (ret > 0)
+                       td->cur_depth -= ret;
+       }
+}
 
-       td->cur_depth--;
+static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw)
+{
+       struct thread_data *td = sw->private;
+
+       if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
+               return true;
+
+       return false;
+}
+
+static void io_workqueue_pre_sleep_fn(struct submit_worker *sw)
+{
+       struct thread_data *td = sw->private;
+       int ret;
+
+       ret = io_u_quiesce(td);
+       if (ret > 0)
+               td->cur_depth -= ret;
+}
+
+static int io_workqueue_alloc_fn(struct submit_worker *sw)
+{
+       struct thread_data *td;
+
+       td = calloc(1, sizeof(*td));
+       sw->private = td;
+       return 0;
+}
+
+static void io_workqueue_free_fn(struct submit_worker *sw)
+{
+       free(sw->private);
+       sw->private = NULL;
 }
 
+static int io_workqueue_init_worker_fn(struct submit_worker *sw)
+{
+       struct thread_data *parent = sw->wq->td;
+       struct thread_data *td = sw->private;
+       int fio_unused ret;
+
+       memcpy(&td->o, &parent->o, sizeof(td->o));
+       memcpy(&td->ts, &parent->ts, sizeof(td->ts));
+       td->o.uid = td->o.gid = -1U;
+       dup_files(td, parent);
+       td->eo = parent->eo;
+       fio_options_mem_dupe(td);
+
+       if (ioengine_load(td))
+               goto err;
+
+       if (td->o.odirect)
+               td->io_ops->flags |= FIO_RAWIO;
+
+       td->pid = gettid();
+
+       INIT_FLIST_HEAD(&td->io_log_list);
+       INIT_FLIST_HEAD(&td->io_hist_list);
+       INIT_FLIST_HEAD(&td->verify_list);
+       INIT_FLIST_HEAD(&td->trim_list);
+       INIT_FLIST_HEAD(&td->next_rand_list);
+       td->io_hist_tree = RB_ROOT;
+
+       td->o.iodepth = 1;
+       if (td_io_init(td))
+               goto err_io_init;
+
+       fio_gettime(&td->epoch, NULL);
+       fio_getrusage(&td->ru_start);
+       clear_io_state(td, 1);
+
+       td_set_runstate(td, TD_RUNNING);
+       td->flags |= TD_F_CHILD;
+       td->parent = parent;
+       return 0;
+
+err_io_init:
+       close_ioengine(td);
+err:
+       return 1;
+
+}
+
+static void io_workqueue_exit_worker_fn(struct submit_worker *sw)
+{
+       struct thread_data *td = sw->private;
+
+       fio_options_free(td);
+       close_and_free_files(td);
+       if (td->io_ops)
+               close_ioengine(td);
+       td_set_runstate(td, TD_EXITED);
+}
+
+#ifdef CONFIG_SFAA
+static void sum_val(uint64_t *dst, uint64_t *src)
+{
+       if (*src) {
+               __sync_fetch_and_add(dst, *src);
+               *src = 0;
+       }
+}
+#else
+static void sum_val(uint64_t *dst, uint64_t *src)
+{
+       if (*src) {
+               *dst += *src;
+               *src = 0;
+       }
+}
+#endif
+
+static void pthread_double_unlock(pthread_mutex_t *lock1,
+                                 pthread_mutex_t *lock2)
+{
+#ifndef CONFIG_SFAA
+       pthread_mutex_unlock(lock1);
+       pthread_mutex_unlock(lock2);
+#endif
+}
+
+static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
+{
+#ifndef CONFIG_SFAA
+       if (lock1 < lock2) {
+               pthread_mutex_lock(lock1);
+               pthread_mutex_lock(lock2);
+       } else {
+               pthread_mutex_lock(lock2);
+               pthread_mutex_lock(lock1);
+       }
+#endif
+}
+
+static void sum_ddir(struct thread_data *dst, struct thread_data *src,
+                    enum fio_ddir ddir)
+{
+       pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
+
+       sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
+       sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
+       sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
+       sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
+       sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);
+
+       pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
+}
+
+static void io_workqueue_update_acct_fn(struct submit_worker *sw)
+{
+       struct thread_data *src = sw->private;
+       struct thread_data *dst = sw->wq->td;
+
+       if (td_read(src))
+               sum_ddir(dst, src, DDIR_READ);
+       if (td_write(src))
+               sum_ddir(dst, src, DDIR_WRITE);
+       if (td_trim(src))
+               sum_ddir(dst, src, DDIR_TRIM);
+
+}
+
+struct workqueue_ops rated_wq_ops = {
+       .fn                     = io_workqueue_fn,
+       .pre_sleep_flush_fn     = io_workqueue_pre_sleep_flush_fn,
+       .pre_sleep_fn           = io_workqueue_pre_sleep_fn,
+       .update_acct_fn         = io_workqueue_update_acct_fn,
+       .alloc_worker_fn        = io_workqueue_alloc_fn,
+       .free_worker_fn         = io_workqueue_free_fn,
+       .init_worker_fn         = io_workqueue_init_worker_fn,
+       .exit_worker_fn         = io_workqueue_exit_worker_fn,
+};
+
 /*
  * Entry point for the thread based jobs. The process based jobs end up
  * here as well, after a little setup.
@@ -1568,7 +1780,7 @@ static void *thread_main(void *data)
        fio_verify_init(td);
 
        if ((o->io_submit_mode == IO_MODE_OFFLOAD) &&
-           workqueue_init(td, &td->io_wq, io_workqueue_fn, td->o.iodepth))
+           workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth))
                goto err;
 
        fio_gettime(&td->epoch, NULL);
@@ -1867,29 +2079,29 @@ reaped:
                fio_terminate_threads(TERMINATE_ALL);
 }
 
-static int __check_trigger_file(void)
+static bool __check_trigger_file(void)
 {
        struct stat sb;
 
        if (!trigger_file)
-               return 0;
+               return false;
 
        if (stat(trigger_file, &sb))
-               return 0;
+               return false;
 
        if (unlink(trigger_file) < 0)
                log_err("fio: failed to unlink %s: %s\n", trigger_file,
                                                        strerror(errno));
 
-       return 1;
+       return true;
 }
 
-static int trigger_timedout(void)
+static bool trigger_timedout(void)
 {
        if (trigger_timeout)
                return time_since_genesis() >= trigger_timeout;
 
-       return 0;
+       return false;
 }
 
 void exec_trigger(const char *cmd)
@@ -1945,13 +2157,13 @@ static void do_usleep(unsigned int usecs)
        usleep(usecs);
 }
 
-static int check_mount_writes(struct thread_data *td)
+static bool check_mount_writes(struct thread_data *td)
 {
        struct fio_file *f;
        unsigned int i;
 
        if (!td_write(td) || td->o.allow_mounted_write)
-               return 0;
+               return false;
 
        for_each_file(td, f, i) {
                if (f->filetype != FIO_TYPE_BD)
@@ -1960,10 +2172,10 @@ static int check_mount_writes(struct thread_data *td)
                        goto mounted;
        }
 
-       return 0;
+       return false;
 mounted:
        log_err("fio: %s appears mounted, and 'allow_mounted_write' isn't set. Aborting.", f->file_name);
-       return 1;
+       return true;
 }
 
 /*