From af9c9fb34e420fc4d9cf317aa0f3cf6795a5a07f Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Sun, 9 Oct 2011 21:54:10 +0200 Subject: [PATCH] client/server: request ETA instead of having the server send it automatically Also changes the 'serial' of the command to a tag, that's passed back and forth for commands that need to use it. Signed-off-by: Jens Axboe --- client.c | 76 +++++++++++++++++++++++++++++++--------- eta.c | 18 +++++----- fio.c | 4 +-- server.c | 103 +++++++++++++++++++++++++++++-------------------------- server.h | 22 ++++++------ stat.h | 2 +- 6 files changed, 138 insertions(+), 87 deletions(-) diff --git a/client.c b/client.c index dbc85226..76935ca5 100644 --- a/client.c +++ b/client.c @@ -32,15 +32,16 @@ struct fio_client { char *name; int state; + int skip_newline; int is_sock; + int waiting_eta; uint16_t argc; char **argv; }; -static struct jobs_eta client_etas; -static int received_etas; +static struct timeval eta_tv; enum { Client_created = 0, @@ -50,6 +51,11 @@ enum { Client_exited = 4, }; +struct client_eta { + struct jobs_eta eta; + unsigned int pending; +}; + static FLIST_HEAD(client_list); #define FIO_CLIENT_HASH_BITS 7 @@ -104,6 +110,8 @@ static void remove_client(struct fio_client *client) fio_client_remove_hash(client); + /* FIXME: check ->waiting_eta and handle it */ + free(client->hostname); if (client->argv) free(client->argv); @@ -310,7 +318,7 @@ static int send_client_cmd_line(struct fio_client *client) } clp->lines = cpu_to_le16(client->argc); - ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem); + ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, 0); free(pdu); return ret; } @@ -393,7 +401,7 @@ static int fio_client_send_ini(struct fio_client *client, const char *filename) return 1; } - ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size); + ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size, 0); free(buf); close(fd); return ret; @@ -549,9 +557,8 @@ static void convert_jobs_eta(struct jobs_eta *je) je->eta_sec = le64_to_cpu(je->eta_sec); } -static void sum_jobs_eta(struct jobs_eta *je) +static void sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je) { - struct jobs_eta *dst = &client_etas; int i; dst->nr_running += je->nr_running; @@ -577,19 +584,17 @@ static void sum_jobs_eta(struct jobs_eta *je) static void handle_eta(struct fio_net_cmd *cmd) { struct jobs_eta *je = (struct jobs_eta *) cmd->payload; + struct client_eta *eta = (struct client_eta *) cmd->tag; + + dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending); convert_jobs_eta(je); + sum_jobs_eta(&eta->eta, je); - if (nr_clients > 1) { - sum_jobs_eta(je); - received_etas++; - if (received_etas == nr_clients) { - received_etas = 0; - display_thread_status(&client_etas); - memset(&client_etas, 0, sizeof(client_etas)); - } - } else - display_thread_status(je); + if (!--eta->pending) { + display_thread_status(&eta->eta); + free(eta); + } } static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd) @@ -679,6 +684,35 @@ static int handle_client(struct fio_client *client) return 1; } +static void request_client_etas(void) +{ + struct fio_client *client; + struct flist_head *entry; + struct client_eta *eta; + + dprint(FD_NET, "client: request eta (%d)\n", nr_clients); + + /* + * We need to do something more clever about checking status + * of command being send, client haven't sent previous ETA + * already, etc. + */ + + eta = malloc(sizeof(*eta)); + memset(&eta->eta, 0, sizeof(eta->eta)); + eta->pending = nr_clients; + + flist_for_each(entry, &client_list) { + client = flist_entry(entry, struct fio_client, list); + + client->waiting_eta = 1; + fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA, + (uint64_t) eta); + } + + dprint(FD_NET, "client: requested eta tag %p\n", eta); +} + int fio_handle_clients(void) { struct fio_client *client; @@ -686,6 +720,8 @@ int fio_handle_clients(void) struct pollfd *pfds; int i, ret = 0; + gettimeofday(&eta_tv, NULL); + pfds = malloc(nr_clients * sizeof(struct pollfd)); while (!exit_backend && nr_clients) { @@ -701,6 +737,14 @@ int fio_handle_clients(void) assert(i == nr_clients); do { + struct timeval tv; + + gettimeofday(&tv, NULL); + if (mtime_since(&eta_tv, &tv) >= 900) { + request_client_etas(); + memcpy(&eta_tv, &tv, sizeof(tv)); + } + ret = poll(pfds, nr_clients, 100); if (ret < 0) { if (errno == EINTR) diff --git a/eta.c b/eta.c index 5b912eb7..b7f1fd62 100644 --- a/eta.c +++ b/eta.c @@ -230,7 +230,7 @@ static void calc_iops(unsigned long mtime, unsigned long long *io_iops, * Print status of the jobs we know about. This includes rate estimates, * ETA, thread state, etc. */ -int calc_thread_status(struct jobs_eta *je) +int calc_thread_status(struct jobs_eta *je, int force) { struct thread_data *td; int i; @@ -245,11 +245,13 @@ int calc_thread_status(struct jobs_eta *je) static struct timeval rate_prev_time, disp_prev_time; int i2p = 0; - if (temp_stall_ts || terse_output || eta_print == FIO_ETA_NEVER) - return 0; + if (!force) { + if (temp_stall_ts || terse_output || eta_print == FIO_ETA_NEVER) + return 0; - if (!isatty(STDOUT_FILENO) && (eta_print != FIO_ETA_ALWAYS)) - return 0; + if (!isatty(STDOUT_FILENO) && (eta_print != FIO_ETA_ALWAYS)) + return 0; + } if (!rate_io_bytes[0] && !rate_io_bytes[1]) fill_start_time(&rate_prev_time); @@ -332,7 +334,7 @@ int calc_thread_status(struct jobs_eta *je) /* * Allow a little slack, the target is to print it every 1000 msecs */ - if (disp_time < 900) + if (!force && disp_time < 900) return 0; calc_rate(disp_time, io_bytes, disp_io_bytes, je->rate); @@ -340,7 +342,7 @@ int calc_thread_status(struct jobs_eta *je) memcpy(&disp_prev_time, &now, sizeof(now)); - if (!je->nr_running && !je->nr_pending) + if (!force && !je->nr_running && !je->nr_pending) return 0; je->nr_threads = thread_number; @@ -421,7 +423,7 @@ void print_thread_status(void) memset(je, 0, sizeof(*je) + thread_number * sizeof(char)); - if (calc_thread_status(je)) + if (calc_thread_status(je, 0)) display_thread_status(je); free(je); diff --git a/fio.c b/fio.c index 212b72e8..ece897e5 100644 --- a/fio.c +++ b/fio.c @@ -183,9 +183,7 @@ static void *disk_thread_main(void *data) break; update_io_ticks(); - if (is_backend) - fio_server_send_status(); - else + if (!is_backend) print_thread_status(); } diff --git a/server.c b/server.c index a54db7bf..5afd8d45 100644 --- a/server.c +++ b/server.c @@ -98,7 +98,7 @@ static int verify_convert_cmd(struct fio_net_cmd *cmd) cmd->version = le16_to_cpu(cmd->version); cmd->opcode = le16_to_cpu(cmd->opcode); cmd->flags = le32_to_cpu(cmd->flags); - cmd->serial = le64_to_cpu(cmd->serial); + cmd->tag = le64_to_cpu(cmd->tag); cmd->pdu_len = le32_to_cpu(cmd->pdu_len); switch (cmd->version) { @@ -205,7 +205,8 @@ void fio_net_cmd_crc(struct fio_net_cmd *cmd) cmd->pdu_crc16 = __cpu_to_le16(crc16(cmd->payload, pdu_len)); } -int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size) +int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size, + uint64_t tag) { struct fio_net_cmd *cmd; size_t this_len; @@ -218,7 +219,7 @@ int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size) cmd = malloc(sizeof(*cmd) + this_len); - fio_init_net_cmd(cmd, opcode, buf, this_len); + fio_init_net_cmd(cmd, opcode, buf, this_len, tag); if (this_len < size) cmd->flags = __cpu_to_le32(FIO_NET_CMD_F_MORE); @@ -234,11 +235,11 @@ int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size) return ret; } -int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t serial) +int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag) { struct fio_net_cmd cmd; - fio_init_net_cmd(&cmd, opcode, NULL, 0); + fio_init_net_cmd(&cmd, opcode, NULL, 0, tag); fio_net_cmd_crc(&cmd); return fio_send_data(sk, &cmd, sizeof(cmd)); @@ -324,7 +325,48 @@ static int handle_probe_cmd(struct fio_net_cmd *cmd) probe.os = FIO_OS; probe.arch = FIO_ARCH; - return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe)); + return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), 0); +} + +static int handle_send_eta_cmd(struct fio_net_cmd *cmd) +{ + struct jobs_eta *je; + size_t size; + void *buf; + int i; + + size = sizeof(*je) + thread_number * sizeof(char); + buf = malloc(size); + memset(buf, 0, size); + je = buf; + + if (!calc_thread_status(je, 1)) { + free(je); + return 0; + } + + dprint(FD_NET, "server sending status\n"); + + je->nr_running = cpu_to_le32(je->nr_running); + je->nr_ramp = cpu_to_le32(je->nr_ramp); + je->nr_pending = cpu_to_le32(je->nr_pending); + je->files_open = cpu_to_le32(je->files_open); + je->m_rate = cpu_to_le32(je->m_rate); + je->t_rate = cpu_to_le32(je->t_rate); + je->m_iops = cpu_to_le32(je->m_iops); + je->t_iops = cpu_to_le32(je->t_iops); + + for (i = 0; i < 2; i++) { + je->rate[i] = cpu_to_le32(je->rate[i]); + je->iops[i] = cpu_to_le32(je->iops[i]); + } + + je->elapsed_sec = cpu_to_le32(je->nr_running); + je->eta_sec = cpu_to_le64(je->eta_sec); + + fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, buf, size, cmd->tag); + free(je); + return 0; } static int handle_command(struct fio_net_cmd *cmd) @@ -349,6 +391,9 @@ static int handle_command(struct fio_net_cmd *cmd) case FIO_NET_CMD_PROBE: ret = handle_probe_cmd(cmd); break; + case FIO_NET_CMD_SEND_ETA: + ret = handle_send_eta_cmd(cmd); + break; default: log_err("fio: unknown opcode: %d\n", cmd->opcode); ret = 1; @@ -477,7 +522,7 @@ out: int fio_server_text_output(const char *buf, unsigned int len) { if (server_fd != -1) - return fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, buf, len); + return fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, buf, len, 0); return fwrite(buf, len, 1, f_err); } @@ -590,7 +635,7 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs) convert_gs(&p.rs, rs); - fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p)); + fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), 0); } void fio_server_send_gs(struct group_run_stats *rs) @@ -600,47 +645,7 @@ void fio_server_send_gs(struct group_run_stats *rs) dprint(FD_NET, "server sending group run stats\n"); convert_gs(&gs, rs); - fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs)); -} - -void fio_server_send_status(void) -{ - struct jobs_eta *je; - size_t size; - void *buf; - int i; - - size = sizeof(*je) + thread_number * sizeof(char); - buf = malloc(size); - memset(buf, 0, size); - je = buf; - - if (!calc_thread_status(je)) { - free(je); - return; - } - - dprint(FD_NET, "server sending status\n"); - - je->nr_running = cpu_to_le32(je->nr_running); - je->nr_ramp = cpu_to_le32(je->nr_ramp); - je->nr_pending = cpu_to_le32(je->nr_pending); - je->files_open = cpu_to_le32(je->files_open); - je->m_rate = cpu_to_le32(je->m_rate); - je->t_rate = cpu_to_le32(je->t_rate); - je->m_iops = cpu_to_le32(je->m_iops); - je->t_iops = cpu_to_le32(je->t_iops); - - for (i = 0; i < 2; i++) { - je->rate[i] = cpu_to_le32(je->rate[i]); - je->iops[i] = cpu_to_le32(je->iops[i]); - } - - je->elapsed_sec = cpu_to_le32(je->nr_running); - je->eta_sec = cpu_to_le64(je->eta_sec); - - fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, buf, size); - free(je); + fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), 0); } int fio_server_log(const char *format, ...) diff --git a/server.h b/server.h index d68cbf52..9f7eeaec 100644 --- a/server.h +++ b/server.h @@ -14,7 +14,7 @@ struct fio_net_cmd { uint16_t version; /* protocol version */ uint16_t opcode; /* command opcode */ uint32_t flags; /* modifier flags */ - uint64_t serial; /* serial number */ + uint64_t tag; /* passed back on reply */ uint32_t pdu_len; /* length of post-cmd layload */ /* * These must be immediately before the payload, anything before @@ -26,7 +26,7 @@ struct fio_net_cmd { }; enum { - FIO_SERVER_VER = 4, + FIO_SERVER_VER = 5, FIO_SERVER_MAX_PDU = 1024, @@ -37,10 +37,11 @@ enum { FIO_NET_CMD_TEXT = 5, FIO_NET_CMD_TS = 6, FIO_NET_CMD_GS = 7, - FIO_NET_CMD_ETA = 8, - FIO_NET_CMD_PROBE = 9, - FIO_NET_CMD_START = 10, - FIO_NET_CMD_STOP = 11, + FIO_NET_CMD_SEND_ETA = 8, + FIO_NET_CMD_ETA = 9, + FIO_NET_CMD_PROBE = 10, + FIO_NET_CMD_START = 11, + FIO_NET_CMD_STOP = 12, FIO_NET_CMD_F_MORE = 1UL << 0, @@ -77,8 +78,8 @@ struct cmd_line_pdu { extern int fio_start_server(int); extern int fio_server_text_output(const char *, unsigned int len); extern int fio_server_log(const char *format, ...); -extern int fio_net_send_cmd(int, uint16_t, const void *, off_t); -extern int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t serial); +extern int fio_net_send_cmd(int, uint16_t, const void *, off_t, uint64_t); +extern int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag); extern void fio_server_set_arg(const char *); extern int fio_server_parse_string(const char *, char **, int *, int *, struct in_addr *); @@ -86,7 +87,6 @@ struct thread_stat; struct group_run_stats; extern void fio_server_send_ts(struct thread_stat *, struct group_run_stats *); extern void fio_server_send_gs(struct group_run_stats *); -extern void fio_server_send_status(void); extern void fio_server_idle_loop(void); extern int fio_clients_connect(void); @@ -104,12 +104,14 @@ extern int exit_backend; extern int fio_net_port; static inline void fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode, - const void *pdu, uint32_t pdu_len) + const void *pdu, uint32_t pdu_len, + uint64_t tag) { memset(cmd, 0, sizeof(*cmd)); cmd->version = __cpu_to_le16(FIO_SERVER_VER); cmd->opcode = cpu_to_le16(opcode); + cmd->tag = cpu_to_le64(tag); if (pdu) { cmd->pdu_len = cpu_to_le32(pdu_len); diff --git a/stat.h b/stat.h index 68bc9702..b564ebf2 100644 --- a/stat.h +++ b/stat.h @@ -190,7 +190,7 @@ struct jobs_eta { extern void show_thread_status(struct thread_stat *ts, struct group_run_stats *rs); extern void show_group_stats(struct group_run_stats *rs); -extern int calc_thread_status(struct jobs_eta *je); +extern int calc_thread_status(struct jobs_eta *je, int force); extern void display_thread_status(struct jobs_eta *je); #endif -- 2.25.1