From 16e56d250f72e4fec4591d562a5f4d7a16ba1bb8 Mon Sep 17 00:00:00 2001 From: Vincent Fu Date: Sat, 18 Jun 2016 22:59:32 -0400 Subject: [PATCH] Allow fio to terminate jobs when steady state is attained KNOWN ISSUES Will not work over a network connection --- HOWTO | 45 ++++++- Makefile | 3 +- backend.c | 3 + debug.h | 1 + fio.h | 47 +++++++ helper_thread.c | 27 +++- init.c | 61 +++++++++ libfio.c | 1 + options.c | 137 ++++++++++++++++++++ stat.c | 35 +++++ stat.h | 2 + steadystate.c | 218 ++++++++++++++++++++++++++++++++ steadystate.h | 9 ++ thread_options.h | 5 + unit_tests/steadystate_tests.py | 203 +++++++++++++++++++++++++++++ 15 files changed, 789 insertions(+), 8 deletions(-) create mode 100644 steadystate.c create mode 100644 steadystate.h create mode 100755 unit_tests/steadystate_tests.py diff --git a/HOWTO b/HOWTO index 5bf71252..5daaef03 100644 --- a/HOWTO +++ b/HOWTO @@ -297,7 +297,7 @@ irange Integer range with suffix. Allows value range to be given, such 1k:4k. If the option allows two sets of ranges, they can be specified with a ',' or '/' delimiter: 1k-4k/8k-32k. Also see int. -float_list A list of floating numbers, separated by a ':' character. +float_list A list of floating point numbers, separated by a ':' character. With the above in mind, here follows the complete list of fio job parameters. @@ -1214,8 +1214,47 @@ ramp_time=time If set, fio will run the specified workload for this amount thus it will increase the total runtime if a special timeout or runtime is specified. -invalidate=bool Invalidate the buffer/page cache parts for this file prior - to starting io. Defaults to true. +steadystate=str:float +ss=str:float Define the criterion and limit for assessing steady state + performance. The first parameter designates the criterion + whereas the second parameter sets the threshold. When the + criterion falls below the threshold for the specified duration, + the job will stop. For example, iops_slope:0.1% will direct fio + to terminate the job when the least squares regression slope + falls below 0.1% of the mean IOPS. If group_reporting is + enabled this will apply to all jobs in the group. Below is the + list of available steady state assessment criteria. All + assessments are carried out using only data from the rolling + collection window. Threshold limits can be expressed as a fixed + value or as a percentage of the mean in the collection window. + iops Collect IOPS data. Stop the job if all + individual IOPS measurements are within the + specified limit of the mean IOPS (e.g., iops:2 + means that all individual IOPS values must be + within 2 of the mean, whereas iops:0.2% means + that all individual IOPS values must be within + 0.2% of the mean IOPS to terminate the job). + iops_slope + Collect IOPS data and calculate the least + squares regression slope. Stop the job if the + slope falls below the specified limit. + bw Collect bandwidth data. Stop the job if all + individual bandwidth measurements are within + the specified limit of the mean bandwidth. + bw_slope + Collect bandwidth data and calculate the least + squares regression slope. Stop the job if the + slope falls below the specified limit. + +steadystate_duration=time +ss_dur=time A rolling window of this duration will be used to judge whether + steady state has been reached. Data will be collected once per + second. The default is 0 which disables steady state detection. + +steadystate_ramp_time=time +ss_ramp=time Allow the job to run for the specified duration before + beginning data collection for checking the steady state job + termination criterion. The default is 0. sync=bool Use sync io for buffered writes. For the majority of the io engines, this means using O_SYNC. diff --git a/Makefile b/Makefile index b54f7e9e..83414c3a 100644 --- a/Makefile +++ b/Makefile @@ -45,7 +45,8 @@ SOURCE := $(patsubst $(SRCDIR)/%,%,$(wildcard $(SRCDIR)/crc/*.c)) \ server.c client.c iolog.c backend.c libfio.c flow.c cconv.c \ gettime-thread.c helpers.c json.c idletime.c td_error.c \ profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c \ - workqueue.c rate-submit.c optgroup.c helper_thread.c + workqueue.c rate-submit.c optgroup.c helper_thread.c \ + steadystate.c ifdef CONFIG_LIBHDFS HDFSFLAGS= -I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/linux -I $(FIO_LIBHDFS_INCLUDE) diff --git a/backend.c b/backend.c index b43486dc..b55a5274 100644 --- a/backend.c +++ b/backend.c @@ -1679,6 +1679,7 @@ static void *thread_main(void *data) fio_getrusage(&td->ru_start); memcpy(&td->bw_sample_time, &td->epoch, sizeof(td->epoch)); memcpy(&td->iops_sample_time, &td->epoch, sizeof(td->epoch)); + memcpy(&td->ss.prev_time, &td->epoch, sizeof(td->epoch)); if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] || o->ratemin[DDIR_TRIM]) { @@ -2410,6 +2411,8 @@ int fio_backend(struct sk_out *sk_out) } for_each_td(td, i) { + if (td->ss.dur) + free(td->ss.cache); fio_options_free(td); if (td->rusage_sem) { fio_mutex_remove(td->rusage_sem); diff --git a/debug.h b/debug.h index 923fa399..fb957474 100644 --- a/debug.h +++ b/debug.h @@ -21,6 +21,7 @@ enum { FD_NET, FD_RATE, FD_COMPRESS, + FD_STEADYSTATE, FD_DEBUG_MAX, }; diff --git a/fio.h b/fio.h index e96a4dd1..2d0327c5 100644 --- a/fio.h +++ b/fio.h @@ -122,6 +122,40 @@ struct zone_split_index { uint8_t size_perc_prev; }; +/* + * For steady state detection + */ +struct steadystate_data { + double limit; + unsigned long long dur; + unsigned long long ramp_time; + bool (*evaluate)(unsigned long, unsigned long, struct thread_data *); + bool check_iops; + bool check_slope; + bool pct; + + int attained; + int last_in_group; + int ramp_time_over; + + unsigned int head; + unsigned int tail; + unsigned long *cache; + + double criterion; + + unsigned long long sum_y; + unsigned long long sum_x; + unsigned long long sum_x_sq; + unsigned long long sum_xy; + unsigned long long oldest_y; + + struct timeval prev_time; + unsigned long long prev_iops; + unsigned long long prev_bytes; +}; + + /* * This describes a single thread/process executing a fio job. */ @@ -394,6 +428,8 @@ struct thread_data { void *pinned_mem; + struct steadystate_data ss; + char verror[FIO_VERROR_SIZE]; }; @@ -469,6 +505,9 @@ extern char *trigger_remote_cmd; extern long long trigger_timeout; extern char *aux_path; +extern bool steadystate; +#define STEADYSTATE_MSEC (1000) + extern struct thread_data *threads; static inline void fio_ro_check(const struct thread_data *td, struct io_u *io_u) @@ -785,6 +824,14 @@ enum { FIO_CPUS_SPLIT, }; +enum { + FIO_STEADYSTATE_IOPS = 0, + FIO_STEADYSTATE_IOPS_SLOPE, + FIO_STEADYSTATE_BW, + FIO_STEADYSTATE_BW_SLOPE, +}; + + extern void exec_trigger(const char *); extern void check_trigger_file(void); diff --git a/helper_thread.c b/helper_thread.c index f031df4d..d716f2b2 100644 --- a/helper_thread.c +++ b/helper_thread.c @@ -1,6 +1,7 @@ #include "fio.h" #include "smalloc.h" #include "helper_thread.h" +#include "steadystate.h" static struct helper_data { volatile int exit; @@ -69,14 +70,15 @@ void helper_thread_exit(void) static void *helper_thread_main(void *data) { struct helper_data *hd = data; - unsigned int msec_to_next_event, next_log; - struct timeval tv, last_du; + unsigned int msec_to_next_event, next_log, next_ss = STEADYSTATE_MSEC; + struct timeval tv, last_du, last_ss; int ret = 0; sk_out_assign(hd->sk_out); gettimeofday(&tv, NULL); memcpy(&last_du, &tv, sizeof(tv)); + memcpy(&last_ss, &tv, sizeof(tv)); fio_mutex_up(hd->startup_mutex); @@ -84,7 +86,7 @@ static void *helper_thread_main(void *data) while (!ret && !hd->exit) { struct timespec ts; struct timeval now; - uint64_t since_du; + uint64_t since_du, since_ss = 0; timeval_add_msec(&tv, msec_to_next_event); ts.tv_sec = tv.tv_sec; @@ -98,6 +100,7 @@ static void *helper_thread_main(void *data) if (hd->reset) { memcpy(&tv, &now, sizeof(tv)); memcpy(&last_du, &now, sizeof(last_du)); + memcpy(&last_ss, &now, sizeof(last_ss)); hd->reset = 0; } @@ -122,7 +125,22 @@ static void *helper_thread_main(void *data) if (!next_log) next_log = DISK_UTIL_MSEC; - msec_to_next_event = min(next_log, msec_to_next_event); + if (steadystate) { + since_ss = mtime_since(&last_ss, &now); + if (since_ss >= STEADYSTATE_MSEC || STEADYSTATE_MSEC - since_ss < 10) { + steadystate_check(); + timeval_add_msec(&last_ss, since_ss); + if (since_ss > STEADYSTATE_MSEC) + next_ss = STEADYSTATE_MSEC - (since_ss - STEADYSTATE_MSEC); + else + next_ss = STEADYSTATE_MSEC; + } + else + next_ss = STEADYSTATE_MSEC - since_ss; + } + + msec_to_next_event = min(min(next_log, msec_to_next_event), next_ss); + dprint(FD_STEADYSTATE, "since_ss: %llu, next_ss: %u, next_log: %u, msec_to_next_event: %u\n", (unsigned long long)since_ss, next_ss, next_log, msec_to_next_event); if (!is_backend) print_thread_status(); @@ -142,6 +160,7 @@ int helper_thread_create(struct fio_mutex *startup_mutex, struct sk_out *sk_out) hd = smalloc(sizeof(*hd)); setup_disk_util(); + steadystate_setup(); hd->sk_out = sk_out; diff --git a/init.c b/init.c index a1a00879..7f91c048 100644 --- a/init.c +++ b/init.c @@ -25,6 +25,7 @@ #include "server.h" #include "idletime.h" #include "filelock.h" +#include "steadystate.h" #include "oslib/getopt.h" #include "oslib/strcasestr.h" @@ -80,6 +81,8 @@ unsigned int *fio_debug_jobp = NULL; static char cmd_optstr[256]; static int did_arg; +bool steadystate = false; + #define FIO_CLIENT_FLAG (1 << 16) /* @@ -1577,6 +1580,58 @@ static int add_job(struct thread_data *td, const char *jobname, int job_add_num, log_info("...\n"); } + if (o->ss_dur) { + steadystate = true; + o->ss_dur /= 1000000L; + + /* put all steady state info in one place */ + td->ss.dur = o->ss_dur; + td->ss.limit = o->ss_limit.u.f; + td->ss.ramp_time = o->ss_ramp_time; + td->ss.pct = o->ss_pct; + + if (o->ss == FIO_STEADYSTATE_IOPS_SLOPE || o->ss == FIO_STEADYSTATE_BW_SLOPE) { + td->ss.check_slope = true; + td->ss.evaluate = &steadystate_slope; + } else { + td->ss.check_slope = false; + td->ss.evaluate = &steadystate_deviation; + } + + if (o->ss == FIO_STEADYSTATE_IOPS || o->ss == FIO_STEADYSTATE_IOPS_SLOPE) + td->ss.check_iops = true; + else + td->ss.check_iops = false; + + + /* when group reporting is enabled only the cache allocated for the final td is actually used */ + td->ss.cache = malloc(o->ss_dur * sizeof(*(td->ss.cache))); + if (td->ss.cache == NULL) + { + log_err("fio: unable to allocate memory for steadystate cache\n"); + goto err; + } + for (i = 0; i < td->ss.dur; i++) + td->ss.cache[i] = 0; + /* initialize so that it is obvious if the cache is not full in the output */ + + td->ss.ramp_time_over = (td->ss.ramp_time == 0); + td->ss.attained = 0; + td->ss.last_in_group = 0; + td->ss.head = 0; + td->ss.tail = 0; + td->ss.sum_x = o->ss_dur * (o->ss_dur - 1) / 2; + td->ss.sum_x_sq = (o->ss_dur - 1) * (o->ss_dur) * (2*o->ss_dur - 1) / 6; + td->ss.prev_bytes = 0; + td->ss.prev_iops = 0; + td->ss.sum_y = 0; + td->ss.oldest_y = 0; + td->ss.criterion = 0.0; + td->ts.ss = &td->ss; + } + else + td->ts.ss = NULL; + /* * recurse add identical jobs, clear numjobs and stonewall options * as they don't apply to sub-jobs @@ -1592,6 +1647,8 @@ static int add_job(struct thread_data *td, const char *jobname, int job_add_num, td_new->o.stonewall = 0; td_new->o.new_group = 0; td_new->subjob_number = numjobs; + td_new->o.ss_dur = o->ss_dur * 1000000l; + td_new->o.ss_limit = o->ss_limit; if (file_alloced) { if (td_new->files) { @@ -2134,6 +2191,10 @@ struct debug_level debug_levels[] = { .help = "Log compression logging", .shift = FD_COMPRESS, }, + { .name = "steadystate", + .help = "Steady state detection logging", + .shift = FD_STEADYSTATE, + }, { .name = NULL, }, }; diff --git a/libfio.c b/libfio.c index fb7d35ae..20ce7cdf 100644 --- a/libfio.c +++ b/libfio.c @@ -153,6 +153,7 @@ void reset_all_stats(struct thread_data *td) memcpy(&td->start, &tv, sizeof(tv)); memcpy(&td->iops_sample_time, &tv, sizeof(tv)); memcpy(&td->bw_sample_time, &tv, sizeof(tv)); + memcpy(&td->ss.prev_time, &tv, sizeof(tv)); lat_target_reset(td); clear_rusage_stat(td); diff --git a/options.c b/options.c index 6161ac8f..dfa98d3e 100644 --- a/options.c +++ b/options.c @@ -1061,6 +1061,86 @@ static int str_random_distribution_cb(void *data, const char *str) return 0; } +static int str_steadystate_cb(void *data, const char *str) +{ + struct thread_data *td = data; + double val; + char *nr; + char *pct; + long long ll; + + if (td->o.ss != FIO_STEADYSTATE_IOPS && + td->o.ss != FIO_STEADYSTATE_IOPS_SLOPE && + td->o.ss != FIO_STEADYSTATE_BW && + td->o.ss != FIO_STEADYSTATE_BW_SLOPE) { + /* should be impossible to get here */ + log_err("fio: unknown steady state criterion\n"); + return 1; + } + + nr = get_opt_postfix(str); + if (!nr) { + log_err("fio: steadystate threshold must be specified in addition to criterion\n"); + free(nr); + return 1; + } + + /* ENHANCEMENT Allow fio to understand size=10.2% and use here */ + pct = strstr(nr, "%"); + if (pct) { + *pct = '\0'; + strip_blank_end(nr); + if (!str_to_float(nr, &val, 0)) { + log_err("fio: could not parse steadystate threshold percentage\n"); + free(nr); + return 1; + } + + dprint(FD_PARSE, "set steady state threshold to %f%%\n", val); + free(nr); + if (parse_dryrun()) + return 0; + + td->o.ss_pct = true; + td->o.ss_limit.u.f = val; + + + } else if (td->o.ss == FIO_STEADYSTATE_IOPS || + td->o.ss == FIO_STEADYSTATE_IOPS_SLOPE) { + if (!str_to_float(nr, &val, 0)) { + log_err("fio: steadystate IOPS threshold postfix parsing failed\n"); + free(nr); + return 1; + } + + dprint(FD_PARSE, "set steady state IOPS threshold to %f\n", val); + free(nr); + if (parse_dryrun()) + return 0; + + td->o.ss_pct = false; + td->o.ss_limit.u.f = val; + + } else { /* bandwidth criterion */ + if (str_to_decimal(nr, &ll, 1, td, 0, 0)) { + log_err("fio: steadystate BW threshold postfix parsing failed\n"); + free(nr); + return 1; + } + + dprint(FD_PARSE, "set steady state BW threshold to %lld\n", ll); + free(nr); + if (parse_dryrun()) + return 0; + + td->o.ss_pct = false; + td->o.ss_limit.u.f = (double) ll; + + } + + return 0; +} + /* * Return next name in the string. Files are separated with ':'. If the ':' * is escaped with a '\', then that ':' is part of the filename and does not @@ -4133,6 +4213,63 @@ struct fio_option fio_options[FIO_MAX_OPTS] = { .category = FIO_OPT_C_IO, .group = FIO_OPT_G_MTD, }, + { + .name = "steadystate", + .lname = "Steady state threshold", + .alias = "ss", + .type = FIO_OPT_STR, + .off1 = offsetof(struct thread_options, ss), + .cb = str_steadystate_cb, + .help = "Define the criterion and limit to judge when a job has reached steady state", + .def = "iops_slope:0.01%", + .posval = { + { .ival = "iops", + .oval = FIO_STEADYSTATE_IOPS, + .help = "maximum mean deviation of IOPS measurements", + }, + { .ival = "iops_slope", + .oval = FIO_STEADYSTATE_IOPS_SLOPE, + .help = "slope calculated from IOPS measurements", + }, + { .ival = "bw", + .oval = FIO_STEADYSTATE_BW, + .help = "maximum mean deviation of bandwidth measurements", + }, + { + .ival = "bw_slope", + .oval = FIO_STEADYSTATE_BW_SLOPE, + .help = "slope calculated from bandwidth measurements", + }, + }, + .category = FIO_OPT_C_GENERAL, + .group = FIO_OPT_G_RUNTIME, + }, + { + .name = "steadystate_duration", + .lname = "Steady state duration", + .alias = "ss_dur", + .type = FIO_OPT_STR_VAL_TIME, + .off1 = offsetof(struct thread_options, ss_dur), + .help = "Stop workload upon attaining steady state for specified duration", + .def = "0", + .is_seconds = 1, + .is_time = 1, + .category = FIO_OPT_C_GENERAL, + .group = FIO_OPT_G_RUNTIME, + }, + { + .name = "steadystate_ramp_time", + .lname = "Steady state ramp time", + .alias = "ss_ramp", + .type = FIO_OPT_STR_VAL_TIME, + .off1 = offsetof(struct thread_options, ss_ramp_time), + .help = "Delay before initiation of data collection for steady state job termination testing", + .def = "0", + .is_seconds = 1, + .is_time = 1, + .category = FIO_OPT_C_GENERAL, + .group = FIO_OPT_G_RUNTIME, + }, { .name = NULL, }, diff --git a/stat.c b/stat.c index ef9fe7d4..3e7ff756 100644 --- a/stat.c +++ b/stat.c @@ -1255,6 +1255,36 @@ static struct json_object *show_thread_status_json(struct thread_stat *ts, } } + /* s}teady state detection; move this behind json+? */ + if (ts->ss) { + struct json_array *cache; + struct steadystate_data *ss = ts->ss; + int i, x; + char ss_option[64]; + + snprintf(ss_option, sizeof(ss_option), "%s%s:%f%s", + ss->check_iops ? "iops" : "bw", + ss->check_slope ? "_slope" : "", + (float) ss->limit, + ss->pct ? "%" : ""); + + tmp = json_create_object(); + json_object_add_value_object(root, "steadystate", tmp); + json_object_add_value_string(tmp, "ss", ss_option); + json_object_add_value_float(tmp, "limit", (float)ss->limit); + json_object_add_value_int(tmp, "duration", (int)ss->dur); + json_object_add_value_int(tmp, "steadystate_ramptime", ss->ramp_time / 1000000L); + json_object_add_value_int(tmp, "attained", ss->attained); + json_object_add_value_float(tmp, "criterion", ss->criterion); + + cache = json_create_array(); + json_object_add_value_array(tmp, "data", cache); + for (i = 0; i < ss->dur; i++) { + x = (ss->head + i) % ss->dur; + json_array_add_value_int(cache, ss->cache[x]); + } + } + return root; } @@ -1578,6 +1608,11 @@ void __show_run_stats(void) ts->block_infos[k] = td->ts.block_infos[k]; sum_thread_stats(ts, &td->ts, idx == 1); + + if (td->o.ss_dur) + ts->ss = &td->ss; + else + ts->ss = NULL; } for (i = 0; i < nr_ts; i++) { diff --git a/stat.h b/stat.h index 86f1a0b5..357a1ff1 100644 --- a/stat.h +++ b/stat.h @@ -213,6 +213,8 @@ struct thread_stat { uint64_t latency_target; fio_fp64_t latency_percentile; uint64_t latency_window; + + struct steadystate_data *ss; } __attribute__((packed)); struct jobs_eta { diff --git a/steadystate.c b/steadystate.c new file mode 100644 index 00000000..1e7212f8 --- /dev/null +++ b/steadystate.c @@ -0,0 +1,218 @@ +#include "fio.h" +#include "steadystate.h" +#include "helper_thread.h" + +void steadystate_setup() +{ + int i, prev_groupid; + struct thread_data *td, *prev_td; + + if (!steadystate) + return; + + /* + * if group reporting is enabled, identify the last td + * for each group and use it for storing steady state + * data + */ + prev_groupid = -1; + prev_td = NULL; + for_each_td(td, i) { + if (!td->o.group_reporting) + continue; + + if (prev_groupid != td->groupid) { + if (prev_td != NULL) + prev_td->ss.last_in_group = 1; + prev_groupid = td->groupid; + } + prev_td = td; + } + + if (prev_td != NULL && prev_td->o.group_reporting) + prev_td->ss.last_in_group = 1; +} + +void steadystate_check() +{ + int i, j, ddir, prev_groupid, group_ramp_time_over = 0; + unsigned long rate_time; + struct thread_data *td, *td2; + struct timeval now; + unsigned long group_bw = 0, group_iops = 0; + unsigned long long td_iops; + unsigned long long td_bytes; + + prev_groupid = -1; + for_each_td(td, i) { + struct steadystate_data *ss = &td->ss; + + if (!ss->dur || td->runstate <= TD_SETTING_UP || td->runstate >= TD_EXITED || ss->attained) + continue; + + td_iops = 0; + td_bytes = 0; + if (!td->o.group_reporting || + (td->o.group_reporting && td->groupid != prev_groupid)) { + group_bw = 0; + group_iops = 0; + group_ramp_time_over = 0; + } + prev_groupid = td->groupid; + + fio_gettime(&now, NULL); + if (ss->ramp_time && !ss->ramp_time_over) + /* + * Begin recording data one second after ss->ramp_time + * has elapsed + */ + if (utime_since(&td->epoch, &now) >= (ss->ramp_time + 1000000L)) + ss->ramp_time_over = 1; + + for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++) { + td_iops += td->io_blocks[ddir]; + td_bytes += td->io_bytes[ddir]; + } + + rate_time = mtime_since(&ss->prev_time, &now); + memcpy(&ss->prev_time, &now, sizeof(now)); + + /* + * Begin monitoring when job starts but don't actually use + * data in checking stopping criterion until ss->ramp_time is + * over. This ensures that we will have a sane value in + * prev_iops/bw the first time through after ss->ramp_time + * is done. + */ + if (ss->ramp_time_over) { + group_bw += 1000 * (td_bytes - ss->prev_bytes) / rate_time; + group_iops += 1000 * (td_iops - ss->prev_iops) / rate_time; + ++group_ramp_time_over; + } + ss->prev_iops = td_iops; + ss->prev_bytes = td_bytes; + + if (td->o.group_reporting && !ss->last_in_group) + continue; + + /* don't begin checking criterion until ss->ramp_time is over for at least one thread in group */ + if (!group_ramp_time_over) + continue; + + dprint(FD_STEADYSTATE, "steadystate_check() thread: %d, groupid: %u, rate_msec: %ld, iops: %lu, bw: %lu, head: %d, tail: %d\n", + i, td->groupid, rate_time, group_iops, group_bw, ss->head, ss->tail); + + if (ss->evaluate(group_iops, group_bw, td)) + { + if (td->o.group_reporting) + for_each_td(td2, j) { + if (td2->groupid == td->groupid) { + td2->ss.attained = 1; + fio_mark_td_terminate(td2); + } + } + else { + ss->attained = 1; + fio_mark_td_terminate(td); + } + } + } +} + +bool steadystate_slope(unsigned long iops, unsigned long bw, struct thread_data *td) +{ + int i, x; + double result; + double slope; + struct steadystate_data *ss = &td->ss; + + ss->cache[ss->tail] = ss->check_iops ? iops : bw; + + if (ss->tail < ss->head || (ss->tail - ss->head == ss->dur - 1)) + { + if (ss->sum_y == 0) /* first time through */ + { + for(i = 0; i < ss->dur; i++) + { + ss->sum_y += ss->cache[i]; + x = ss->head + i; + if (x >= ss->dur) + x -= ss->dur; + ss->sum_xy += ss->cache[x] * i; + } + } else { /* easy to update the sums */ + ss->sum_y -= ss->oldest_y; + ss->sum_y += ss->cache[ss->tail]; + ss->sum_xy = ss->sum_xy - ss->sum_y + ss->dur * ss->cache[ss->tail]; + } + + ss->oldest_y = ss->cache[ss->head]; + + /* + * calculate slope as (sum_xy - sum_x * sum_y / n) / (sum_(x^2) - (sum_x)^2 / n) + * This code assumes that all x values are equally spaced when they are often + * off by a few milliseconds. This assumption greatly simplifies the + * calculations. + */ + slope = (ss->sum_xy - (double) ss->sum_x * ss->sum_y / ss->dur) / (ss->sum_x_sq - (double) ss->sum_x * ss->sum_x / ss->dur); + ss->criterion = ss->pct ? slope / (ss->sum_y / ss->dur) * 100.0: slope; + + dprint(FD_STEADYSTATE, "sum_y: %llu, sum_xy: %llu, slope: %f, criterion: %f, limit: %f\n", + ss->sum_y, ss->sum_xy, slope, ss->criterion, ss->limit); + + result = ss->criterion * (ss->criterion < 0.0 ? -1 : 1); + if (result < ss->limit) + return true; + } + + ss->tail = (ss->tail + 1) % ss->dur; + if (ss->tail <= ss->head) + ss->head = (ss->head + 1) % ss->dur; + return false; +} + +bool steadystate_deviation(unsigned long iops, unsigned long bw, struct thread_data *td) +{ + int i; + double diff; + double mean; + double deviation; + + struct steadystate_data *ss = &td->ss; + + ss->cache[ss->tail] = ss->check_iops ? iops : bw; + + if (ss->tail < ss->head || (ss->tail - ss->head == ss->dur - 1)) + { + if (ss->sum_y == 0) /* first time through */ + { + for(i = 0; i < ss->dur; i++) + ss->sum_y += ss->cache[i]; + } else { /* easy to update the sum */ + ss->sum_y -= ss->oldest_y; + ss->sum_y += ss->cache[ss->tail]; + } + + ss->oldest_y = ss->cache[ss->head]; + mean = (double) ss->sum_y / ss->dur; + deviation = 0.0; + + for (i = 0; i < ss->dur; i++) + { + diff = (double) ss->cache[i] - mean; + deviation = max(deviation, diff * (diff < 0.0 ? -1 : 1)); + } + + ss->criterion = ss->pct ? deviation / mean * 100.0 : deviation; + + dprint(FD_STEADYSTATE, "sum_y: %llu, mean: %f, max diff: %f, objective: %f, limit: %f\n", ss->sum_y, mean, deviation, ss->criterion, ss->limit); + + if (ss->criterion < ss->limit) + return true; + } + + ss->tail = (ss->tail + 1) % ss->dur; + if (ss->tail <= ss->head) + ss->head = (ss->head + 1) % ss->dur; + return false; +} diff --git a/steadystate.h b/steadystate.h new file mode 100644 index 00000000..039ffc90 --- /dev/null +++ b/steadystate.h @@ -0,0 +1,9 @@ +#ifndef FIO_STEADYSTATE_H +#define FIO_STEADYSTATE_H + +extern void steadystate_check(void); +extern void steadystate_setup(void); +extern bool steadystate_deviation(unsigned long, unsigned long, struct thread_data *); +extern bool steadystate_slope(unsigned long, unsigned long, struct thread_data *); +#endif + diff --git a/thread_options.h b/thread_options.h index d70fda3f..088819bf 100644 --- a/thread_options.h +++ b/thread_options.h @@ -169,6 +169,11 @@ struct thread_options { unsigned long long start_delay_high; unsigned long long timeout; unsigned long long ramp_time; + unsigned int ss; /* TODO add to thread_options_pack */ + bool ss_pct; + fio_fp64_t ss_limit; + unsigned long long ss_dur; + unsigned long long ss_ramp_time; unsigned int overwrite; unsigned int bw_avg_time; unsigned int iops_avg_time; diff --git a/unit_tests/steadystate_tests.py b/unit_tests/steadystate_tests.py new file mode 100755 index 00000000..02b2b0d6 --- /dev/null +++ b/unit_tests/steadystate_tests.py @@ -0,0 +1,203 @@ +#!/usr/bin/python +# +# steadystate_tests.py +# +# Test option parsing and functonality for fio's steady state detection feature. +# +# steadystate_tests.py ./fio file-for-read-testing file-for-write-testing +# +# REQUIREMENTS +# Python 2.6+ +# SciPy +# +# KNOWN ISSUES +# only option parsing and read tests are carried out +# the read test fails when ss_ramp > timeout because it tries to calculate the stopping criterion and finds that +# it does not match what fio reports +# min runtime: +# if ss attained: min runtime = ss_dur + ss_ramp +# if not attained: runtime = timeout + +import os +import json +import tempfile +import argparse +import subprocess +from scipy import stats + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument('fio', + help='path to fio executable'); + parser.add_argument('read', + help='target for read testing') + parser.add_argument('write', + help='target for write testing') + args = parser.parse_args() + + return args + + +def check(data, iops, slope, pct, limit, dur, criterion): + mean = sum(data) / len(data) + if slope: + x = range(len(data)) + m, intercept, r_value, p_value, std_err = stats.linregress(x,data) + m = abs(m) + if pct: + target = m / mean * 100 + else: + target = m + else: + maxdev = 0 + for x in data: + maxdev = max(abs(mean-x), maxdev) + if pct: + target = maxdev / mean * 100 + else: + target = maxdev + + return (abs(target - criterion) / criterion < 0.001), target < limit, mean, target + + +if __name__ == '__main__': + args = parse_args() + +# +# test option parsing +# + parsing = [ { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=iops:10", "--ss_ramp=5"], + 'output': "set steady state IOPS threshold to 10.000000" }, + { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=iops:10%", "--ss_ramp=5"], + 'output': "set steady state threshold to 10.000000%" }, + { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=iops:.1%", "--ss_ramp=5"], + 'output': "set steady state threshold to 0.100000%" }, + { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=bw:10%", "--ss_ramp=5"], + 'output': "set steady state threshold to 10.000000%" }, + { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=bw:.1%", "--ss_ramp=5"], + 'output': "set steady state threshold to 0.100000%" }, + { 'args': ["--parse-only", "--debug=parse", "--ss_dur=10s", "--ss=bw:12", "--ss_ramp=5"], + 'output': "set steady state BW threshold to 12" }, + ] + for test in parsing: + output = subprocess.check_output([args.fio] + test['args']); + if test['output'] in output: + print "PASSED '{0}' found with arguments {1}".format(test['output'], test['args']) + else: + print "FAILED '{0}' NOT found with arguments {1}".format(test['output'], test['args']) + +# +# test some read workloads +# +# if ss active and attained, +# check that runtime is less than job time +# check criteria +# how to check ramp time? +# +# if ss inactive +# check that runtime is what was specified +# + reads = [ [ {'s': True, 'timeout': 100, 'numjobs': 1, 'ss_dur': 5, 'ss_ramp': 3, 'iops': True, 'slope': True, 'ss_limit': 0.1, 'pct': True}, + {'s': False, 'timeout': 20, 'numjobs': 2}, + {'s': True, 'timeout': 100, 'numjobs': 3, 'ss_dur': 10, 'ss_ramp': 5, 'iops': False, 'slope': True, 'ss_limit': 0.1, 'pct': True}, + {'s': True, 'timeout': 10, 'numjobs': 3, 'ss_dur': 10, 'ss_ramp': 500, 'iops': False, 'slope': True, 'ss_limit': 0.1, 'pct': True} ], + ] + + accum = [] + suitenum = 0 + for suite in reads: + jobnum = 0 + for job in suite: + parameters = [ "--name=job{0}".format(jobnum), + "--thread", + "--filename={0}".format(args.read), + "--rw=randrw", "--rwmixread=100", "--stonewall", + "--group_reporting", "--numjobs={0}".format(job['numjobs']), + "--time_based", "--runtime={0}".format(job['timeout']) ] + if job['s']: + if job['iops']: + ss = 'iops' + else: + ss = 'bw' + if job['slope']: + ss += "_slope" + ss += ":" + str(job['ss_limit']) + if job['pct']: + ss += '%' + parameters.extend([ '--ss_dur={0}'.format(job['ss_dur']), + '--ss={0}'.format(ss), + '--ss_ramp={0}'.format(job['ss_ramp']) ]) + accum.extend(parameters) + jobnum += 1 + + tf = tempfile.NamedTemporaryFile(delete=False) + tf.close() + output = subprocess.check_output([args.fio, + "--output-format=json", + "--output={0}".format(tf.name)] + accum) + with open(tf.name, 'r') as source: + jsondata = json.loads(source.read()) + os.remove(tf.name) + jobnum = 0 + for job in jsondata['jobs']: + line = "suite {0}, {1}".format(suitenum, job['job options']['name']) + if suite[jobnum]['s']: + if job['steadystate']['attained'] == 1: + # check runtime >= ss_dur + ss_ramp, check criterion, check criterion < limit + mintime = (suite[jobnum]['ss_dur'] + suite[jobnum]['ss_ramp']) * 1000 + actual = job['read']['runtime'] + if mintime > actual: + line = 'FAILED ' + line + ' ss attained, runtime {0} < ss_dur {1} + ss_ramp {2}'.format(actual, suite[jobnum]['ss_dur'], suite[jobnum]['ss_ramp']) + else: + line = line + ' ss attained, runtime {0} > ss_dur {1} + ss_ramp {2},'.format(actual, suite[jobnum]['ss_dur'], suite[jobnum]['ss_ramp']) + objsame, met, mean, target = check(data=job['steadystate']['data'], + iops=suite[jobnum]['iops'], + slope=suite[jobnum]['slope'], + pct=suite[jobnum]['pct'], + limit=suite[jobnum]['ss_limit'], + dur=suite[jobnum]['ss_dur'], + criterion=job['steadystate']['criterion']) + if not objsame: + line = 'FAILED ' + line + ' fio criterion {0} != calculated criterion {1}, data: {2} '.format(job['steadystate']['criterion'], target, job['steadystate']['data']) + else: + if met: + line = 'PASSED ' + line + ' target {0} < limit {1}, data {2}'.format(target, suite[jobnum]['ss_limit'], job['steadystate']['data']) + else: + line = 'FAILED ' + line + ' target {0} < limit {1} but fio reports ss not attained, data: {2}'.format(target, suite[jobnum]['ss_limit'], job['steadystate']['data']) + + else: + # check runtime, confirm criterion calculation, and confirm that criterion was not met + expected = suite[jobnum]['timeout'] * 1000 + actual = job['read']['runtime'] + if abs(expected - actual) > 10: + line = 'FAILED ' + line + ' ss not attained, expected runtime {0} != actual runtime {1}'.format(expected, actual) + else: + line = line + ' ss not attained, runtime {0} != ss_dur {1} + ss_ramp {2},'.format(actual, suite[jobnum]['ss_dur'], suite[jobnum]['ss_ramp']) + objsame, met, mean, target = check(data=job['steadystate']['data'], + iops=suite[jobnum]['iops'], + slope=suite[jobnum]['slope'], + pct=suite[jobnum]['pct'], + limit=suite[jobnum]['ss_limit'], + dur=suite[jobnum]['ss_dur'], + criterion=job['steadystate']['criterion']) + if not objsame: + if actual > (suite[jobnum]['ss_dur'] + suite[jobnum]['ss_ramp'])*1000: + line = 'FAILED ' + line + ' fio criterion {0} != calculated criterion {1}, data: {2} '.format(job['steadystate']['criterion'], target, job['steadystate']['data']) + else: + line = 'PASSED ' + line + ' fio criterion {0} == 0.0 since ss_dur + ss_ramp has not elapsed, data: {1} '.format(job['steadystate']['criterion'], job['steadystate']['data']) + else: + if met: + line = 'FAILED ' + line + ' target {0} < threshold {1} but fio reports ss not attained, data: {2}'.format(target, suite[jobnum]['ss_limit'], job['steadystate']['data']) + else: + line = 'PASSED ' + line + ' criterion {0} > threshold {1}, data {2}'.format(target, suite[jobnum]['ss_limit'], job['steadystate']['data']) + else: + expected = suite[jobnum]['timeout'] * 1000 + actual = job['read']['runtime'] + if abs(expected - actual) < 10: + result = 'PASSED ' + else: + result = 'FAILED ' + line = result + line + ' no ss, expected runtime {0} ~= actual runtime {1}'.format(expected, actual) + print line + jobnum += 1 + suitenum += 1 -- 2.25.1