client/server: track and handle command timeouts
authorJens Axboe <axboe@kernel.dk>
Tue, 11 Oct 2011 08:15:51 +0000 (10:15 +0200)
committerJens Axboe <axboe@kernel.dk>
Tue, 11 Oct 2011 08:15:51 +0000 (10:15 +0200)
Signed-off-by: Jens Axboe <axboe@kernel.dk>
client.c
server.c
server.h

index 358903bb1f29bff18df48b67c189e38c7d05149f..130aeafc1a493efc8d3c92f63049200b9724a177 100644 (file)
--- a/client.c
+++ b/client.c
@@ -45,6 +45,8 @@ struct fio_client {
        struct flist_head eta_list;
        struct client_eta *eta_in_flight;
 
+       struct flist_head cmd_list;
+
        uint16_t argc;
        char **argv;
 };
@@ -188,6 +190,7 @@ int fio_client_add(const char *hostname, void **cookie)
        INIT_FLIST_HEAD(&client->hash_list);
        INIT_FLIST_HEAD(&client->arg_list);
        INIT_FLIST_HEAD(&client->eta_list);
+       INIT_FLIST_HEAD(&client->cmd_list);
 
        if (fio_server_parse_string(hostname, &client->hostname,
                                        &client->is_sock, &client->port,
@@ -266,6 +269,8 @@ static int fio_client_connect(struct fio_client *client)
        else
                fd = fio_client_connect_ip(client);
 
+       dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
+
        if (fd < 0)
                return 1;
 
@@ -285,7 +290,7 @@ void fio_clients_terminate(void)
        flist_for_each(entry, &client_list) {
                client = flist_entry(entry, struct fio_client, list);
 
-               fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_QUIT, 0);
+               fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_QUIT, 0, NULL);
        }
 }
 
@@ -314,8 +319,7 @@ static void probe_client(struct fio_client *client)
 {
        dprint(FD_NET, "client: send probe\n");
 
-       fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_PROBE, 0);
-       handle_client(client);
+       fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_PROBE, 0, &client->cmd_list);
 }
 
 static int send_client_cmd_line(struct fio_client *client)
@@ -631,6 +635,30 @@ static void dec_jobs_eta(struct client_eta *eta)
        }
 }
 
+static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
+{
+       struct fio_net_int_cmd *icmd = NULL;
+       struct flist_head *entry;
+
+       flist_for_each(entry, &client->cmd_list) {
+               icmd = flist_entry(entry, struct fio_net_int_cmd, list);
+
+               if (cmd->tag == (uint64_t) icmd)
+                       break;
+
+               icmd = NULL;
+       }
+
+       if (!icmd) {
+               log_err("fio: client: unable to find matching tag\n");
+               return;
+       }
+
+       flist_del(&icmd->list);
+       cmd->tag = icmd->saved_tag;
+       free(icmd);
+}
+
 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
 {
        struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
@@ -679,8 +707,8 @@ static int handle_client(struct fio_client *client)
        if (!cmd)
                return 0;
 
-       dprint(FD_NET, "client: got cmd op %d from %s\n",
-                                       cmd->opcode, client->hostname);
+       dprint(FD_NET, "client: got cmd op %s from %s\n",
+                               fio_server_op(cmd->opcode), client->hostname);
 
        switch (cmd->opcode) {
        case FIO_NET_CMD_QUIT:
@@ -711,10 +739,12 @@ static int handle_client(struct fio_client *client)
                free(cmd);
                break;
        case FIO_NET_CMD_ETA:
+               remove_reply_cmd(client, cmd);
                handle_eta(client, cmd);
                free(cmd);
                break;
        case FIO_NET_CMD_PROBE:
+               remove_reply_cmd(client, cmd);
                handle_probe(client, cmd);
                free(cmd);
                break;
@@ -727,7 +757,7 @@ static int handle_client(struct fio_client *client)
                free(cmd);
                break;
        default:
-               log_err("fio: unknown client op: %d\n", cmd->opcode);
+               log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
                free(cmd);
                break;
        }
@@ -760,7 +790,7 @@ static void request_client_etas(void)
                flist_add_tail(&client->eta_list, &eta_list);
                client->eta_in_flight = eta;
                fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
-                                               (uint64_t) eta);
+                                       (uint64_t) eta, &client->cmd_list);
        }
 
        while (skipped--)
@@ -769,6 +799,55 @@ static void request_client_etas(void)
        dprint(FD_NET, "client: requested eta tag %p\n", eta);
 }
 
