client/server: request ETA instead of having the server send it automatically
authorJens Axboe <axboe@kernel.dk>
Sun, 9 Oct 2011 19:54:10 +0000 (21:54 +0200)
committerJens Axboe <axboe@kernel.dk>
Sun, 9 Oct 2011 19:54:10 +0000 (21:54 +0200)
Also changes the 'serial' of the command to a tag, that's passed
back and forth for commands that need to use it.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
client.c
eta.c
fio.c
server.c
server.h
stat.h

index dbc85226d90bbe38a5d21443e66c5db2b5ef2387..76935ca597b2fd6b17ec2aaa304ccaaa596771a6 100644 (file)
--- a/client.c
+++ b/client.c
@@ -32,15 +32,16 @@ struct fio_client {
        char *name;
 
        int state;
+
        int skip_newline;
        int is_sock;
+       int waiting_eta;
 
        uint16_t argc;
        char **argv;
 };
 
-static struct jobs_eta client_etas;
-static int received_etas;
+static struct timeval eta_tv;
 
 enum {
        Client_created          = 0,
@@ -50,6 +51,11 @@ enum {
        Client_exited           = 4,
 };
 
+struct client_eta {
+       struct jobs_eta eta;
+       unsigned int pending;
+};
+
 static FLIST_HEAD(client_list);
 
 #define FIO_CLIENT_HASH_BITS   7
@@ -104,6 +110,8 @@ static void remove_client(struct fio_client *client)
 
        fio_client_remove_hash(client);
 
+       /* FIXME: check ->waiting_eta and handle it */
+
        free(client->hostname);
        if (client->argv)
                free(client->argv);
@@ -310,7 +318,7 @@ static int send_client_cmd_line(struct fio_client *client)
        }
 
        clp->lines = cpu_to_le16(client->argc);
-       ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem);
+       ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, 0);
        free(pdu);
        return ret;
 }
@@ -393,7 +401,7 @@ static int fio_client_send_ini(struct fio_client *client, const char *filename)
                return 1;
        }
 
-       ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size);
+       ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size, 0);
        free(buf);
        close(fd);
        return ret;
@@ -549,9 +557,8 @@ static void convert_jobs_eta(struct jobs_eta *je)
        je->eta_sec             = le64_to_cpu(je->eta_sec);
 }
 
-static void sum_jobs_eta(struct jobs_eta *je)
+static void sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
 {
-       struct jobs_eta *dst = &client_etas;
        int i;
 
        dst->nr_running         += je->nr_running;
@@ -577,19 +584,17 @@ static void sum_jobs_eta(struct jobs_eta *je)
 static void handle_eta(struct fio_net_cmd *cmd)
 {
        struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
+       struct client_eta *eta = (struct client_eta *) cmd->tag;
+
+       dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
 
        convert_jobs_eta(je);
+       sum_jobs_eta(&eta->eta, je);
 
-       if (nr_clients > 1) {
-               sum_jobs_eta(je);
-               received_etas++;
-               if (received_etas == nr_clients) {
-                       received_etas = 0;
-                       display_thread_status(&client_etas);
-                       memset(&client_etas, 0, sizeof(client_etas));
-               }
-       } else
-               display_thread_status(je);
+       if (!--eta->pending) {
+               display_thread_status(&eta->eta);
+               free(eta);
+       }
 }
 
 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
@@ -679,6 +684,35 @@ static int handle_client(struct fio_client *client)
        return 1;
 }
 
