steadystate: implement transmission of steadystate data over the wire in client/serve...
authorVincent Fu <Vincent.Fu@sandisk.com>
Fri, 2 Dec 2016 17:51:36 +0000 (12:51 -0500)
committerVincent Fu <Vincent.Fu@sandisk.com>
Wed, 7 Dec 2016 21:02:44 +0000 (16:02 -0500)
1) Pare down thread_stat steadystate members to only those that need to go over the wire
2) Plumb up sending/receiving of steadystate data including ring buffers

TODO
client.c
server.c
stat.c
stat.h
steadystate.c
steadystate.h

diff --git a/TODO b/TODO
index 5a8d71d08071fd3c5e35c0279d8ad52dcc941d66..e4b146e93c1ed4b0ac10d0a437882d652ec4cf15 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,7 +1,5 @@
 Known issues/TODO (for steady-state)
 
-- Does not work over a network connection
-
 - Allow user to specify the frequency of measurements
 
 - Better documentation for output
index d502a4b34d4c57a060d96f3c24d4436f4a0bf9bb..96bb29c19260d3399d81845a000ab9554e017077 100644 (file)
--- a/client.c
+++ b/client.c
@@ -946,6 +946,22 @@ static void convert_ts(struct thread_stat *dst, struct thread_stat *src)
        dst->nr_block_infos     = le64_to_cpu(src->nr_block_infos);
        for (i = 0; i < dst->nr_block_infos; i++)
                dst->block_infos[i] = le32_to_cpu(src->block_infos[i]);
+
+       dst->ss_dur             = le64_to_cpu(src->ss_dur);
+       dst->ss_state           = le32_to_cpu(src->ss_state);
+       dst->ss_head            = le32_to_cpu(src->ss_head);
+       dst->ss_sum_y           = le64_to_cpu(src->ss_sum_y);
+       dst->ss_limit.u.f       = fio_uint64_to_double(le64_to_cpu(src->ss_limit.u.i));
+       dst->ss_slope.u.f       = fio_uint64_to_double(le64_to_cpu(src->ss_slope.u.i));
+       dst->ss_deviation.u.f   = fio_uint64_to_double(le64_to_cpu(src->ss_deviation.u.i));
+       dst->ss_criterion.u.f   = fio_uint64_to_double(le64_to_cpu(src->ss_criterion.u.i));
+
+       if (dst->ss_state & __FIO_SS_DATA) {
+               for (i = 0; i < dst->ss_dur; i++ ) {
+                       dst->ss_iops_data[i] = le64_to_cpu(src->ss_iops_data[i]);
+                       dst->ss_bw_data[i] = le64_to_cpu(src->ss_bw_data[i]);
+               }
+       }
 }
 
 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
