client: properly assign client eta in flight
[fio.git] / client.c
index dbc85226d90bbe38a5d21443e66c5db2b5ef2387..097fcc57adfc763afdeb361d16d38dc77d9ad011 100644 (file)
--- a/client.c
+++ b/client.c
 #include "flist.h"
 #include "hash.h"
 
+struct client_eta {
+       struct jobs_eta eta;
+       unsigned int pending;
+};
+
 struct fio_client {
        struct flist_head list;
        struct flist_head hash_list;
@@ -32,15 +37,18 @@ struct fio_client {
        char *name;
 
        int state;
+
        int skip_newline;
        int is_sock;
 
+       struct flist_head eta_list;
+       struct client_eta *eta_in_flight;
+
        uint16_t argc;
        char **argv;
 };
 
-static struct jobs_eta client_etas;
-static int received_etas;
+static struct timeval eta_tv;
 
 enum {
        Client_created          = 0,
@@ -51,6 +59,7 @@ enum {
 };
 
 static FLIST_HEAD(client_list);
+static FLIST_HEAD(eta_list);
 
 #define FIO_CLIENT_HASH_BITS   7
 #define FIO_CLIENT_HASH_SZ     (1 << FIO_CLIENT_HASH_BITS)
@@ -58,6 +67,7 @@ static FLIST_HEAD(client_list);
 static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
 
 static int handle_client(struct fio_client *client);
+static void dec_jobs_eta(struct client_eta *eta);
 
 static void fio_client_add_hash(struct fio_client *client)
 {
@@ -104,6 +114,11 @@ static void remove_client(struct fio_client *client)
 
        fio_client_remove_hash(client);
 
+       if (!flist_empty(&client->eta_list)) {
+               flist_del_init(&client->eta_list);
+               dec_jobs_eta(client->eta_in_flight);
+       }
+
        free(client->hostname);
        if (client->argv)
                free(client->argv);
@@ -144,6 +159,7 @@ int fio_client_add(const char *hostname, void **cookie)
 
        INIT_FLIST_HEAD(&client->list);
        INIT_FLIST_HEAD(&client->hash_list);
+       INIT_FLIST_HEAD(&client->eta_list);
 
        if (fio_server_parse_string(hostname, &client->hostname,
                                        &client->is_sock, &client->port,
@@ -279,17 +295,22 @@ static int send_client_cmd_line(struct fio_client *client)
        struct cmd_single_line_pdu *cslp;
        struct cmd_line_pdu *clp;
        unsigned long offset;
+       unsigned int *lens;
        void *pdu;
        size_t mem;
        int i, ret;
 
        dprint(FD_NET, "client: send cmdline %d\n", client->argc);
 
+       lens = malloc(client->argc * sizeof(unsigned int));
+
        /*
         * Find out how much mem we need
         */
-       for (i = 0, mem = 0; i < client->argc; i++)
-               mem += strlen(client->argv[i]) + 1;
+       for (i = 0, mem = 0; i < client->argc; i++) {
+               lens[i] = strlen(client->argv[i]) + 1;
+               mem += lens[i];
+       }
 
        /*
         * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu
@@ -301,7 +322,7 @@ static int send_client_cmd_line(struct fio_client *client)
        offset = sizeof(*clp);
 
        for (i = 0; i < client->argc; i++) {
-               uint16_t arg_len = strlen(client->argv[i]) + 1;
+               uint16_t arg_len = lens[i];
 
                cslp = pdu + offset;
                strcpy((char *) cslp->text, client->argv[i]);
@@ -309,8 +330,9 @@ static int send_client_cmd_line(struct fio_client *client)
                offset += sizeof(*cslp) + arg_len;
        }
 
+       free(lens);
        clp->lines = cpu_to_le16(client->argc);
-       ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem);
+       ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, 0);
        free(pdu);
        return ret;
 }
@@ -393,7 +415,7 @@ static int fio_client_send_ini(struct fio_client *client, const char *filename)
                return 1;
        }
 
-       ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size);
+       ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size, 0);
        free(buf);
        close(fd);
        return ret;
@@ -549,9 +571,8 @@ static void convert_jobs_eta(struct jobs_eta *je)
        je->eta_sec             = le64_to_cpu(je->eta_sec);
 }
 
-static void sum_jobs_eta(struct jobs_eta *je)
+static void sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
 {
-       struct jobs_eta *dst = &client_etas;
        int i;
 
        dst->nr_running         += je->nr_running;
@@ -574,22 +595,29 @@ static void sum_jobs_eta(struct jobs_eta *je)
                dst->eta_sec = je->eta_sec;
 }
 
-static void handle_eta(struct fio_net_cmd *cmd)
+static void dec_jobs_eta(struct client_eta *eta)
+{
+       if (!--eta->pending) {
+               display_thread_status(&eta->eta);
+               free(eta);
+       }
+}
+
+static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
 {
        struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
+       struct client_eta *eta = (struct client_eta *) cmd->tag;
 
-       convert_jobs_eta(je);
+       dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
 
-       if (nr_clients > 1) {
-               sum_jobs_eta(je);
-               received_etas++;
-               if (received_etas == nr_clients) {
-                       received_etas = 0;
-                       display_thread_status(&client_etas);
-                       memset(&client_etas, 0, sizeof(client_etas));
-               }
-       } else
-               display_thread_status(je);
+       assert(client->eta_in_flight == eta);
+
+       client->eta_in_flight = NULL;
+       flist_del_init(&client->eta_list);
+
+       convert_jobs_eta(je);
+       sum_jobs_eta(&eta->eta, je);
+       dec_jobs_eta(eta);
 }
 
 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
@@ -655,7 +683,7 @@ static int handle_client(struct fio_client *client)
                free(cmd);
                break;
        case FIO_NET_CMD_ETA:
-               handle_eta(cmd);
+               handle_eta(client, cmd);
                free(cmd);
                break;
        case FIO_NET_CMD_PROBE:
@@ -679,6 +707,40 @@ static int handle_client(struct fio_client *client)
        return 1;
 }
 
+static void request_client_etas(void)
+{
+       struct fio_client *client;
+       struct flist_head *entry;
+       struct client_eta *eta;
+       int skipped = 0;
+
+       dprint(FD_NET, "client: request eta (%d)\n", nr_clients);
+
+       eta = malloc(sizeof(*eta));
+       memset(&eta->eta, 0, sizeof(eta->eta));
+       eta->pending = nr_clients;
+
+       flist_for_each(entry, &client_list) {
+               client = flist_entry(entry, struct fio_client, list);
+
+               if (!flist_empty(&client->eta_list)) {
+                       skipped++;
+                       continue;
+               }
+
+               assert(!client->eta_in_flight);
+               flist_add_tail(&client->eta_list, &eta_list);
+               client->eta_in_flight = eta;
+               fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
+                                               (uint64_t) eta);
+       }
+
+       while (skipped--)
+               dec_jobs_eta(eta);
+
+       dprint(FD_NET, "client: requested eta tag %p\n", eta);
+}
+
 int fio_handle_clients(void)
 {
        struct fio_client *client;
@@ -686,6 +748,8 @@ int fio_handle_clients(void)
        struct pollfd *pfds;
        int i, ret = 0;
 
+       gettimeofday(&eta_tv, NULL);
+
        pfds = malloc(nr_clients * sizeof(struct pollfd));
 
        while (!exit_backend && nr_clients) {
@@ -701,6 +765,14 @@ int fio_handle_clients(void)
                assert(i == nr_clients);
 
                do {
+                       struct timeval tv;
+
+                       gettimeofday(&tv, NULL);
+                       if (mtime_since(&eta_tv, &tv) >= 900) {
+                               request_client_etas();
+                               memcpy(&eta_tv, &tv, sizeof(tv));
+                       }
+
                        ret = poll(pfds, nr_clients, 100);
                        if (ret < 0) {
                                if (errno == EINTR)