+static void request_client_etas(void)
+{
+       struct fio_client *client;
+       struct flist_head *entry;
+       struct client_eta *eta;
+
+       dprint(FD_NET, "client: request eta (%d)\n", nr_clients);
+
+       /*
+        * We need to do something more clever about checking status
+        * of command being send, client haven't sent previous ETA
+        * already, etc.
+        */
+
+       eta = malloc(sizeof(*eta));
+       memset(&eta->eta, 0, sizeof(eta->eta));
+       eta->pending = nr_clients;
+
+       flist_for_each(entry, &client_list) {
+               client = flist_entry(entry, struct fio_client, list);
+
+               client->waiting_eta = 1;
+               fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
+                                               (uint64_t) eta);
+       }
+
+       dprint(FD_NET, "client: requested eta tag %p\n", eta);
+}
+
 int fio_handle_clients(void)
 {
        struct fio_client *client;
@@ -686,6 +720,8 @@ int fio_handle_clients(void)
        struct pollfd *pfds;
        int i, ret = 0;
 
+       gettimeofday(&eta_tv, NULL);
+
        pfds = malloc(nr_clients * sizeof(struct pollfd));
 
        while (!exit_backend && nr_clients) {
@@ -701,6 +737,14 @@ int fio_handle_clients(void)
                assert(i == nr_clients);
 
                do {
+                       struct timeval tv;
+
+                       gettimeofday(&tv, NULL);
+                       if (mtime_since(&eta_tv, &tv) >= 900) {
+                               request_client_etas();
+                               memcpy(&eta_tv, &tv, sizeof(tv));
+                       }
+
                        ret = poll(pfds, nr_clients, 100);
                        if (ret < 0) {
                                if (errno == EINTR)
diff --git a/eta.c b/eta.c
index 5b912eb7166c81b1cb310e42a5c61701ecb6e9c0..b7f1fd62cffe88e2bdac92c5d325ec8fbfb25a55 100644 (file)
--- a/eta.c
+++ b/eta.c
@@ -230,7 +230,7 @@ static void calc_iops(unsigned long mtime, unsigned long long *io_iops,
  * Print status of the jobs we know about. This includes rate estimates,
  * ETA, thread state, etc.
  */
-int calc_thread_status(struct jobs_eta *je)
+int calc_thread_status(struct jobs_eta *je, int force)
 {
        struct thread_data *td;
        int i;
@@ -245,11 +245,13 @@ int calc_thread_status(struct jobs_eta *je)
        static struct timeval rate_prev_time, disp_prev_time;
        int i2p = 0;
 
-       if (temp_stall_ts || terse_output || eta_print == FIO_ETA_NEVER)
-               return 0;
+       if (!force) {
+               if (temp_stall_ts || terse_output || eta_print == FIO_ETA_NEVER)
+                       return 0;
 
-       if (!isatty(STDOUT_FILENO) && (eta_print != FIO_ETA_ALWAYS))
-               return 0;
+               if (!isatty(STDOUT_FILENO) && (eta_print != FIO_ETA_ALWAYS))
+                       return 0;
+       }
 
        if (!rate_io_bytes[0] && !rate_io_bytes[1])
                fill_start_time(&rate_prev_time);
@@ -332,7 +334,7 @@ int calc_thread_status(struct jobs_eta *je)
        /*
         * Allow a little slack, the target is to print it every 1000 msecs
         */
-       if (disp_time < 900)
+       if (!force && disp_time < 900)
                return 0;
 
        calc_rate(disp_time, io_bytes, disp_io_bytes, je->rate);
@@ -340,7 +342,7 @@ int calc_thread_status(struct jobs_eta *je)
 
        memcpy(&disp_prev_time, &now, sizeof(now));
 
-       if (!je->nr_running && !je->nr_pending)
+       if (!force && !je->nr_running && !je->nr_pending)
                return 0;
 
        je->nr_threads = thread_number;
@@ -421,7 +423,7 @@ void print_thread_status(void)
 
        memset(je, 0, sizeof(*je) + thread_number * sizeof(char));
 
-       if (calc_thread_status(je))
+       if (calc_thread_status(je, 0))
                display_thread_status(je);
 
        free(je);
diff --git a/fio.c b/fio.c
index 212b72e8706ac84b3ebe8c4fbaadc46b58eec688..ece897e5fa3bdab4cec8f411f429ded502418312 100644 (file)
--- a/fio.c
+++ b/fio.c
@@ -183,9 +183,7 @@ static void *disk_thread_main(void *data)
                        break;
                update_io_ticks();
 
-               if (is_backend)
-                       fio_server_send_status();
-               else
+               if (!is_backend)
                        print_thread_status();
        }
 
index a54db7bf5d385aee5dbbc9e66d159169681f4de3..5afd8d455f72c3178f9b9560719bd8e982cfca31 100644 (file)
--- a/server.c
+++ b/server.c
@@ -98,7 +98,7 @@ static int verify_convert_cmd(struct fio_net_cmd *cmd)
        cmd->version    = le16_to_cpu(cmd->version);
        cmd->opcode     = le16_to_cpu(cmd->opcode);
        cmd->flags      = le32_to_cpu(cmd->flags);
-       cmd->serial     = le64_to_cpu(cmd->serial);
+       cmd->tag        = le64_to_cpu(cmd->tag);
        cmd->pdu_len    = le32_to_cpu(cmd->pdu_len);
 
        switch (cmd->version) {
@@ -205,7 +205,8 @@ void fio_net_cmd_crc(struct fio_net_cmd *cmd)
                cmd->pdu_crc16 = __cpu_to_le16(crc16(cmd->payload, pdu_len));
 }
 
-int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size)
+int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size,
+                    uint64_t tag)
 {
        struct fio_net_cmd *cmd;
        size_t this_len;
@@ -218,7 +219,7 @@ int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size)
 
                cmd = malloc(sizeof(*cmd) + this_len);
 
-               fio_init_net_cmd(cmd, opcode, buf, this_len);
+               fio_init_net_cmd(cmd, opcode, buf, this_len, tag);
 
                if (this_len < size)
                        cmd->flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
@@ -234,11 +235,11 @@ int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size)
        return ret;
 }
 
-int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t serial)
+int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag)
 {
        struct fio_net_cmd cmd;
 
-       fio_init_net_cmd(&cmd, opcode, NULL, 0);
+       fio_init_net_cmd(&cmd, opcode, NULL, 0, tag);
        fio_net_cmd_crc(&cmd);
 
        return fio_send_data(sk, &cmd, sizeof(cmd));
@@ -324,7 +325,48 @@ static int handle_probe_cmd(struct fio_net_cmd *cmd)
        probe.os        = FIO_OS;
        probe.arch      = FIO_ARCH;
 
-       return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe));
+       return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), 0);
+}
+
+static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
+{
+       struct jobs_eta *je;
+       size_t size;
+       void *buf;
+       int i;
+
+       size = sizeof(*je) + thread_number * sizeof(char);
+       buf = malloc(size);
+       memset(buf, 0, size);
+       je = buf;
+
+       if (!calc_thread_status(je, 1)) {
+               free(je);
+               return 0;
+       }
+
+       dprint(FD_NET, "server sending status\n");
+
+       je->nr_running          = cpu_to_le32(je->nr_running);
+       je->nr_ramp             = cpu_to_le32(je->nr_ramp);
+       je->nr_pending          = cpu_to_le32(je->nr_pending);
+       je->files_open          = cpu_to_le32(je->files_open);
+       je->m_rate              = cpu_to_le32(je->m_rate);
+       je->t_rate              = cpu_to_le32(je->t_rate);
+       je->m_iops              = cpu_to_le32(je->m_iops);
+       je->t_iops              = cpu_to_le32(je->t_iops);
+
+       for (i = 0; i < 2; i++) {
+               je->rate[i]     = cpu_to_le32(je->rate[i]);
+               je->iops[i]     = cpu_to_le32(je->iops[i]);
+       }
+
+       je->elapsed_sec         = cpu_to_le32(je->nr_running);
+       je->eta_sec             = cpu_to_le64(je->eta_sec);
+
+       fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, buf, size, cmd->tag);
+       free(je);
+       return 0;
 }
 
 static int handle_command(struct fio_net_cmd *cmd)