+static int client_check_cmd_timeout(struct fio_client *client,
+                                   struct timeval *now)
+{
+       struct fio_net_int_cmd *cmd;
+       struct flist_head *entry, *tmp;
+       int ret = 0;
+
+       flist_for_each_safe(entry, tmp, &client->cmd_list) {
+               cmd = flist_entry(entry, struct fio_net_int_cmd, list);
+
+               if (mtime_since(&cmd->tv, now) < FIO_NET_CLIENT_TIMEOUT)
+                       continue;
+
+               log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
+                                               fio_server_op(cmd->cmd.opcode));
+               flist_del(&cmd->list);
+               free(cmd);
+               ret = 1;
+       }
+
+       return flist_empty(&client->cmd_list) && ret;
+}
+
+static int fio_client_timed_out(void)
+{
+       struct fio_client *client;
+       struct flist_head *entry, *tmp;
+       struct timeval tv;
+       int ret = 0;
+
+       gettimeofday(&tv, NULL);
+
+       flist_for_each_safe(entry, tmp, &client_list) {
+               client = flist_entry(entry, struct fio_client, list);
+
+               if (flist_empty(&client->cmd_list))
+                       continue;
+
+               if (!client_check_cmd_timeout(client, &tv))
+                       continue;
+
+               log_err("fio: client %s timed out\n", client->hostname);
+               remove_client(client);
+               ret = 1;
+       }
+
+       return ret;
+}
+
 int fio_handle_clients(void)
 {
        struct fio_client *client;
@@ -799,6 +878,9 @@ int fio_handle_clients(void)
                        if (mtime_since(&eta_tv, &tv) >= 900) {
                                request_client_etas();
                                memcpy(&eta_tv, &tv, sizeof(tv));
+
+                               if (fio_client_timed_out())
+                                       break;
                        }
 
                        ret = poll(pfds, nr_clients, 100);
index 5506ca9fedbbb860f7df89b02147d507a0d394ec..339bb66a6f3b980e6a031434e23a5344f544b9c5 100644 (file)
--- a/server.c
+++ b/server.c
@@ -33,6 +33,33 @@ static char *fio_server_arg;
 static char *bind_sock;
 static struct sockaddr_in saddr_in;
 
+static const char *fio_server_ops[FIO_NET_CMD_NR] = {
+       "",
+       "QUIT",
+       "EXIT",
+       "JOB",
+       "JOBLINE",
+       "TEXT",
+       "TS",
+       "GS",
+       "SEND_ETA",
+       "ETA",
+       "PROBE",
+       "START",
+       "STOP"
+};
+
+const char *fio_server_op(unsigned int op)
+{
+       static char buf[32];
+
+       if (op < FIO_NET_CMD_NR)
+               return fio_server_ops[op];
+
+       sprintf(buf, "UNKNOWN/%d", op);
+       return buf;
+}
+
 int fio_send_data(int sk, const void *p, unsigned int len)
 {
        assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_PDU);
@@ -243,7 +270,7 @@ int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size,
        return ret;
 }
 
-int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag)
+static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag)
 {
        struct fio_net_cmd cmd;
 
@@ -253,10 +280,42 @@ int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag)
        return fio_send_data(sk, &cmd, sizeof(cmd));
 }
 
+/*
+ * If 'list' is non-NULL, then allocate and store the sent command for
+ * later verification.
+ */
+int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag,
+                           struct flist_head *list)
+{
+       struct fio_net_int_cmd *cmd;
+       int ret;
+
+       if (!list)
+               return fio_net_send_simple_stack_cmd(sk, opcode, tag);
+
+       cmd = malloc(sizeof(*cmd));
+
+       fio_init_net_cmd(&cmd->cmd, opcode, NULL, 0, (uint64_t) cmd);
+       fio_net_cmd_crc(&cmd->cmd);
+
+       INIT_FLIST_HEAD(&cmd->list);
+       gettimeofday(&cmd->tv, NULL);
+       cmd->saved_tag = tag;
+
+       ret = fio_send_data(sk, &cmd->cmd, sizeof(cmd->cmd));
+       if (ret) {
+               free(cmd);
+               return ret;
+       }
+
+       flist_add_tail(&cmd->list, list);
+       return 0;
+}
+
 static int fio_server_send_quit_cmd(void)
 {
        dprint(FD_NET, "server: sending quit\n");
-       return fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_QUIT, 0);
+       return fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_QUIT, 0, NULL);
 }
 
 static int handle_job_cmd(struct fio_net_cmd *cmd)
