Allow fio to terminate jobs when steady state is attained
author Vincent Fu <Vincent.Fu@sandisk.com>
Sun, 19 Jun 2016 02:59:32 +0000 (22:59 -0400)
committer Jens Axboe <axboe@fb.com>
Mon, 15 Aug 2016 15:44:46 +0000 (09:44 -0600)
KNOWN ISSUES
Will not work over a network connection

15 files changed:
HOWTO
Makefile
backend.c
debug.h
fio.h
helper_thread.c
init.c
libfio.c
options.c
stat.c
stat.h
steadystate.c [new file with mode: 0644]
steadystate.h [new file with mode: 0644]
thread_options.h
unit_tests/steadystate_tests.py [new file with mode: 0755]

diff --git a/HOWTO b/HOWTO
index 5bf7125..5daaef0 100644 (file)
--- a/HOWTO
+++ b/HOWTO
@@ -297,7 +297,7 @@ irange      Integer range with suffix. Allows value range to be given, such
        1k:4k. If the option allows two sets of ranges, they can be
        specified with a ',' or '/' delimiter: 1k-4k/8k-32k. Also see
        int.
-float_list     A list of floating numbers, separated by a ':' character.
+float_list     A list of floating point numbers, separated by a ':' character.
 
 With the above in mind, here follows the complete list of fio job
 parameters.
@@ -1214,8 +1214,47 @@ ramp_time=time   If set, fio will run the specified workload for this amount
                thus it will increase the total runtime if a special timeout
                or runtime is specified.
 
-invalidate=bool        Invalidate the buffer/page cache parts for this file prior
-               to starting io. Defaults to true.
+steadystate=str:float
+ss=str:float   Define the criterion and limit for assessing steady state
+               performance. The first parameter designates the criterion
+               whereas the second parameter sets the threshold. When the
+               criterion falls below the threshold for the specified duration,
+               the job will stop. For example, iops_slope:0.1% will direct fio
+               to terminate the job when the least squares regression slope
+               falls below 0.1% of the mean IOPS. If group_reporting is
+               enabled this will apply to all jobs in the group. Below is the
+               list of available steady state assessment criteria. All
+               assessments are carried out using only data from the rolling
+               collection window. Threshold limits can be expressed as a fixed
+               value or as a percentage of the mean in the collection window.
+                       iops    Collect IOPS data. Stop the job if all
+                               individual IOPS measurements are within the
+                               specified limit of the mean IOPS (e.g., iops:2
+                               means that all individual IOPS values must be
+                               within 2 of the mean, whereas iops:0.2% means
+                               that all individual IOPS values must be within
+                               0.2% of the mean IOPS to terminate the job).
+                       iops_slope
+                               Collect IOPS data and calculate the least
+                               squares regression slope. Stop the job if the
+                               slope falls below the specified limit.
+                       bw      Collect bandwidth data. Stop the job if all
+                               individual bandwidth measurements are within
+                               the specified limit of the mean bandwidth.
+                       bw_slope
+                               Collect bandwidth data and calculate the least
+                               squares regression slope. Stop the job if the
+                               slope falls below the specified limit.
+
+steadystate_duration=time
+ss_dur=time    A rolling window of this duration will be used to judge whether
+               steady state has been reached. Data will be collected once per
+               second. The default is 0 which disables steady state detection.
+
+steadystate_ramp_time=time
+ss_ramp=time   Allow the job to run for the specified duration before
+               beginning data collection for checking the steady state job
+               termination criterion. The default is 0.
 
 sync=bool      Use sync io for buffered writes. For the majority of the
                io engines, this means using O_SYNC.