@@ -349,6 +391,9 @@ static int handle_command(struct fio_net_cmd *cmd)
        case FIO_NET_CMD_PROBE:
                ret = handle_probe_cmd(cmd);
                break;
+       case FIO_NET_CMD_SEND_ETA:
+               ret = handle_send_eta_cmd(cmd);
+               break;
        default:
                log_err("fio: unknown opcode: %d\n", cmd->opcode);
                ret = 1;
@@ -477,7 +522,7 @@ out:
 int fio_server_text_output(const char *buf, unsigned int len)
 {
        if (server_fd != -1)
-               return fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, buf, len);
+               return fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, buf, len, 0);
 
        return fwrite(buf, len, 1, f_err);
 }
@@ -590,7 +635,7 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
 
        convert_gs(&p.rs, rs);
 
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p));
+       fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), 0);
 }
 
 void fio_server_send_gs(struct group_run_stats *rs)
@@ -600,47 +645,7 @@ void fio_server_send_gs(struct group_run_stats *rs)
        dprint(FD_NET, "server sending group run stats\n");
 
        convert_gs(&gs, rs);
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs));
-}
-
-void fio_server_send_status(void)
-{
-       struct jobs_eta *je;
-       size_t size;
-       void *buf;
-       int i;
-
-       size = sizeof(*je) + thread_number * sizeof(char);
-       buf = malloc(size);
-       memset(buf, 0, size);
-       je = buf;
-
-       if (!calc_thread_status(je)) {
-               free(je);
-               return;
-       }
-
-       dprint(FD_NET, "server sending status\n");
-
-       je->nr_running          = cpu_to_le32(je->nr_running);
-       je->nr_ramp             = cpu_to_le32(je->nr_ramp);
-       je->nr_pending          = cpu_to_le32(je->nr_pending);
-       je->files_open          = cpu_to_le32(je->files_open);
-       je->m_rate              = cpu_to_le32(je->m_rate);
-       je->t_rate              = cpu_to_le32(je->t_rate);
-       je->m_iops              = cpu_to_le32(je->m_iops);
-       je->t_iops              = cpu_to_le32(je->t_iops);
-
-       for (i = 0; i < 2; i++) {
-               je->rate[i]     = cpu_to_le32(je->rate[i]);
-               je->iops[i]     = cpu_to_le32(je->iops[i]);
-       }
-
-       je->elapsed_sec         = cpu_to_le32(je->nr_running);
-       je->eta_sec             = cpu_to_le64(je->eta_sec);
-
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, buf, size);
-       free(je);
+       fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), 0);
 }
 
 int fio_server_log(const char *format, ...)
