From bb49c8bd53e2b3fd7abe31fde742b5f2b1563ef5 Mon Sep 17 00:00:00 2001 From: Vincent Fu Date: Fri, 2 Dec 2016 12:51:36 -0500 Subject: [PATCH] steadystate: implement transmission of steadystate data over the wire in client/server mode 1) Pare down thread_stat steadystate members to only those that need to go over the wire 2) Plumb up sending/receiving of steadystate data including ring buffers --- TODO | 2 -- client.c | 26 +++++++++++++++++ server.c | 33 +++++++++++++++++++++- stat.c | 77 ++++++++++++++++++++++++++++----------------------- stat.h | 13 ++++++++- steadystate.c | 29 +++++++++---------- steadystate.h | 17 ++++++------ 7 files changed, 133 insertions(+), 64 deletions(-) diff --git a/TODO b/TODO index 5a8d71d0..e4b146e9 100644 --- a/TODO +++ b/TODO @@ -1,7 +1,5 @@ Known issues/TODO (for steady-state) -- Does not work over a network connection - - Allow user to specify the frequency of measurements - Better documentation for output diff --git a/client.c b/client.c index d502a4b3..96bb29c1 100644 --- a/client.c +++ b/client.c @@ -946,6 +946,22 @@ static void convert_ts(struct thread_stat *dst, struct thread_stat *src) dst->nr_block_infos = le64_to_cpu(src->nr_block_infos); for (i = 0; i < dst->nr_block_infos; i++) dst->block_infos[i] = le32_to_cpu(src->block_infos[i]); + + dst->ss_dur = le64_to_cpu(src->ss_dur); + dst->ss_state = le32_to_cpu(src->ss_state); + dst->ss_head = le32_to_cpu(src->ss_head); + dst->ss_sum_y = le64_to_cpu(src->ss_sum_y); + dst->ss_limit.u.f = fio_uint64_to_double(le64_to_cpu(src->ss_limit.u.i)); + dst->ss_slope.u.f = fio_uint64_to_double(le64_to_cpu(src->ss_slope.u.i)); + dst->ss_deviation.u.f = fio_uint64_to_double(le64_to_cpu(src->ss_deviation.u.i)); + dst->ss_criterion.u.f = fio_uint64_to_double(le64_to_cpu(src->ss_criterion.u.i)); + + if (dst->ss_state & __FIO_SS_DATA) { + for (i = 0; i < dst->ss_dur; i++ ) { + dst->ss_iops_data[i] = le64_to_cpu(src->ss_iops_data[i]); + dst->ss_bw_data[i] = le64_to_cpu(src->ss_bw_data[i]); + } + } } static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src) @@ -1547,6 +1563,7 @@ int fio_handle_client(struct fio_client *client) { struct client_ops *ops = client->ops; struct fio_net_cmd *cmd; + int size; dprint(FD_NET, "client: handle %s\n", client->hostname); @@ -1579,6 +1596,15 @@ int fio_handle_client(struct fio_client *client) case FIO_NET_CMD_TS: { struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload; + dprint(FD_NET, "client: ts->ss_state = %u\n", (unsigned int) le32_to_cpu(p->ts.ss_state)); + if (le32_to_cpu(p->ts.ss_state) & __FIO_SS_DATA) { + dprint(FD_NET, "client: received steadystate ring buffers\n"); + + size = le64_to_cpu(p->ts.ss_dur); + p->ts.ss_iops_data = (uint64_t *) ((struct cmd_ts_pdu *)cmd->payload + 1); + p->ts.ss_bw_data = p->ts.ss_iops_data + size; + } + convert_ts(&p->ts, &p->ts); convert_gs(&p->rs, &p->rs); diff --git a/server.c b/server.c index 2fd9b45a..bef4f826 100644 --- a/server.c +++ b/server.c @@ -1458,6 +1458,8 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs) { struct cmd_ts_pdu p; int i, j; + void *ss_buf; + uint64_t *ss_iops, *ss_bw; dprint(FD_NET, "server sending end stats\n"); @@ -1541,9 +1543,38 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs) for (i = 0; i < p.ts.nr_block_infos; i++) p.ts.block_infos[i] = le32_to_cpu(ts->block_infos[i]); + p.ts.ss_dur = cpu_to_le64(ts->ss_dur); + p.ts.ss_state = cpu_to_le32(ts->ss_state); + p.ts.ss_head = cpu_to_le32(ts->ss_head); + p.ts.ss_sum_y = cpu_to_le64(ts->ss_sum_y); + p.ts.ss_limit.u.i = cpu_to_le64(fio_double_to_uint64(ts->ss_limit.u.f)); + p.ts.ss_slope.u.i = cpu_to_le64(fio_double_to_uint64(ts->ss_slope.u.f)); + p.ts.ss_deviation.u.i = cpu_to_le64(fio_double_to_uint64(ts->ss_deviation.u.f)); + p.ts.ss_criterion.u.i = cpu_to_le64(fio_double_to_uint64(ts->ss_criterion.u.f)); + convert_gs(&p.rs, rs); - fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY); + dprint(FD_NET, "ts->ss_state = %d\n", ts->ss_state); + if (ts->ss_state & __FIO_SS_DATA) { + dprint(FD_NET, "server sending steadystate ring buffers\n"); + + ss_buf = malloc(sizeof(p) + 2*ts->ss_dur*sizeof(uint64_t)); + + memcpy(ss_buf, &p, sizeof(p)); + + ss_iops = (uint64_t *) ((struct cmd_ts_pdu *)ss_buf + 1); + ss_bw = ss_iops + (int) ts->ss_dur; + for (i = 0; i < ts->ss_dur; i++) { + ss_iops[i] = cpu_to_le64(ts->ss_iops_data[i]); + ss_bw[i] = cpu_to_le64(ts->ss_bw_data[i]); + } + + fio_net_queue_cmd(FIO_NET_CMD_TS, ss_buf, sizeof(p) + 2*ts->ss_dur*sizeof(uint64_t), NULL, SK_F_COPY); + + free(ss_buf); + } + else + fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY); } void fio_server_send_gs(struct group_run_stats *rs) diff --git a/stat.c b/stat.c index d635f0ac..fab0a82d 100644 --- a/stat.c +++ b/stat.c @@ -660,26 +660,25 @@ static void show_block_infos(int nr_block_infos, uint32_t *block_infos, static void show_ss_normal(struct thread_stat *ts, struct buf_output *out) { char *p1, *p2; - struct steadystate_data *ss = ts->ss; unsigned long long bw_mean, iops_mean; const int i2p = is_power_of_2(ts->kb_base); - if (!ss->state) + if (!ts->ss_state) return; - bw_mean = steadystate_bw_mean(ss); - iops_mean = steadystate_iops_mean(ss); + bw_mean = steadystate_bw_mean(ts); + iops_mean = steadystate_iops_mean(ts); p1 = num2str(bw_mean / ts->kb_base, 6, ts->kb_base, i2p, ts->unit_base); p2 = num2str(iops_mean, 6, 1, 0, 0); log_buf(out, " steadystate : attained=%s, bw=%s/s, iops=%s, %s%s=%.3f%s\n", - ss->state & __FIO_SS_ATTAINED ? "yes" : "no", + ts->ss_state & __FIO_SS_ATTAINED ? "yes" : "no", p1, p2, - ss->state & __FIO_SS_IOPS ? "iops" : "bw", - ss->state & __FIO_SS_SLOPE ? " slope": " mean dev", - ss->criterion, - ss->state & __FIO_SS_PCT ? "%" : ""); + ts->ss_state & __FIO_SS_IOPS ? "iops" : "bw", + ts->ss_state & __FIO_SS_SLOPE ? " slope": " mean dev", + ts->ss_criterion.u.f, + ts->ss_state & __FIO_SS_PCT ? "%" : ""); free(p1); free(p2); @@ -790,7 +789,7 @@ static void show_thread_status_normal(struct thread_stat *ts, show_block_infos(ts->nr_block_infos, ts->block_infos, ts->percentile_list, out); - if (ts->ss) + if (ts->ss_dur) show_ss_normal(ts, out); } @@ -1286,31 +1285,29 @@ static struct json_object *show_thread_status_json(struct thread_stat *ts, } } - if (ts->ss) { + if (ts->ss_dur) { struct json_object *data; struct json_array *iops, *bw; - struct steadystate_data *ss = ts->ss; int i, j, k; char ss_buf[64]; snprintf(ss_buf, sizeof(ss_buf), "%s%s:%f%s", - ss->state & __FIO_SS_IOPS ? "iops" : "bw", - ss->state & __FIO_SS_SLOPE ? "_slope" : "", - (float) ss->limit, - ss->state & __FIO_SS_PCT ? "%" : ""); + ts->ss_state & __FIO_SS_IOPS ? "iops" : "bw", + ts->ss_state & __FIO_SS_SLOPE ? "_slope" : "", + (float) ts->ss_limit.u.f, + ts->ss_state & __FIO_SS_PCT ? "%" : ""); tmp = json_create_object(); json_object_add_value_object(root, "steadystate", tmp); json_object_add_value_string(tmp, "ss", ss_buf); - json_object_add_value_int(tmp, "duration", (int)ss->dur); - json_object_add_value_int(tmp, "steadystate_ramptime", ss->ramp_time / 1000000L); - json_object_add_value_int(tmp, "attained", (ss->state & __FIO_SS_ATTAINED) > 0); + json_object_add_value_int(tmp, "duration", (int)ts->ss_dur); + json_object_add_value_int(tmp, "attained", (ts->ss_state & __FIO_SS_ATTAINED) > 0); - snprintf(ss_buf, sizeof(ss_buf), "%f%s", (float) ss->criterion, - ss->state & __FIO_SS_PCT ? "%" : ""); + snprintf(ss_buf, sizeof(ss_buf), "%f%s", (float) ts->ss_criterion.u.f, + ts->ss_state & __FIO_SS_PCT ? "%" : ""); json_object_add_value_string(tmp, "criterion", ss_buf); - json_object_add_value_float(tmp, "max_deviation", ss->deviation); - json_object_add_value_float(tmp, "slope", ss->slope); + json_object_add_value_float(tmp, "max_deviation", ts->ss_deviation.u.f); + json_object_add_value_float(tmp, "slope", ts->ss_slope.u.f); data = json_create_object(); json_object_add_value_object(tmp, "data", data); @@ -1323,17 +1320,17 @@ static struct json_object *show_thread_status_json(struct thread_stat *ts, ** otherwise it actually points to the second element ** in the list */ - if ((ss->state & __FIO_SS_ATTAINED) || ss->sum_y == 0) - j = ss->head; + if ((ts->ss_state & __FIO_SS_ATTAINED) || ts->ss_sum_y == 0) + j = ts->ss_head; else - j = ss->head == 0 ? ss->dur - 1 : ss->head - 1; - for (i = 0; i < ss->dur; i++) { - k = (j + i) % ss->dur; - json_array_add_value_int(bw, ss->bw_data[k]); - json_array_add_value_int(iops, ss->iops_data[k]); + j = ts->ss_head == 0 ? ts->ss_dur - 1 : ts->ss_head - 1; + for (i = 0; i < ts->ss_dur; i++) { + k = (j + i) % ts->ss_dur; + json_array_add_value_int(bw, ts->ss_bw_data[k]); + json_array_add_value_int(iops, ts->ss_iops_data[k]); } - json_object_add_value_int(data, "bw_mean", steadystate_bw_mean(ss)); - json_object_add_value_int(data, "iops_mean", steadystate_iops_mean(ss)); + json_object_add_value_int(data, "bw_mean", steadystate_bw_mean(ts)); + json_object_add_value_int(data, "iops_mean", steadystate_iops_mean(ts)); json_object_add_value_array(data, "iops", iops); json_object_add_value_array(data, "bw", bw); } @@ -1662,10 +1659,20 @@ void __show_run_stats(void) sum_thread_stats(ts, &td->ts, idx == 1); - if (td->o.ss_dur) - ts->ss = &td->ss; + if (td->o.ss_dur) { + ts->ss_state = td->ss.state; + ts->ss_dur = td->ss.dur; + ts->ss_head = td->ss.head; + ts->ss_sum_y = td->ss.sum_y; + ts->ss_bw_data = td->ss.bw_data; + ts->ss_iops_data = td->ss.iops_data; + ts->ss_limit.u.f = td->ss.limit; + ts->ss_slope.u.f = td->ss.slope; + ts->ss_deviation.u.f = td->ss.deviation; + ts->ss_criterion.u.f = td->ss.criterion; + } else - ts->ss = NULL; + ts->ss_dur = ts->ss_state = 0; } for (i = 0; i < nr_ts; i++) { diff --git a/stat.h b/stat.h index 357a1ff1..eb7397d2 100644 --- a/stat.h +++ b/stat.h @@ -214,7 +214,18 @@ struct thread_stat { fio_fp64_t latency_percentile; uint64_t latency_window; - struct steadystate_data *ss; + uint64_t ss_dur; + uint32_t ss_state; + uint32_t ss_head; + uint64_t ss_sum_y; + + uint64_t *ss_iops_data; + uint64_t *ss_bw_data; + + fio_fp64_t ss_limit; + fio_fp64_t ss_slope; + fio_fp64_t ss_deviation; + fio_fp64_t ss_criterion; } __attribute__((packed)); struct jobs_eta { diff --git a/steadystate.c b/steadystate.c index 3468428f..94d1f5e2 100644 --- a/steadystate.c +++ b/steadystate.c @@ -10,11 +10,13 @@ static void steadystate_alloc(struct thread_data *td) { int i; - td->ss.bw_data = malloc(td->ss.dur * sizeof(unsigned long)); - td->ss.iops_data = malloc(td->ss.dur * sizeof(unsigned long)); + td->ss.bw_data = malloc(td->ss.dur * sizeof(uint64_t)); + td->ss.iops_data = malloc(td->ss.dur * sizeof(uint64_t)); /* initialize so that it is obvious if the cache is not full in the output */ for (i = 0; i < td->ss.dur; i++) td->ss.iops_data[i] = td->ss.bw_data[i] = 0; + + td->ss.state |= __FIO_SS_DATA; } void steadystate_setup(void) @@ -33,18 +35,16 @@ void steadystate_setup(void) prev_groupid = -1; prev_td = NULL; for_each_td(td, i) { - if (td->ts.ss == NULL) + if (!td->ss.dur) continue; if (!td->o.group_reporting) { steadystate_alloc(td); - td->ss.state |= __FIO_SS_DATA; continue; } if (prev_groupid != td->groupid) { if (prev_td != NULL) { - prev_td->ss.state |= __FIO_SS_DATA; steadystate_alloc(prev_td); } prev_groupid = td->groupid; @@ -53,7 +53,6 @@ void steadystate_setup(void) } if (prev_td != NULL && prev_td->o.group_reporting) { - prev_td->ss.state |= __FIO_SS_DATA; steadystate_alloc(prev_td); } } @@ -319,8 +318,6 @@ int td_steadystate_init(struct thread_data *td) ss->sum_x = o->ss_dur * (o->ss_dur - 1) / 2; ss->sum_x_sq = (o->ss_dur - 1) * (o->ss_dur) * (2*o->ss_dur - 1) / 6; - - td->ts.ss = ss; } /* make sure that ss options are consistent within reporting group */ @@ -343,24 +340,24 @@ int td_steadystate_init(struct thread_data *td) return 0; } -unsigned long long steadystate_bw_mean(struct steadystate_data *ss) +unsigned long long steadystate_bw_mean(struct thread_stat *ts) { int i; unsigned long long sum; - for (i = 0, sum = 0; i < ss->dur; i++) - sum += ss->bw_data[i]; + for (i = 0, sum = 0; i < ts->ss_dur; i++) + sum += ts->ss_bw_data[i]; - return sum / ss->dur; + return sum / ts->ss_dur; } -unsigned long long steadystate_iops_mean(struct steadystate_data *ss) +unsigned long long steadystate_iops_mean(struct thread_stat *ts) { int i; unsigned long long sum; - for (i = 0, sum = 0; i < ss->dur; i++) - sum += ss->iops_data[i]; + for (i = 0, sum = 0; i < ts->ss_dur; i++) + sum += ts->ss_iops_data[i]; - return sum / ss->dur; + return sum / ts->ss_dur; } diff --git a/steadystate.h b/steadystate.h index 441817a8..deba5fb1 100644 --- a/steadystate.h +++ b/steadystate.h @@ -1,34 +1,33 @@ #ifndef FIO_STEADYSTATE_H #define FIO_STEADYSTATE_H +#include "stat.h" #include "thread_options.h" +#include "lib/ieee754.h" extern void steadystate_check(void); extern void steadystate_setup(void); extern int td_steadystate_init(struct thread_data *); -extern unsigned long long steadystate_bw_mean(struct steadystate_data *); -extern unsigned long long steadystate_iops_mean(struct steadystate_data *); +extern unsigned long long steadystate_bw_mean(struct thread_stat *); +extern unsigned long long steadystate_iops_mean(struct thread_stat *); extern bool steadystate_enabled; -/* - * For steady state detection - */ struct steadystate_data { double limit; unsigned long long dur; unsigned long long ramp_time; - unsigned int state; + uint32_t state; unsigned int head; unsigned int tail; - unsigned long *iops_data; - unsigned long *bw_data; + uint64_t *iops_data; + uint64_t *bw_data; double slope; - double criterion; double deviation; + double criterion; unsigned long long sum_y; unsigned long long sum_x; -- 2.25.1