@@ -1547,6 +1563,7 @@ int fio_handle_client(struct fio_client *client)
 {
        struct client_ops *ops = client->ops;
        struct fio_net_cmd *cmd;
+       int size;
 
        dprint(FD_NET, "client: handle %s\n", client->hostname);
 
@@ -1579,6 +1596,15 @@ int fio_handle_client(struct fio_client *client)
        case FIO_NET_CMD_TS: {
                struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
 
+               dprint(FD_NET, "client: ts->ss_state = %u\n", (unsigned int) le32_to_cpu(p->ts.ss_state));
+               if (le32_to_cpu(p->ts.ss_state) & __FIO_SS_DATA) {
+                       dprint(FD_NET, "client: received steadystate ring buffers\n");
+
+                       size = le64_to_cpu(p->ts.ss_dur);
+                       p->ts.ss_iops_data = (uint64_t *) ((struct cmd_ts_pdu *)cmd->payload + 1);
+                       p->ts.ss_bw_data = p->ts.ss_iops_data + size;
+               }
+
                convert_ts(&p->ts, &p->ts);
                convert_gs(&p->rs, &p->rs);
 
index 2fd9b45afb0e60403333440e26555faf270b8846..bef4f826d7dc73910b3982afddb19c41435d66ef 100644 (file)
--- a/server.c
+++ b/server.c
@@ -1458,6 +1458,8 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
 {
        struct cmd_ts_pdu p;
        int i, j;
+       void *ss_buf;
+       uint64_t *ss_iops, *ss_bw;
 
        dprint(FD_NET, "server sending end stats\n");
 
@@ -1541,9 +1543,38 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
        for (i = 0; i < p.ts.nr_block_infos; i++)
                p.ts.block_infos[i] = le32_to_cpu(ts->block_infos[i]);
 
+       p.ts.ss_dur             = cpu_to_le64(ts->ss_dur);
+       p.ts.ss_state           = cpu_to_le32(ts->ss_state);
+       p.ts.ss_head            = cpu_to_le32(ts->ss_head);
+       p.ts.ss_sum_y           = cpu_to_le64(ts->ss_sum_y);
+       p.ts.ss_limit.u.i       = cpu_to_le64(fio_double_to_uint64(ts->ss_limit.u.f));
+       p.ts.ss_slope.u.i       = cpu_to_le64(fio_double_to_uint64(ts->ss_slope.u.f));
+       p.ts.ss_deviation.u.i   = cpu_to_le64(fio_double_to_uint64(ts->ss_deviation.u.f));
+       p.ts.ss_criterion.u.i   = cpu_to_le64(fio_double_to_uint64(ts->ss_criterion.u.f));
+
        convert_gs(&p.rs, rs);
 
-       fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY);
+       dprint(FD_NET, "ts->ss_state = %d\n", ts->ss_state);
+       if (ts->ss_state & __FIO_SS_DATA) {
+               dprint(FD_NET, "server sending steadystate ring buffers\n");
+
+               ss_buf = malloc(sizeof(p) + 2*ts->ss_dur*sizeof(uint64_t));
+
+               memcpy(ss_buf, &p, sizeof(p));
+
+               ss_iops = (uint64_t *) ((struct cmd_ts_pdu *)ss_buf + 1);
+               ss_bw = ss_iops + (int) ts->ss_dur;
+               for (i = 0; i < ts->ss_dur; i++) {
+                       ss_iops[i] = cpu_to_le64(ts->ss_iops_data[i]);
+                       ss_bw[i] = cpu_to_le64(ts->ss_bw_data[i]);
+               }
+
+               fio_net_queue_cmd(FIO_NET_CMD_TS, ss_buf, sizeof(p) + 2*ts->ss_dur*sizeof(uint64_t), NULL, SK_F_COPY);
+
+               free(ss_buf);
+       }
+       else
+               fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY);
 }
 
 void fio_server_send_gs(struct group_run_stats *rs)
diff --git a/stat.c b/stat.c
index d635f0ac5e4938a9cb0411d85beb77c5b130c86a..fab0a82d889dff23d9e7173d07ed77f8b3645500 100644 (file)
--- a/stat.c
+++ b/stat.c
@@ -660,26 +660,25 @@ static void show_block_infos(int nr_block_infos, uint32_t *block_infos,
 static void show_ss_normal(struct thread_stat *ts, struct buf_output *out)
 {
        char *p1, *p2;
-       struct steadystate_data *ss = ts->ss;
        unsigned long long bw_mean, iops_mean;
        const int i2p = is_power_of_2(ts->kb_base);
 
-       if (!ss->state)
+       if (!ts->ss_state)
                return;
 
-       bw_mean = steadystate_bw_mean(ss);
-       iops_mean = steadystate_iops_mean(ss);
+       bw_mean = steadystate_bw_mean(ts);
+       iops_mean = steadystate_iops_mean(ts);
 
        p1 = num2str(bw_mean / ts->kb_base, 6, ts->kb_base, i2p, ts->unit_base);
        p2 = num2str(iops_mean, 6, 1, 0, 0);
 
        log_buf(out, "  steadystate  : attained=%s, bw=%s/s, iops=%s, %s%s=%.3f%s\n",
-               ss->state & __FIO_SS_ATTAINED ? "yes" : "no",
+               ts->ss_state & __FIO_SS_ATTAINED ? "yes" : "no",
                p1, p2,
-               ss->state & __FIO_SS_IOPS ? "iops" : "bw",
-               ss->state & __FIO_SS_SLOPE ? " slope": " mean dev",
-               ss->criterion,
-               ss->state & __FIO_SS_PCT ? "%" : "");
+               ts->ss_state & __FIO_SS_IOPS ? "iops" : "bw",
+               ts->ss_state & __FIO_SS_SLOPE ? " slope": " mean dev",
+               ts->ss_criterion.u.f,
+               ts->ss_state & __FIO_SS_PCT ? "%" : "");
 
        free(p1);
        free(p2);
@@ -790,7 +789,7 @@ static void show_thread_status_normal(struct thread_stat *ts,
                show_block_infos(ts->nr_block_infos, ts->block_infos,
                                  ts->percentile_list, out);
 
-       if (ts->ss)
+       if (ts->ss_dur)
                show_ss_normal(ts, out);
 }
 
@@ -1286,31 +1285,29 @@ static struct json_object *show_thread_status_json(struct thread_stat *ts,
                }
        }
 