index d68cbf52dbbaa3f90f0a8fc283ef79e84b1be57a..9f7eeaecc1340f3adc5fe9e90711eff59c714c34 100644 (file)
--- a/server.h
+++ b/server.h
@@ -14,7 +14,7 @@ struct fio_net_cmd {
        uint16_t version;       /* protocol version */
        uint16_t opcode;        /* command opcode */
        uint32_t flags;         /* modifier flags */
-       uint64_t serial;        /* serial number */
+       uint64_t tag;           /* passed back on reply */
        uint32_t pdu_len;       /* length of post-cmd layload */
        /*
         * These must be immediately before the payload, anything before
@@ -26,7 +26,7 @@ struct fio_net_cmd {
 };
 
 enum {
-       FIO_SERVER_VER          = 4,
+       FIO_SERVER_VER          = 5,
 
        FIO_SERVER_MAX_PDU      = 1024,
 
@@ -37,10 +37,11 @@ enum {
        FIO_NET_CMD_TEXT        = 5,
        FIO_NET_CMD_TS          = 6,
        FIO_NET_CMD_GS          = 7,
-       FIO_NET_CMD_ETA         = 8,
-       FIO_NET_CMD_PROBE       = 9,
-       FIO_NET_CMD_START       = 10,
-       FIO_NET_CMD_STOP        = 11,
+       FIO_NET_CMD_SEND_ETA    = 8,
+       FIO_NET_CMD_ETA         = 9,
+       FIO_NET_CMD_PROBE       = 10,
+       FIO_NET_CMD_START       = 11,
+       FIO_NET_CMD_STOP        = 12,
 
        FIO_NET_CMD_F_MORE      = 1UL << 0,
 
@@ -77,8 +78,8 @@ struct cmd_line_pdu {
 extern int fio_start_server(int);
 extern int fio_server_text_output(const char *, unsigned int len);
 extern int fio_server_log(const char *format, ...);
-extern int fio_net_send_cmd(int, uint16_t, const void *, off_t);
-extern int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t serial);
+extern int fio_net_send_cmd(int, uint16_t, const void *, off_t, uint64_t);
+extern int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag);
 extern void fio_server_set_arg(const char *);
 extern int fio_server_parse_string(const char *, char **, int *, int *, struct in_addr *);
 
@@ -86,7 +87,6 @@ struct thread_stat;
 struct group_run_stats;
 extern void fio_server_send_ts(struct thread_stat *, struct group_run_stats *);
 extern void fio_server_send_gs(struct group_run_stats *);
-extern void fio_server_send_status(void);
 extern void fio_server_idle_loop(void);
 
 extern int fio_clients_connect(void);
@@ -104,12 +104,14 @@ extern int exit_backend;
 extern int fio_net_port;
 
 static inline void fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode,
-                                   const void *pdu, uint32_t pdu_len)
+                                   const void *pdu, uint32_t pdu_len,
+                                   uint64_t tag)
 {
        memset(cmd, 0, sizeof(*cmd));
 
        cmd->version    = __cpu_to_le16(FIO_SERVER_VER);
        cmd->opcode     = cpu_to_le16(opcode);
+       cmd->tag        = cpu_to_le64(tag);
 
        if (pdu) {
                cmd->pdu_len    = cpu_to_le32(pdu_len);
diff --git a/stat.h b/stat.h
index 68bc97022563ecefb92629d0d06f16478e0f967a..b564ebf2e30fb418bea8ad89eba5d71fec6fae65 100644 (file)
--- a/stat.h
+++ b/stat.h
@@ -190,7 +190,7 @@ struct jobs_eta {
 
 extern void show_thread_status(struct thread_stat *ts, struct group_run_stats *rs);
 extern void show_group_stats(struct group_run_stats *rs);
-extern int calc_thread_status(struct jobs_eta *je);
+extern int calc_thread_status(struct jobs_eta *je, int force);
 extern void display_thread_status(struct jobs_eta *je);
 
 #endif