@@ -269,7 +328,7 @@ static int handle_job_cmd(struct fio_net_cmd *cmd)
                return -1;
        }
 
-       fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_START, 0);
+       fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_START, 0, NULL);
 
        ret = exec_run();
        fio_server_send_quit_cmd();
@@ -309,7 +368,7 @@ static int handle_jobline_cmd(struct fio_net_cmd *cmd)
 
        free(argv);
 
-       fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_START, 0);
+       fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_START, 0, NULL);
 
        ret = exec_run();
        fio_server_send_quit_cmd();
@@ -321,6 +380,8 @@ static int handle_probe_cmd(struct fio_net_cmd *cmd)
 {
        struct cmd_probe_pdu probe;
 
+       dprint(FD_NET, "server: sending probe reply\n");
+
        memset(&probe, 0, sizeof(probe));
        gethostname((char *) probe.hostname, sizeof(probe.hostname));
 #ifdef FIO_BIG_ENDIAN
@@ -333,7 +394,7 @@ static int handle_probe_cmd(struct fio_net_cmd *cmd)
        probe.os        = FIO_OS;
        probe.arch      = FIO_ARCH;
 
-       return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), 0);
+       return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), cmd->tag);
 }
 
 static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
@@ -379,7 +440,8 @@ static int handle_command(struct fio_net_cmd *cmd)
 {
        int ret;
 
-       dprint(FD_NET, "server: got opcode %d, pdu=%u\n", cmd->opcode, cmd->pdu_len);
+       dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%lx\n",
+                       fio_server_op(cmd->opcode), cmd->pdu_len, cmd->tag);
 
        switch (cmd->opcode) {
        case FIO_NET_CMD_QUIT:
@@ -401,7 +463,7 @@ static int handle_command(struct fio_net_cmd *cmd)
                ret = handle_send_eta_cmd(cmd);
                break;
        default:
-               log_err("fio: unknown opcode: %d\n", cmd->opcode);
+               log_err("fio: unknown opcode: %s\n",fio_server_op(cmd->opcode));
                ret = 1;
        }
 
@@ -758,7 +820,7 @@ static int fio_init_server_connection(void)
 
        log_info("fio: server listening on %s\n", bind_str);
 
-       if (listen(sk, 1) < 0) {
+       if (listen(sk, 0) < 0) {
                log_err("fio: listen: %s\n", strerror(errno));
                return -1;
        }
index ea888af46c4bcf7e19947088cd2ee02db7cf5d7a..cd07a85d627881fd7c379439f98103e20e676e30 100644 (file)
--- a/server.h
+++ b/server.h
@@ -3,6 +3,7 @@
 
 #include <inttypes.h>
 #include <string.h>
+#include <sys/time.h>
 
 #include "stat.h"
 #include "os/os.h"
@@ -25,6 +26,13 @@ struct fio_net_cmd {
        uint8_t payload[0];     /* payload */
 };
 
+struct fio_net_int_cmd {
+       struct fio_net_cmd cmd;
+       struct flist_head list;
+       struct timeval tv;
+       uint64_t saved_tag;
+};
+
 enum {
        FIO_SERVER_VER          = 5,
 
@@ -42,12 +50,15 @@ enum {
        FIO_NET_CMD_PROBE       = 10,
        FIO_NET_CMD_START       = 11,
        FIO_NET_CMD_STOP        = 12,
+       FIO_NET_CMD_NR          = 13,
 
        FIO_NET_CMD_F_MORE      = 1UL << 0,
 
        /* crc does not include the crc fields */
        FIO_NET_CMD_CRC_SZ      = sizeof(struct fio_net_cmd) -
                                        2 * sizeof(uint16_t),
+
+       FIO_NET_CLIENT_TIMEOUT  = 5000,
 };
 
 struct cmd_ts_pdu {
@@ -79,9 +90,10 @@ extern int fio_start_server(char *);
 extern int fio_server_text_output(const char *, size_t);
 extern int fio_server_log(const char *format, ...);
 extern int fio_net_send_cmd(int, uint16_t, const void *, off_t, uint64_t);
-extern int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag);
+extern int fio_net_send_simple_cmd(int, uint16_t, uint64_t, struct flist_head *);
 extern void fio_server_set_arg(const char *);
 extern int fio_server_parse_string(const char *, char **, int *, int *, struct in_addr *);
+extern const char *fio_server_op(unsigned int);
 
 struct thread_stat;
 struct group_run_stats;