-       if (ts->ss) {
+       if (ts->ss_dur) {
                struct json_object *data;
                struct json_array *iops, *bw;
-               struct steadystate_data *ss = ts->ss;
                int i, j, k;
                char ss_buf[64];
 
                snprintf(ss_buf, sizeof(ss_buf), "%s%s:%f%s",
-                       ss->state & __FIO_SS_IOPS ? "iops" : "bw",
-                       ss->state & __FIO_SS_SLOPE ? "_slope" : "",
-                       (float) ss->limit,
-                       ss->state & __FIO_SS_PCT ? "%" : "");
+                       ts->ss_state & __FIO_SS_IOPS ? "iops" : "bw",
+                       ts->ss_state & __FIO_SS_SLOPE ? "_slope" : "",
+                       (float) ts->ss_limit.u.f,
+                       ts->ss_state & __FIO_SS_PCT ? "%" : "");
 
                tmp = json_create_object();
                json_object_add_value_object(root, "steadystate", tmp);
                json_object_add_value_string(tmp, "ss", ss_buf);
-               json_object_add_value_int(tmp, "duration", (int)ss->dur);
-               json_object_add_value_int(tmp, "steadystate_ramptime", ss->ramp_time / 1000000L);
-               json_object_add_value_int(tmp, "attained", (ss->state & __FIO_SS_ATTAINED) > 0);
+               json_object_add_value_int(tmp, "duration", (int)ts->ss_dur);
+               json_object_add_value_int(tmp, "attained", (ts->ss_state & __FIO_SS_ATTAINED) > 0);
 
-               snprintf(ss_buf, sizeof(ss_buf), "%f%s", (float) ss->criterion,
-                       ss->state & __FIO_SS_PCT ? "%" : "");
+               snprintf(ss_buf, sizeof(ss_buf), "%f%s", (float) ts->ss_criterion.u.f,
+                       ts->ss_state & __FIO_SS_PCT ? "%" : "");
                json_object_add_value_string(tmp, "criterion", ss_buf);
-               json_object_add_value_float(tmp, "max_deviation", ss->deviation);
-               json_object_add_value_float(tmp, "slope", ss->slope);
+               json_object_add_value_float(tmp, "max_deviation", ts->ss_deviation.u.f);
+               json_object_add_value_float(tmp, "slope", ts->ss_slope.u.f);
 
                data = json_create_object();
                json_object_add_value_object(tmp, "data", data);
@@ -1323,17 +1320,17 @@ static struct json_object *show_thread_status_json(struct thread_stat *ts,
                ** otherwise it actually points to the second element
                ** in the list
                */
-               if ((ss->state & __FIO_SS_ATTAINED) || ss->sum_y == 0)
-                       j = ss->head;
+               if ((ts->ss_state & __FIO_SS_ATTAINED) || ts->ss_sum_y == 0)
+                       j = ts->ss_head;
                else
-                       j = ss->head == 0 ? ss->dur - 1 : ss->head - 1;
-               for (i = 0; i < ss->dur; i++) {
-                       k = (j + i) % ss->dur;
-                       json_array_add_value_int(bw, ss->bw_data[k]);
-                       json_array_add_value_int(iops, ss->iops_data[k]);
+                       j = ts->ss_head == 0 ? ts->ss_dur - 1 : ts->ss_head - 1;
+               for (i = 0; i < ts->ss_dur; i++) {
+                       k = (j + i) % ts->ss_dur;
+                       json_array_add_value_int(bw, ts->ss_bw_data[k]);
+                       json_array_add_value_int(iops, ts->ss_iops_data[k]);
                }
-               json_object_add_value_int(data, "bw_mean", steadystate_bw_mean(ss));
-               json_object_add_value_int(data, "iops_mean", steadystate_iops_mean(ss));
+               json_object_add_value_int(data, "bw_mean", steadystate_bw_mean(ts));
+               json_object_add_value_int(data, "iops_mean", steadystate_iops_mean(ts));
                json_object_add_value_array(data, "iops", iops);
                json_object_add_value_array(data, "bw", bw);
        }
@@ -1662,10 +1659,20 @@ void __show_run_stats(void)
 
                sum_thread_stats(ts, &td->ts, idx == 1);
 
-               if (td->o.ss_dur)
-                       ts->ss = &td->ss;
+               if (td->o.ss_dur) {
+                       ts->ss_state = td->ss.state;
+                       ts->ss_dur = td->ss.dur;
+                       ts->ss_head = td->ss.head;
+                       ts->ss_sum_y = td->ss.sum_y;
+                       ts->ss_bw_data = td->ss.bw_data;
+                       ts->ss_iops_data = td->ss.iops_data;
+                       ts->ss_limit.u.f = td->ss.limit;
+                       ts->ss_slope.u.f = td->ss.slope;
+                       ts->ss_deviation.u.f = td->ss.deviation;
+                       ts->ss_criterion.u.f = td->ss.criterion;
+               }
                else
-                       ts->ss = NULL;
+                       ts->ss_dur = ts->ss_state = 0;
        }
 
        for (i = 0; i < nr_ts; i++) {
diff --git a/stat.h b/stat.h
index 357a1ff1ab59a0fd8886461b3fb1c605182a9616..eb7397d2f1af83122efbc4df1a41fd78617135ce 100644 (file)
--- a/stat.h
+++ b/stat.h
@@ -214,7 +214,18 @@ struct thread_stat {
        fio_fp64_t latency_percentile;
        uint64_t latency_window;
 
-       struct steadystate_data *ss;
+       uint64_t ss_dur;
+       uint32_t ss_state;
+       uint32_t ss_head;
+       uint64_t ss_sum_y;
+
+       uint64_t *ss_iops_data;
+       uint64_t *ss_bw_data;
+
+       fio_fp64_t ss_limit;
+       fio_fp64_t ss_slope;
+       fio_fp64_t ss_deviation;
+       fio_fp64_t ss_criterion;
 } __attribute__((packed));
 
 struct jobs_eta {
index 3468428f4ab97c228e9766274a79785820b561a8..94d1f5e24da99882885b7ec14e807a92856d75eb 100644 (file)
@@ -10,11 +10,13 @@ static void steadystate_alloc(struct thread_data *td)
 {
        int i;
 
-       td->ss.bw_data = malloc(td->ss.dur * sizeof(unsigned long));
-       td->ss.iops_data = malloc(td->ss.dur * sizeof(unsigned long));
+       td->ss.bw_data = malloc(td->ss.dur * sizeof(uint64_t));
+       td->ss.iops_data = malloc(td->ss.dur * sizeof(uint64_t));
        /* initialize so that it is obvious if the cache is not full in the output */
        for (i = 0; i < td->ss.dur; i++)
                td->ss.iops_data[i] = td->ss.bw_data[i] = 0;
+
+       td->ss.state |= __FIO_SS_DATA;
 }
 
 void steadystate_setup(void)
@@ -33,18 +35,16 @@ void steadystate_setup(void)
        prev_groupid = -1;
        prev_td = NULL;
        for_each_td(td, i) {
-               if (td->ts.ss == NULL)
+               if (!td->ss.dur)
                        continue;
 
                if (!td->o.group_reporting) {
                        steadystate_alloc(td);
-                       td->ss.state |= __FIO_SS_DATA;
                        continue;
                }
 
                if (prev_groupid != td->groupid) {
                        if (prev_td != NULL) {
-                               prev_td->ss.state |= __FIO_SS_DATA;
                                steadystate_alloc(prev_td);
                        }
                        prev_groupid = td->groupid;
@@ -53,7 +53,6 @@ void steadystate_setup(void)
        }
 
        if (prev_td != NULL && prev_td->o.group_reporting) {
-               prev_td->ss.state |= __FIO_SS_DATA;
                steadystate_alloc(prev_td);
        }
 }
@@ -319,8 +318,6 @@ int td_steadystate_init(struct thread_data *td)
 
                ss->sum_x = o->ss_dur * (o->ss_dur - 1) / 2;
                ss->sum_x_sq = (o->ss_dur - 1) * (o->ss_dur) * (2*o->ss_dur - 1) / 6;
-
-               td->ts.ss = ss;
        }
 
        /* make sure that ss options are consistent within reporting group */
@@ -343,24 +340,24 @@ int td_steadystate_init(struct thread_data *td)
        return 0;
 }
 
-unsigned long long steadystate_bw_mean(struct steadystate_data *ss)
+unsigned long long steadystate_bw_mean(struct thread_stat *ts)
 {
        int i;
        unsigned long long sum;
 
-       for (i = 0, sum = 0; i < ss->dur; i++)
-               sum += ss->bw_data[i];
+       for (i = 0, sum = 0; i < ts->ss_dur; i++)
+               sum += ts->ss_bw_data[i];
 
-       return sum / ss->dur;
+       return sum / ts->ss_dur;
 }
 
-unsigned long long steadystate_iops_mean(struct steadystate_data *ss)
+unsigned long long steadystate_iops_mean(struct thread_stat *ts)
 {
        int i;
        unsigned long long sum;
 
-       for (i = 0, sum = 0; i < ss->dur; i++)
-               sum += ss->iops_data[i];
+       for (i = 0, sum = 0; i < ts->ss_dur; i++)
+               sum += ts->ss_iops_data[i];
 
-       return sum / ss->dur;
+       return sum / ts->ss_dur;
 }
index 441817a85a4e949d3b24ac8ad511865c5f440636..deba5fb11262ebd481f407534dd47508bb03d1e5 100644 (file)
@@ -1,34 +1,33 @@
 #ifndef FIO_STEADYSTATE_H
 #define FIO_STEADYSTATE_H
 
+#include "stat.h"
 #include "thread_options.h"
+#include "lib/ieee754.h"
 
 extern void steadystate_check(void);
 extern void steadystate_setup(void);
 extern int td_steadystate_init(struct thread_data *);
-extern unsigned long long steadystate_bw_mean(struct steadystate_data *);
-extern unsigned long long steadystate_iops_mean(struct steadystate_data *);
+extern unsigned long long steadystate_bw_mean(struct thread_stat *);
+extern unsigned long long steadystate_iops_mean(struct thread_stat *);
 
 extern bool steadystate_enabled;
 
-/*
- * For steady state detection
- */
 struct steadystate_data {
        double limit;
        unsigned long long dur;
        unsigned long long ramp_time;
 
-       unsigned int state;
+       uint32_t state;
 
        unsigned int head;
        unsigned int tail;
-       unsigned long *iops_data;
-       unsigned long *bw_data;
+       uint64_t *iops_data;
+       uint64_t *bw_data;
 
        double slope;
-       double criterion;
        double deviation;
+       double criterion;
 
        unsigned long long sum_y;
        unsigned long long sum_x;