diff --git a/Makefile b/Makefile
index b54f7e9..83414c3 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -45,7 +45,8 @@ SOURCE :=     $(patsubst $(SRCDIR)/%,%,$(wildcard $(SRCDIR)/crc/*.c)) \
                server.c client.c iolog.c backend.c libfio.c flow.c cconv.c \
                gettime-thread.c helpers.c json.c idletime.c td_error.c \
                profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c \
-               workqueue.c rate-submit.c optgroup.c helper_thread.c
+               workqueue.c rate-submit.c optgroup.c helper_thread.c \
+               steadystate.c
 
 ifdef CONFIG_LIBHDFS
   HDFSFLAGS= -I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/linux -I $(FIO_LIBHDFS_INCLUDE)
diff --git a/backend.c b/backend.c
index b43486d..b55a527 100644 (file)
--- a/backend.c
+++ b/backend.c
@@ -1679,6 +1679,7 @@ static void *thread_main(void *data)
        fio_getrusage(&td->ru_start);
        memcpy(&td->bw_sample_time, &td->epoch, sizeof(td->epoch));
        memcpy(&td->iops_sample_time, &td->epoch, sizeof(td->epoch));
+       memcpy(&td->ss.prev_time, &td->epoch, sizeof(td->epoch));
 
        if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] ||
                        o->ratemin[DDIR_TRIM]) {
@@ -2410,6 +2411,8 @@ int fio_backend(struct sk_out *sk_out)
        }
 
        for_each_td(td, i) {
+               if (td->ss.dur)
+                       free(td->ss.cache);
                fio_options_free(td);
                if (td->rusage_sem) {
                        fio_mutex_remove(td->rusage_sem);
diff --git a/debug.h b/debug.h
index 923fa39..fb95747 100644 (file)
--- a/debug.h
+++ b/debug.h
@@ -21,6 +21,7 @@ enum {
        FD_NET,
        FD_RATE,
        FD_COMPRESS,
+       FD_STEADYSTATE,
        FD_DEBUG_MAX,
 };
 
diff --git a/fio.h b/fio.h
index e96a4dd..2d0327c 100644 (file)
--- a/fio.h
+++ b/fio.h
@@ -122,6 +122,40 @@ struct zone_split_index {
        uint8_t size_perc_prev;
 };
 
+/*
+ * For steady state detection
+ */
+struct steadystate_data {
+       double limit;
+       unsigned long long dur;
+       unsigned long long ramp_time;
+       bool (*evaluate)(unsigned long, unsigned long, struct thread_data *);
+       bool check_iops;
+       bool check_slope;
+       bool pct;
+
+       int attained;
+       int last_in_group;
+       int ramp_time_over;
+
+       unsigned int head;
+       unsigned int tail;
+       unsigned long *cache;
+
+       double criterion;
+
+       unsigned long long sum_y;
+       unsigned long long sum_x;
+       unsigned long long sum_x_sq;
+       unsigned long long sum_xy;
+       unsigned long long oldest_y;
+
+       struct timeval prev_time;
+       unsigned long long prev_iops;
+       unsigned long long prev_bytes;
+};
+
+
 /*
  * This describes a single thread/process executing a fio job.
  */
@@ -394,6 +428,8 @@ struct thread_data {
 
        void *pinned_mem;
 
+       struct steadystate_data ss;
+
        char verror[FIO_VERROR_SIZE];
 };
 
@@ -469,6 +505,9 @@ extern char *trigger_remote_cmd;
 extern long long trigger_timeout;
 extern char *aux_path;
 
+extern bool steadystate;
+#define STEADYSTATE_MSEC (1000)
+
 extern struct thread_data *threads;
 
 static inline void fio_ro_check(const struct thread_data *td, struct io_u *io_u)
@@ -785,6 +824,14 @@ enum {
        FIO_CPUS_SPLIT,
 };
 
+enum {
+       FIO_STEADYSTATE_IOPS    = 0,
+       FIO_STEADYSTATE_IOPS_SLOPE,
+       FIO_STEADYSTATE_BW,
+       FIO_STEADYSTATE_BW_SLOPE,
+};
+
+
 extern void exec_trigger(const char *);
 extern void check_trigger_file(void);
 
diff --git a/helper_thread.c b/helper_thread.c
index f031df4..d716f2b 100644 (file)
--- a/helper_thread.c
+++ b/helper_thread.c
@@ -1,6 +1,7 @@
 #include "fio.h"
 #include "smalloc.h"
 #include "helper_thread.h"
+#include "steadystate.h"
 
 static struct helper_data {
        volatile int exit;
@@ -69,14 +70,15 @@ void helper_thread_exit(void)
 static void *helper_thread_main(void *data)
 {
        struct helper_data *hd = data;
-       unsigned int msec_to_next_event, next_log;
-       struct timeval tv, last_du;
+       unsigned int msec_to_next_event, next_log, next_ss = STEADYSTATE_MSEC;
+       struct timeval tv, last_du, last_ss;
        int ret = 0;
 
        sk_out_assign(hd->sk_out);
 
        gettimeofday(&tv, NULL);
        memcpy(&last_du, &tv, sizeof(tv));
+       memcpy(&last_ss, &tv, sizeof(tv));
 
        fio_mutex_up(hd->startup_mutex);
 
@@ -84,7 +86,7 @@ static void *helper_thread_main(void *data)
        while (!ret && !hd->exit) {
                struct timespec ts;
                struct timeval now;
-               uint64_t since_du;
+               uint64_t since_du, since_ss = 0;
 
                timeval_add_msec(&tv, msec_to_next_event);
                ts.tv_sec = tv.tv_sec;
@@ -98,6 +100,7 @@ static void *helper_thread_main(void *data)
                if (hd->reset) {
                        memcpy(&tv, &now, sizeof(tv));
                        memcpy(&last_du, &now, sizeof(last_du));
+                       memcpy(&last_ss, &now, sizeof(last_ss));
                        hd->reset = 0;
                }
 
@@ -122,7 +125,22 @@ static void *helper_thread_main(void *data)
                if (!next_log)
                        next_log = DISK_UTIL_MSEC;
 
-               msec_to_next_event = min(next_log, msec_to_next_event);
+               if (steadystate) {
+                       since_ss = mtime_since(&last_ss, &now);
+                       if (since_ss >= STEADYSTATE_MSEC || STEADYSTATE_MSEC - since_ss < 10) {
+                               steadystate_check();
+                               timeval_add_msec(&last_ss, since_ss);
+                               if (since_ss > STEADYSTATE_MSEC)
+                                       next_ss = STEADYSTATE_MSEC - (since_ss - STEADYSTATE_MSEC);
+                               else
+                                       next_ss = STEADYSTATE_MSEC;
+                       }
+                       else
+                               next_ss = STEADYSTATE_MSEC - since_ss;
+                }
+
+               msec_to_next_event = min(min(next_log, msec_to_next_event), next_ss);
+               dprint(FD_STEADYSTATE, "since_ss: %llu, next_ss: %u, next_log: %u, msec_to_next_event: %u\n", (unsigned long long)since_ss, next_ss, next_log, msec_to_next_event);
 
                if (!is_backend)
                        print_thread_status();
@@ -142,6 +160,7 @@ int helper_thread_create(struct fio_mutex *startup_mutex, struct sk_out *sk_out)
        hd = smalloc(sizeof(*hd));
 
        setup_disk_util();
+       steadystate_setup();
 
        hd->sk_out = sk_out;
 
diff --git a/init.c b/init.c
index a1a0087..7f91c04 100644 (file)
--- a/init.c
+++ b/init.c
@@ -25,6 +25,7 @@
 #include "server.h"
 #include "idletime.h"
 #include "filelock.h"
+#include "steadystate.h"
 
 #include "oslib/getopt.h"
 #include "oslib/strcasestr.h"
@@ -80,6 +81,8 @@ unsigned int *fio_debug_jobp = NULL;
 static char cmd_optstr[256];
 static int did_arg;
 
+bool steadystate = false;
+
 #define FIO_CLIENT_FLAG                (1 << 16)
 
 /*
@@ -1577,6 +1580,58 @@ static int add_job(struct thread_data *td, const char *jobname, int job_add_num,
                        log_info("...\n");
        }
 
+       if (o->ss_dur) {
+               steadystate = true;
+               o->ss_dur /= 1000000L;
+
+               /* put all steady state info in one place */
+               td->ss.dur = o->ss_dur;
+               td->ss.limit = o->ss_limit.u.f; 
+               td->ss.ramp_time = o->ss_ramp_time;
+               td->ss.pct = o->ss_pct;
+
+               if (o->ss == FIO_STEADYSTATE_IOPS_SLOPE || o->ss == FIO_STEADYSTATE_BW_SLOPE) {
+                       td->ss.check_slope = true;
+                       td->ss.evaluate = &steadystate_slope;
+               } else {
+                       td->ss.check_slope = false;
+                       td->ss.evaluate = &steadystate_deviation;
+               }
+
+               if (o->ss == FIO_STEADYSTATE_IOPS || o->ss == FIO_STEADYSTATE_IOPS_SLOPE)
+                       td->ss.check_iops = true;
+               else
+                       td->ss.check_iops = false;
+
+
+               /* when group reporting is enabled only the cache allocated for the final td is actually used */
+               td->ss.cache = malloc(o->ss_dur * sizeof(*(td->ss.cache)));
+               if (td->ss.cache == NULL)
+               {
+                       log_err("fio: unable to allocate memory for steadystate cache\n");
+                       goto err;
+               }
+               for (i = 0; i < td->ss.dur; i++)
+                       td->ss.cache[i] = 0;
+               /* initialize so that it is obvious if the cache is not full in the output */
+
+               td->ss.ramp_time_over = (td->ss.ramp_time == 0);
+               td->ss.attained = 0;
+               td->ss.last_in_group = 0;
+               td->ss.head = 0;
+               td->ss.tail = 0;
+               td->ss.sum_x = o->ss_dur * (o->ss_dur - 1) / 2;
+               td->ss.sum_x_sq = (o->ss_dur - 1) * (o->ss_dur) * (2*o->ss_dur - 1) / 6;
+               td->ss.prev_bytes = 0;
+               td->ss.prev_iops = 0;
+               td->ss.sum_y = 0;
+               td->ss.oldest_y = 0;
+               td->ss.criterion = 0.0;
+               td->ts.ss = &td->ss;
+       }
+       else
+               td->ts.ss = NULL;
+
        /*
         * recurse add identical jobs, clear numjobs and stonewall options
         * as they don't apply to sub-jobs
@@ -1592,6 +1647,8 @@ static int add_job(struct thread_data *td, const char *jobname, int job_add_num,
                td_new->o.stonewall = 0;
                td_new->o.new_group = 0;
                td_new->subjob_number = numjobs;
+               td_new->o.ss_dur = o->ss_dur * 1000000l;
+               td_new->o.ss_limit = o->ss_limit;
 
                if (file_alloced) {
                        if (td_new->files) {
@@ -2134,6 +2191,10 @@ struct debug_level debug_levels[] = {
          .help = "Log compression logging",
          .shift = FD_COMPRESS,
        },
+       { .name = "steadystate",
+         .help = "Steady state detection logging",
+         .shift = FD_STEADYSTATE,
+       },
        { .name = NULL, },
 };
 
diff --git a/libfio.c b/libfio.c
index fb7d35a..20ce7cd 100644 (file)
--- a/libfio.c
+++ b/libfio.c
@@ -153,6 +153,7 @@ void reset_all_stats(struct thread_data *td)
        memcpy(&td->start, &tv, sizeof(tv));
        memcpy(&td->iops_sample_time, &tv, sizeof(tv));
        memcpy(&td->bw_sample_time, &tv, sizeof(tv));
+       memcpy(&td->ss.prev_time, &tv, sizeof(tv));
 
        lat_target_reset(td);
        clear_rusage_stat(td);
diff --git a/options.c b/options.c
index 6161ac8..dfa98d3 100644 (file)
--- a/options.c
+++ b/options.c
@@ -1061,6 +1061,86 @@ static int str_random_distribution_cb(void *data, const char *str)
        return 0;
 }
 
+static int str_steadystate_cb(void *data, const char *str)
+{
+       struct thread_data *td = data;
+       double val;
+       char *nr;
+       char *pct;
+       long long ll;
+
+       if (td->o.ss != FIO_STEADYSTATE_IOPS &&
+           td->o.ss != FIO_STEADYSTATE_IOPS_SLOPE &&
+           td->o.ss != FIO_STEADYSTATE_BW &&
+           td->o.ss != FIO_STEADYSTATE_BW_SLOPE) {
+               /* should be impossible to get here */
+               log_err("fio: unknown steady state criterion\n");
+               return 1;
+       }
+
+       nr = get_opt_postfix(str);
+       if (!nr) {
+               log_err("fio: steadystate threshold must be specified in addition to criterion\n");
+               free(nr);
+               return 1;
+       }
+
+       /* ENHANCEMENT Allow fio to understand size=10.2% and use here */
+       pct = strstr(nr, "%");
+       if (pct) {
+               *pct = '\0';
+               strip_blank_end(nr);
+               if (!str_to_float(nr, &val, 0)) {
+                       log_err("fio: could not parse steadystate threshold percentage\n");
+                       free(nr);
+                       return 1;
+               }
+
+               dprint(FD_PARSE, "set steady state threshold to %f%%\n", val);
+               free(nr);
+               if (parse_dryrun())
+                       return 0;
+
+               td->o.ss_pct = true;
+               td->o.ss_limit.u.f = val;
+
+
+       } else if (td->o.ss == FIO_STEADYSTATE_IOPS ||
+                  td->o.ss == FIO_STEADYSTATE_IOPS_SLOPE) {
+               if (!str_to_float(nr, &val, 0)) {
+                       log_err("fio: steadystate IOPS threshold postfix parsing failed\n");
+                       free(nr);
+                       return 1;
+               }
+
+               dprint(FD_PARSE, "set steady state IOPS threshold to %f\n", val);
+               free(nr);
+               if (parse_dryrun())
+                       return 0;
+
+               td->o.ss_pct = false;
+               td->o.ss_limit.u.f = val;
+
+       } else {        /* bandwidth criterion */
+               if (str_to_decimal(nr, &ll, 1, td, 0, 0)) {
+                       log_err("fio: steadystate BW threshold postfix parsing failed\n");
+                       free(nr);
+                       return 1;
+               }
+
+               dprint(FD_PARSE, "set steady state BW threshold to %lld\n", ll);
+               free(nr);
+               if (parse_dryrun())
+                       return 0;
+
+               td->o.ss_pct = false;
+               td->o.ss_limit.u.f = (double) ll;
+
+       }
+
+       return 0;
+}
+
 /*
  * Return next name in the string. Files are separated with ':'. If the ':'
  * is escaped with a '\', then that ':' is part of the filename and does not
@@ -4133,6 +4213,63 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
                .category = FIO_OPT_C_IO,
                .group  = FIO_OPT_G_MTD,
        },
+       {
+               .name   = "steadystate",
+               .lname  = "Steady state threshold",
+               .alias  = "ss",
+               .type   = FIO_OPT_STR,
+               .off1   = offsetof(struct thread_options, ss),
+               .cb     = str_steadystate_cb,
+               .help   = "Define the criterion and limit to judge when a job has reached steady state",
+               .def    = "iops_slope:0.01%",
+               .posval = {
+                         { .ival = "iops",
+                           .oval = FIO_STEADYSTATE_IOPS,
+                           .help = "maximum mean deviation of IOPS measurements",
+                         },
+                         { .ival = "iops_slope",
+                           .oval = FIO_STEADYSTATE_IOPS_SLOPE,
+                           .help = "slope calculated from IOPS measurements",
+                         },
+                         { .ival = "bw",
+                           .oval = FIO_STEADYSTATE_BW,
+                           .help = "maximum mean deviation of bandwidth measurements",
+                         },
+                         {
+                           .ival = "bw_slope",
+                           .oval = FIO_STEADYSTATE_BW_SLOPE,
+                           .help = "slope calculated from bandwidth measurements",
+                         },
+               },
+               .category = FIO_OPT_C_GENERAL,
+               .group  = FIO_OPT_G_RUNTIME,
+       },
+        {
+               .name   = "steadystate_duration",
+               .lname  = "Steady state duration",
+               .alias  = "ss_dur",
+               .type   = FIO_OPT_STR_VAL_TIME,
+               .off1   = offsetof(struct thread_options, ss_dur),
+               .help   = "Stop workload upon attaining steady state for specified duration",
+               .def    = "0",
+               .is_seconds = 1,
+               .is_time = 1,
+               .category = FIO_OPT_C_GENERAL,
+               .group  = FIO_OPT_G_RUNTIME,
+       },
+        {
+               .name   = "steadystate_ramp_time",
+               .lname  = "Steady state ramp time",
+               .alias  = "ss_ramp",
+               .type   = FIO_OPT_STR_VAL_TIME,
+               .off1   = offsetof(struct thread_options, ss_ramp_time),
+               .help   = "Delay before initiation of data collection for steady state job termination testing",
+               .def    = "0",
+               .is_seconds = 1,
+               .is_time = 1,
+               .category = FIO_OPT_C_GENERAL,
+               .group  = FIO_OPT_G_RUNTIME,
+       },
        {
                .name = NULL,
        },
diff --git a/stat.c b/stat.c
index ef9fe7d..3e7ff75 100644 (file)
--- a/stat.c
+++ b/stat.c
@@ -1255,6 +1255,36 @@ static struct json_object *show_thread_status_json(struct thread_stat *ts,
                }
        }
 
+       /* steady state detection; move this behind json+? */
+       if (ts->ss) {
+               struct json_array *cache;
+               struct steadystate_data *ss = ts->ss;
+               int i, x;
+               char ss_option[64];
+
+               snprintf(ss_option, sizeof(ss_option), "%s%s:%f%s", 
+                       ss->check_iops ? "iops" : "bw",
+                       ss->check_slope ? "_slope" : "",
+                       (float) ss->limit,
+                       ss->pct ? "%" : "");
+
+               tmp = json_create_object();
+               json_object_add_value_object(root, "steadystate", tmp);
+               json_object_add_value_string(tmp, "ss", ss_option);
+               json_object_add_value_float(tmp, "limit", (float)ss->limit);
+               json_object_add_value_int(tmp, "duration", (int)ss->dur);
+               json_object_add_value_int(tmp, "steadystate_ramptime", ss->ramp_time / 1000000L);
+               json_object_add_value_int(tmp, "attained", ss->attained);
+               json_object_add_value_float(tmp, "criterion", ss->criterion);
+
+               cache = json_create_array();
+               json_object_add_value_array(tmp, "data", cache);
+               for (i = 0; i < ss->dur; i++) {
+                       x = (ss->head + i) % ss->dur;
+                       json_array_add_value_int(cache, ss->cache[x]);
+               }
+       }
+
        return root;
 }
 
@@ -1578,6 +1608,11 @@ void __show_run_stats(void)
                        ts->block_infos[k] = td->ts.block_infos[k];
 
                sum_thread_stats(ts, &td->ts, idx == 1);
+
+               if (td->o.ss_dur)
+                       ts->ss = &td->ss;
+               else
+                       ts->ss = NULL;
        }
 
        for (i = 0; i < nr_ts; i++) {
diff --git a/stat.h b/stat.h
index 86f1a0b..357a1ff 100644 (file)
--- a/stat.h
+++ b/stat.h
@@ -213,6 +213,8 @@ struct thread_stat {
        uint64_t latency_target;
        fio_fp64_t latency_percentile;
        uint64_t latency_window;
+
+       struct steadystate_data *ss;
 } __attribute__((packed));
 
 struct jobs_eta {
diff --git a/steadystate.c b/steadystate.c
new file mode 100644 (file)
index 0000000..1e7212f
--- /dev/null
@@ -0,0 +1,218 @@
+#include "fio.h"
+#include "steadystate.h"
+#include "helper_thread.h"
+
+void steadystate_setup()
+{
+       int i, prev_groupid;
+       struct thread_data *td, *prev_td;
+
+       if (!steadystate)
+               return;
+
+       /*
+        * if group reporting is enabled, identify the last td
+        * for each group and use it for storing steady state
+        * data
+        */
+       prev_groupid = -1;
+       prev_td = NULL;
+       for_each_td(td, i) {
+               if (!td->o.group_reporting)
+                       continue;
+
+               if (prev_groupid != td->groupid) {
+                       if (prev_td != NULL)
+                               prev_td->ss.last_in_group = 1;
+                       prev_groupid = td->groupid;
+               }
+               prev_td = td;
+       }
+
+       if (prev_td != NULL && prev_td->o.group_reporting)
+               prev_td->ss.last_in_group = 1;
+}
+
+void steadystate_check()
+{
+       int i, j, ddir, prev_groupid, group_ramp_time_over = 0;
+       unsigned long rate_time;
+       struct thread_data *td, *td2;
+       struct timeval now;
+       unsigned long group_bw = 0, group_iops = 0;
+       unsigned long long td_iops;
+       unsigned long long td_bytes;
+
+       prev_groupid = -1;
+       for_each_td(td, i) {
+               struct steadystate_data *ss = &td->ss;
+
+               if (!ss->dur || td->runstate <= TD_SETTING_UP || td->runstate >= TD_EXITED || ss->attained)
+                       continue;
+
+               td_iops = 0;
+               td_bytes = 0;
+               if (!td->o.group_reporting ||
+                   (td->o.group_reporting && td->groupid != prev_groupid)) {
+                       group_bw = 0;
+                       group_iops = 0;
+                       group_ramp_time_over = 0;
+               }
+               prev_groupid = td->groupid;
+
+               fio_gettime(&now, NULL);
+               if (ss->ramp_time && !ss->ramp_time_over)
+                       /* 
+                        * Begin recording data one second after ss->ramp_time
+                        * has elapsed
+                        */
+                       if (utime_since(&td->epoch, &now) >= (ss->ramp_time + 1000000L))
+                               ss->ramp_time_over = 1;
+
+               for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++) {
+                       td_iops += td->io_blocks[ddir];
+                       td_bytes += td->io_bytes[ddir];
+               }
+
+               rate_time = mtime_since(&ss->prev_time, &now);
+               memcpy(&ss->prev_time, &now, sizeof(now));
+
+               /* 
+                * Begin monitoring when job starts but don't actually use
+                * data in checking stopping criterion until ss->ramp_time is
+                * over. This ensures that we will have a sane value in
+                * prev_iops/bw the first time through after ss->ramp_time
+                * is done.
+                */
+               if (ss->ramp_time_over) {
+                       group_bw += 1000 * (td_bytes - ss->prev_bytes) / rate_time;
+                       group_iops += 1000 * (td_iops - ss->prev_iops) / rate_time;
+                       ++group_ramp_time_over;
+               }
+               ss->prev_iops = td_iops;
+               ss->prev_bytes = td_bytes;
+
+               if (td->o.group_reporting && !ss->last_in_group)
+                       continue;
+
+               /* don't begin checking criterion until ss->ramp_time is over for at least one thread in group */
+               if (!group_ramp_time_over)
+                       continue;
+
+               dprint(FD_STEADYSTATE, "steadystate_check() thread: %d, groupid: %u, rate_msec: %ld, iops: %lu, bw: %lu, head: %d, tail: %d\n", 
+                       i, td->groupid, rate_time, group_iops, group_bw, ss->head, ss->tail);
+
+               if (ss->evaluate(group_iops, group_bw, td))
+               {
+                       if (td->o.group_reporting)
+                               for_each_td(td2, j) {
+                                       if (td2->groupid == td->groupid) {
+                                               td2->ss.attained = 1;
+                                               fio_mark_td_terminate(td2);
+                                       }
+                               }
+                       else {
+                               ss->attained = 1;
+                               fio_mark_td_terminate(td);
+                       }
+               }
+       }
+}
+
+bool steadystate_slope(unsigned long iops, unsigned long bw, struct thread_data *td)
+{
+       int i, x;
+       double result;
+       double slope;
+       struct steadystate_data *ss = &td->ss;
+
+       ss->cache[ss->tail] = ss->check_iops ? iops : bw;
+
+       if (ss->tail < ss->head || (ss->tail - ss->head == ss->dur - 1))
+       {
+               if (ss->sum_y == 0)     /* first time through */
+               {
+                       for(i = 0; i < ss->dur; i++)
+                       {
+                               ss->sum_y += ss->cache[i];
+                               x = ss->head + i;
+                               if (x >= ss->dur)
+                                       x -= ss->dur;
+                               ss->sum_xy += ss->cache[x] * i;
+                       }
+               } else {                /* easy to update the sums */
+                       ss->sum_y -= ss->oldest_y;
+                       ss->sum_y += ss->cache[ss->tail];
+                       ss->sum_xy = ss->sum_xy - ss->sum_y + ss->dur * ss->cache[ss->tail];
+               }
+
+               ss->oldest_y = ss->cache[ss->head];
+
+               /*
+                * calculate slope as (sum_xy - sum_x * sum_y / n) / (sum_(x^2) - (sum_x)^2 / n)
+                * This code assumes that all x values are equally spaced when they are often
+                * off by a few milliseconds. This assumption greatly simplifies the
+                * calculations.
+                */
+               slope = (ss->sum_xy - (double) ss->sum_x * ss->sum_y / ss->dur) / (ss->sum_x_sq - (double) ss->sum_x * ss->sum_x / ss->dur);
+               ss->criterion = ss->pct ? slope / (ss->sum_y / ss->dur) * 100.0: slope;
+
+               dprint(FD_STEADYSTATE, "sum_y: %llu, sum_xy: %llu, slope: %f, criterion: %f, limit: %f\n",
+                       ss->sum_y, ss->sum_xy, slope, ss->criterion, ss->limit);
+
+               result = ss->criterion * (ss->criterion < 0.0 ? -1 : 1);
+               if (result < ss->limit)
+                       return true;
+       }
+
+       ss->tail = (ss->tail + 1) % ss->dur;
+       if (ss->tail <= ss->head)
+               ss->head = (ss->head + 1) % ss->dur;
+       return false;
+}
+
+bool steadystate_deviation(unsigned long iops, unsigned long bw, struct thread_data *td)
+{
+       int i;
+       double diff;
+       double mean;
+       double deviation;
+
+       struct steadystate_data *ss = &td->ss;
+
+       ss->cache[ss->tail] = ss->check_iops ? iops : bw;
+
+       if (ss->tail < ss->head || (ss->tail - ss->head == ss->dur - 1))
+       {
+               if (ss->sum_y == 0)     /* first time through */
+               {
+                       for(i = 0; i < ss->dur; i++)
+                               ss->sum_y += ss->cache[i];
+               } else {                /* easy to update the sum */
+                       ss->sum_y -= ss->oldest_y;
+                       ss->sum_y += ss->cache[ss->tail];
+               }
+
+               ss->oldest_y = ss->cache[ss->head];
+               mean = (double) ss->sum_y / ss->dur;
+               deviation = 0.0;
+
+               for (i = 0; i < ss->dur; i++)
+               {       
+                       diff = (double) ss->cache[i] - mean;
+                       deviation = max(deviation, diff * (diff < 0.0 ? -1 : 1));
+               }
+
+               ss->criterion = ss->pct ? deviation / mean * 100.0 : deviation;
+
+               dprint(FD_STEADYSTATE, "sum_y: %llu, mean: %f, max diff: %f, objective: %f, limit: %f\n", ss->sum_y, mean, deviation, ss->criterion, ss->limit);
+
+               if (ss->criterion < ss->limit)
+                       return true;
+       }
+
+       ss->tail = (ss->tail + 1) % ss->dur;
+       if (ss->tail <= ss->head)
+               ss->head = (ss->head + 1) % ss->dur;
+       return false;
+}
diff --git a/steadystate.h b/steadystate.h
new file mode 100644 (file)
index 0000000..039ffc9
--- /dev/null
@@ -0,0 +1,9 @@
+#ifndef FIO_STEADYSTATE_H
+#define FIO_STEADYSTATE_H
+
+extern void steadystate_check(void);
+extern void steadystate_setup(void);
+extern bool steadystate_deviation(unsigned long, unsigned long, struct thread_data *);
+extern bool steadystate_slope(unsigned long, unsigned long, struct thread_data *);
+#endif
+
diff --git a/thread_options.h b/thread_options.h
index d70fda3..088819b 100644 (file)
--- a/thread_options.h
+++ b/thread_options.h
@@ -169,6 +169,11 @@ struct thread_options {
        unsigned long long start_delay_high;
        unsigned long long timeout;
        unsigned long long ramp_time;
+       unsigned int ss;                /* TODO add to thread_options_pack */
+       bool ss_pct;
+       fio_fp64_t ss_limit;
+       unsigned long long ss_dur;
+       unsigned long long ss_ramp_time;
        unsigned int overwrite;
        unsigned int bw_avg_time;
        unsigned int iops_avg_time;
diff --git a/unit_tests/steadystate_tests.py b/unit_tests/steadystate_tests.py
new file mode 100755 (executable)
index 0000000..02b2b0d
--- /dev/null
@@ -0,0 +1,203 @@
+#!/usr/bin/python
+#
+# steadystate_tests.py
+#
+# Test option parsing and functionality for fio's steady state detection feature.
+#
+# steadystate_tests.py ./fio file-for-read-testing file-for-write-testing
+#
+# REQUIREMENTS
+# Python 2.7+ (argparse and subprocess.check_output are not available in 2.6)
+# SciPy
+#
+# KNOWN ISSUES
+# only option parsing and read tests are carried out
+# the read test fails when ss_ramp > timeout because it tries to calculate the stopping criterion and finds that
+#     it does not match what fio reports
+# min runtime:
+# if ss attained: min runtime = ss_dur + ss_ramp
+# if not attained: runtime = timeout
+
+import os
+import json
+import tempfile
+import argparse
+import subprocess
+from scipy import stats
+
+def parse_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('fio',
+                        help='path to fio executable');
+    parser.add_argument('read',
+                        help='target for read testing')
+    parser.add_argument('write',
+                        help='target for write testing')
+    args = parser.parse_args()
+
+    return args
+
+
+def check(data, iops, slope, pct, limit, dur, criterion):
+    mean = sum(data) / len(data)
+    if slope:
+        x = range(len(data))
+        m, intercept, r_value, p_value, std_err = stats.linregress(x,data)
+        m = abs(m)
+        if pct:
+            target = m / mean * 100
+        else:
+            target = m
+    else:
+        maxdev = 0
+        for x in data:
+            maxdev = max(abs(mean-x), maxdev)
+        if pct:
+            target = maxdev / mean * 100
+        else:
+            target = maxdev
+
+    return (abs(target - criterion) / criterion < 0.001), target < limit, mean, target
+
+
+if __name__ == '__main__':
+    args = parse_args()
+
+#
+# test option parsing
+#
+    parsing = [ { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=iops:10", "--ss_ramp=5"], 
+                  'output': "set steady state IOPS threshold to 10.000000" },
+                { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=iops:10%", "--ss_ramp=5"], 
+                  'output': "set steady state threshold to 10.000000%" },
+                { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=iops:.1%", "--ss_ramp=5"], 
+                  'output': "set steady state threshold to 0.100000%" },
+                { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=bw:10%", "--ss_ramp=5"], 
+                  'output': "set steady state threshold to 10.000000%" },
+                { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=bw:.1%", "--ss_ramp=5"], 
+                  'output': "set steady state threshold to 0.100000%" },
+                { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=bw:12", "--ss_ramp=5"], 
+                  'output': "set steady state BW threshold to 12" },
+              ]
+    for test in parsing:
+        output = subprocess.check_output([args.fio] + test['args']);
+        if test['output'] in output:
+            print "PASSED '{0}' found with arguments {1}".format(test['output'], test['args'])
+        else:
+            print "FAILED '{0}' NOT found with arguments {1}".format(test['output'], test['args'])
+
+#
+# test some read workloads
+#
+# if ss active and attained,
+#   check that runtime is less than job time
+#   check criteria
+#   how to check ramp time?
+#
+# if ss inactive
+#   check that runtime is what was specified
+#
+    reads = [ [ {'s': True, 'timeout': 100, 'numjobs': 1, 'ss_dur': 5, 'ss_ramp': 3, 'iops': True, 'slope': True, 'ss_limit': 0.1, 'pct': True},
+                {'s': False, 'timeout': 20, 'numjobs': 2},
+                {'s': True, 'timeout': 100, 'numjobs': 3, 'ss_dur': 10, 'ss_ramp': 5, 'iops': False, 'slope': True, 'ss_limit': 0.1, 'pct': True},
+                {'s': True, 'timeout': 10, 'numjobs': 3, 'ss_dur': 10, 'ss_ramp': 500, 'iops': False, 'slope': True, 'ss_limit': 0.1, 'pct': True} ],
+            ]
+
+    accum = []
+    suitenum = 0
+    for suite in reads:
+        jobnum = 0
+        for job in suite:
+            parameters = [ "--name=job{0}".format(jobnum),
+                           "--thread",
+                           "--filename={0}".format(args.read),
+                           "--rw=randrw", "--rwmixread=100", "--stonewall",
+                           "--group_reporting", "--numjobs={0}".format(job['numjobs']),
+                           "--time_based", "--runtime={0}".format(job['timeout']) ]
+            if job['s']:
+               if job['iops']:
+                   ss = 'iops'
+               else:
+                   ss = 'bw'
+               if job['slope']:
+                   ss += "_slope"
+               ss += ":" + str(job['ss_limit'])
+               if job['pct']:
+                   ss += '%'
+               parameters.extend([ '--ss_dur={0}'.format(job['ss_dur']),
+                                   '--ss={0}'.format(ss),
+                                   '--ss_ramp={0}'.format(job['ss_ramp']) ])
+            accum.extend(parameters)
+            jobnum += 1
+
+        tf = tempfile.NamedTemporaryFile(delete=False)
+       tf.close()
+        output = subprocess.check_output([args.fio, 
+                                          "--output-format=json", 
+                                          "--output={0}".format(tf.name)] + accum)
+        with open(tf.name, 'r') as source:
+            jsondata = json.loads(source.read())
+        os.remove(tf.name)
+        jobnum = 0
+        for job in jsondata['jobs']:
+            line = "suite {0}, {1}".format(suitenum, job['job options']['name'])
+            if suite[jobnum]['s']:
+                if job['steadystate']['attained'] == 1:
+                    # check runtime >= ss_dur + ss_ramp, check criterion, check criterion < limit
+                    mintime = (suite[jobnum]['ss_dur'] + suite[jobnum]['ss_ramp']) * 1000
+                    actual = job['read']['runtime']
+                    if mintime > actual:
+                        line = 'FAILED ' + line + ' ss attained, runtime {0} < ss_dur {1} + ss_ramp {2}'.format(actual, suite[jobnum]['ss_dur'], suite[jobnum]['ss_ramp'])
+                    else:
+                        line = line + ' ss attained, runtime {0} > ss_dur {1} + ss_ramp {2},'.format(actual, suite[jobnum]['ss_dur'], suite[jobnum]['ss_ramp'])
+                        objsame, met, mean, target = check(data=job['steadystate']['data'],
+                            iops=suite[jobnum]['iops'],
+                            slope=suite[jobnum]['slope'],
+                            pct=suite[jobnum]['pct'],
+                            limit=suite[jobnum]['ss_limit'],
+                            dur=suite[jobnum]['ss_dur'],
+                            criterion=job['steadystate']['criterion'])
+                        if not objsame:
+                            line = 'FAILED ' + line + ' fio criterion {0} != calculated criterion {1}, data: {2} '.format(job['steadystate']['criterion'], target, job['steadystate']['data'])
+                        else:
+                            if met:
+                                line = 'PASSED ' + line + ' target {0} < limit {1}, data {2}'.format(target, suite[jobnum]['ss_limit'], job['steadystate']['data'])
+                            else:
+                                line = 'FAILED ' + line + ' target {0} < limit {1} but fio reports ss not attained, data: {2}'.format(target, suite[jobnum]['ss_limit'], job['steadystate']['data'])
+                    
+                else:
+                    # check runtime, confirm criterion calculation, and confirm that criterion was not met
+                    expected = suite[jobnum]['timeout'] * 1000
+                    actual = job['read']['runtime']
+                    if abs(expected - actual) > 10:
+                        line = 'FAILED ' + line + ' ss not attained, expected runtime {0} != actual runtime {1}'.format(expected, actual)
+                    else:
+                        line = line + ' ss not attained, runtime {0} != ss_dur {1} + ss_ramp {2},'.format(actual, suite[jobnum]['ss_dur'], suite[jobnum]['ss_ramp'])
+                        objsame, met, mean, target = check(data=job['steadystate']['data'],
+                            iops=suite[jobnum]['iops'],
+                            slope=suite[jobnum]['slope'],
+                            pct=suite[jobnum]['pct'],
+                            limit=suite[jobnum]['ss_limit'],
+                            dur=suite[jobnum]['ss_dur'],
+                            criterion=job['steadystate']['criterion'])
+                        if not objsame:
+                            if actual > (suite[jobnum]['ss_dur'] + suite[jobnum]['ss_ramp'])*1000:
+                                line = 'FAILED ' + line + ' fio criterion {0} != calculated criterion {1}, data: {2} '.format(job['steadystate']['criterion'], target, job['steadystate']['data'])
+                            else:
+                                line = 'PASSED ' + line + ' fio criterion {0} == 0.0 since ss_dur + ss_ramp has not elapsed, data: {1} '.format(job['steadystate']['criterion'], job['steadystate']['data'])
+                        else:
+                            if met:
+                                line = 'FAILED ' + line + ' target {0} < threshold {1} but fio reports ss not attained, data: {2}'.format(target, suite[jobnum]['ss_limit'], job['steadystate']['data'])
+                            else:
+                                line = 'PASSED ' + line + ' criterion {0} > threshold {1}, data {2}'.format(target, suite[jobnum]['ss_limit'], job['steadystate']['data'])
+            else:
+                expected = suite[jobnum]['timeout'] * 1000
+                actual = job['read']['runtime']
+                if abs(expected - actual) < 10:
+                    result = 'PASSED '
+                else:
+                    result = 'FAILED '
+                line = result + line + ' no ss, expected runtime {0} ~= actual runtime {1}'.format(expected, actual)
+            print line
+            jobnum += 1
+        suitenum += 1