X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=server.c;h=6e96b77a46432cc3371d9cf1fb4f28e8555a39c9;hp=8e5ca50f45faf5e7480c85d1e84f45f7b014afb9;hb=3e260a46ea9a8de224c3d0a29a608da3440f284a;hpb=6a5c4d92ce70a05d2fee981b6f133373c0ef62f8 diff --git a/server.c b/server.c index 8e5ca50f..6e96b77a 100644 --- a/server.c +++ b/server.c @@ -11,12 +11,15 @@ #include #include #include +#include #include #include #include #include #include +#ifdef CONFIG_ZLIB #include +#endif #include "fio.h" #include "server.h" @@ -33,6 +36,12 @@ static char *bind_sock; static struct sockaddr_in saddr_in; static struct sockaddr_in6 saddr_in6; static int use_ipv6; +#ifdef CONFIG_ZLIB +static unsigned int has_zlib = 1; +#else +static unsigned int has_zlib = 0; +#endif +static unsigned int use_zlib; struct fio_fork_item { struct flist_head list; @@ -42,12 +51,6 @@ struct fio_fork_item { pid_t pid; }; -/* Created on fork on new connection */ -static FLIST_HEAD(conn_list); - -/* Created on job fork from connection */ -static FLIST_HEAD(job_list); - static const char *fio_server_ops[FIO_NET_CMD_NR] = { "", "QUIT", @@ -293,8 +296,9 @@ struct fio_net_cmd *fio_net_recv_cmd(int sk) static void add_reply(uint64_t tag, struct flist_head *list) { - struct fio_net_cmd_reply *reply = (struct fio_net_cmd_reply *) tag; + struct fio_net_cmd_reply *reply; + reply = (struct fio_net_cmd_reply *) (uintptr_t) tag; flist_add_tail(&reply->list, list); } @@ -313,8 +317,9 @@ static uint64_t alloc_reply(uint64_t tag, uint16_t opcode) static void free_reply(uint64_t tag) { - struct fio_net_cmd_reply *reply = (struct fio_net_cmd_reply *) tag; + struct fio_net_cmd_reply *reply; + reply = (struct fio_net_cmd_reply *) (uintptr_t) tag; free(reply); } @@ -460,16 +465,16 @@ static void fio_server_add_fork_item(pid_t pid, struct flist_head *list) flist_add_tail(&ffi->list, list); } -static void fio_server_add_conn_pid(pid_t pid) +static void fio_server_add_conn_pid(struct flist_head *conn_list, pid_t pid) { - dprint(FD_NET, "server: forked off connection job (pid=%u)\n", pid); - fio_server_add_fork_item(pid, &conn_list); + dprint(FD_NET, "server: forked off connection job (pid=%u)\n", (int) pid); + fio_server_add_fork_item(pid, conn_list); } -static void fio_server_add_job_pid(pid_t pid) +static void fio_server_add_job_pid(struct flist_head *job_list, pid_t pid) { - dprint(FD_NET, "server: forked off job job (pid=%u)\n", pid); - fio_server_add_fork_item(pid, &job_list); + dprint(FD_NET, "server: forked off job job (pid=%u)\n", (int) pid); + fio_server_add_fork_item(pid, job_list); } static void fio_server_check_fork_item(struct fio_fork_item *ffi) @@ -479,7 +484,7 @@ static void fio_server_check_fork_item(struct fio_fork_item *ffi) ret = waitpid(ffi->pid, &status, WNOHANG); if (ret < 0) { if (errno == ECHILD) { - log_err("fio: connection pid %u disappeared\n", ffi->pid); + log_err("fio: connection pid %u disappeared\n", (int) ffi->pid); ffi->exited = 1; } else log_err("fio: waitpid: %s\n", strerror(errno)); @@ -498,7 +503,7 @@ static void fio_server_check_fork_item(struct fio_fork_item *ffi) static void fio_server_fork_item_done(struct fio_fork_item *ffi) { - dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", ffi->pid, ffi->signal, ffi->exitval); + dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval); /* * Fold STOP and QUIT... @@ -524,17 +529,17 @@ static void fio_server_check_fork_items(struct flist_head *list) } } -static void fio_server_check_jobs(void) +static void fio_server_check_jobs(struct flist_head *job_list) { - fio_server_check_fork_items(&job_list); + fio_server_check_fork_items(job_list); } -static void fio_server_check_conns(void) +static void fio_server_check_conns(struct flist_head *conn_list) { - fio_server_check_fork_items(&conn_list); + fio_server_check_fork_items(conn_list); } -static int handle_run_cmd(struct fio_net_cmd *cmd) +static int handle_run_cmd(struct flist_head *job_list, struct fio_net_cmd *cmd) { pid_t pid; int ret; @@ -543,7 +548,7 @@ static int handle_run_cmd(struct fio_net_cmd *cmd) pid = fork(); if (pid) { - fio_server_add_job_pid(pid); + fio_server_add_job_pid(job_list, pid); return 0; } @@ -614,7 +619,8 @@ static int handle_jobline_cmd(struct fio_net_cmd *cmd) static int handle_probe_cmd(struct fio_net_cmd *cmd) { - struct cmd_probe_pdu probe; + struct cmd_client_probe_pdu *pdu = (struct cmd_client_probe_pdu *) cmd->payload; + struct cmd_probe_reply_pdu probe; uint64_t tag = cmd->tag; dprint(FD_NET, "server: sending probe reply\n"); @@ -630,7 +636,17 @@ static int handle_probe_cmd(struct fio_net_cmd *cmd) probe.arch = FIO_ARCH; probe.bpp = sizeof(void *); probe.cpus = __cpu_to_le32(cpus_online()); - probe.flags = 0; + + /* + * If the client supports compression and we do too, then enable it + */ + if (has_zlib && le64_to_cpu(pdu->flags) & FIO_PROBE_FLAG_ZLIB) { + probe.flags = __cpu_to_le64(FIO_PROBE_FLAG_ZLIB); + use_zlib = 1; + } else { + probe.flags = 0; + use_zlib = 0; + } return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL); } @@ -659,6 +675,7 @@ static int handle_send_eta_cmd(struct fio_net_cmd *cmd) je->nr_running = cpu_to_le32(je->nr_running); je->nr_ramp = cpu_to_le32(je->nr_ramp); je->nr_pending = cpu_to_le32(je->nr_pending); + je->nr_setting_up = cpu_to_le32(je->nr_setting_up); je->files_open = cpu_to_le32(je->files_open); for (i = 0; i < DDIR_RWDIR_CNT; i++) { @@ -666,12 +683,15 @@ static int handle_send_eta_cmd(struct fio_net_cmd *cmd) je->t_rate[i] = cpu_to_le32(je->t_rate[i]); je->m_iops[i] = cpu_to_le32(je->m_iops[i]); je->t_iops[i] = cpu_to_le32(je->t_iops[i]); + je->rate[i] = cpu_to_le32(je->rate[i]); + je->iops[i] = cpu_to_le32(je->iops[i]); } je->elapsed_sec = cpu_to_le64(je->elapsed_sec); je->eta_sec = cpu_to_le64(je->eta_sec); je->nr_threads = cpu_to_le32(je->nr_threads); je->is_pow2 = cpu_to_le32(je->is_pow2); + je->unit_base = cpu_to_le32(je->unit_base); fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, &tag, NULL); free(je); @@ -708,12 +728,13 @@ static int handle_update_job_cmd(struct fio_net_cmd *cmd) return 0; } -static int handle_command(struct fio_net_cmd *cmd) +static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd) { int ret; - dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%lx\n", - fio_server_op(cmd->opcode), cmd->pdu_len, cmd->tag); + dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%llx\n", + fio_server_op(cmd->opcode), cmd->pdu_len, + (unsigned long long) cmd->tag); switch (cmd->opcode) { case FIO_NET_CMD_QUIT: @@ -735,7 +756,7 @@ static int handle_command(struct fio_net_cmd *cmd) ret = handle_send_eta_cmd(cmd); break; case FIO_NET_CMD_RUN: - ret = handle_run_cmd(cmd); + ret = handle_run_cmd(job_list, cmd); break; case FIO_NET_CMD_UPDATE_JOB: ret = handle_update_job_cmd(cmd); @@ -751,10 +772,10 @@ static int handle_command(struct fio_net_cmd *cmd) static int handle_connection(int sk) { struct fio_net_cmd *cmd = NULL; + FLIST_HEAD(job_list); int ret = 0; reset_fio_state(); - INIT_FLIST_HEAD(&job_list); server_fd = sk; /* read forever */ @@ -778,7 +799,7 @@ static int handle_connection(int sk) log_err("fio: poll: %s\n", strerror(errno)); break; } else if (!ret) { - fio_server_check_jobs(); + fio_server_check_jobs(&job_list); continue; } @@ -790,7 +811,7 @@ static int handle_connection(int sk) } } while (!exit_backend); - fio_server_check_jobs(); + fio_server_check_jobs(&job_list); if (ret < 0) break; @@ -801,7 +822,7 @@ static int handle_connection(int sk) break; } - ret = handle_command(cmd); + ret = handle_command(&job_list, cmd); if (ret) break; @@ -822,6 +843,7 @@ static int accept_loop(int listen_sk) socklen_t len = sizeof(addr); struct pollfd pfd; int ret = 0, sk, flags, exitval = 0; + FLIST_HEAD(conn_list); dprint(FD_NET, "server enter accept loop\n"); @@ -847,7 +869,7 @@ static int accept_loop(int listen_sk) log_err("fio: poll: %s\n", strerror(errno)); break; } else if (!ret) { - fio_server_check_conns(); + fio_server_check_conns(&conn_list); continue; } @@ -855,7 +877,7 @@ static int accept_loop(int listen_sk) break; } while (!exit_backend); - fio_server_check_conns(); + fio_server_check_conns(&conn_list); if (exit_backend || ret < 0) break; @@ -871,7 +893,7 @@ static int accept_loop(int listen_sk) pid = fork(); if (pid) { close(sk); - fio_server_add_conn_pid(pid); + fio_server_add_conn_pid(&conn_list, pid); continue; } @@ -935,6 +957,7 @@ static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src) } dst->kb_base = cpu_to_le32(src->kb_base); + dst->unit_base = cpu_to_le32(src->unit_base); dst->groupid = cpu_to_le32(src->groupid); dst->unified_rw_rep = cpu_to_le32(src->unified_rw_rep); } @@ -1017,6 +1040,12 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs) p.ts.total_err_count = cpu_to_le64(ts->total_err_count); p.ts.first_error = cpu_to_le32(ts->first_error); p.ts.kb_base = cpu_to_le32(ts->kb_base); + p.ts.unit_base = cpu_to_le32(ts->unit_base); + + p.ts.latency_depth = cpu_to_le32(ts->latency_depth); + p.ts.latency_target = cpu_to_le64(ts->latency_target); + p.ts.latency_window = cpu_to_le64(ts->latency_window); + p.ts.latency_percentile.u.i = __cpu_to_le64(fio_double_to_uint64(ts->latency_percentile.u.f)); convert_gs(&p.rs, rs); @@ -1109,26 +1138,12 @@ static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf, return fio_sendv_data(sk, iov, 2); } -int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) +static int fio_send_iolog_gz(struct cmd_iolog_pdu *pdu, struct io_log *log) { - struct cmd_iolog_pdu pdu; + int ret = 0; +#ifdef CONFIG_ZLIB z_stream stream; void *out_pdu; - int i, ret = 0; - - pdu.thread_number = cpu_to_le32(td->thread_number); - pdu.nr_samples = __cpu_to_le32(log->nr_samples); - pdu.log_type = cpu_to_le32(log->log_type); - strcpy((char *) pdu.name, name); - - for (i = 0; i < log->nr_samples; i++) { - struct io_sample *s = &log->log[i]; - - s->time = cpu_to_le64(s->time); - s->val = cpu_to_le64(s->val); - s->ddir = cpu_to_le32(s->ddir); - s->bs = cpu_to_le32(s->bs); - } /* * Dirty - since the log is potentially huge, compress it into @@ -1146,14 +1161,6 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) goto err; } - /* - * Send header first, it's not compressed. - */ - ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu, - sizeof(pdu), 0, FIO_NET_CMD_F_MORE); - if (ret) - goto err_zlib; - stream.next_in = (void *) log->log; stream.avail_in = log->nr_samples * sizeof(struct io_sample); @@ -1183,9 +1190,48 @@ err_zlib: deflateEnd(&stream); err: free(out_pdu); +#endif return ret; } +int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) +{ + struct cmd_iolog_pdu pdu; + int i, ret = 0; + + pdu.thread_number = cpu_to_le32(td->thread_number); + pdu.nr_samples = __cpu_to_le32(log->nr_samples); + pdu.log_type = cpu_to_le32(log->log_type); + pdu.compressed = cpu_to_le32(use_zlib); + strcpy((char *) pdu.name, name); + + for (i = 0; i < log->nr_samples; i++) { + struct io_sample *s = &log->log[i]; + + s->time = cpu_to_le64(s->time); + s->val = cpu_to_le64(s->val); + s->ddir = cpu_to_le32(s->ddir); + s->bs = cpu_to_le32(s->bs); + } + + /* + * Send header first, it's not compressed. + */ + ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu, + sizeof(pdu), 0, FIO_NET_CMD_F_MORE); + if (ret) + return ret; + + /* + * Now send actual log, compress if we can, otherwise just plain + */ + if (use_zlib) + return fio_send_iolog_gz(&pdu, log); + + return fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, log->log, + log->nr_samples * sizeof(struct io_sample), 0, 0); +} + void fio_server_send_add_job(struct thread_data *td) { struct cmd_add_job_pdu pdu; @@ -1421,7 +1467,7 @@ int fio_server_parse_string(const char *str, char **ptr, int *is_sock, host++; lport = atoi(host); if (!lport || lport > 65535) { - log_err("fio: bad server port %u\n", port); + log_err("fio: bad server port %u\n", lport); return 1; } /* no hostname given, we are done */ @@ -1439,7 +1485,7 @@ int fio_server_parse_string(const char *str, char **ptr, int *is_sock, portp++; lport = atoi(portp); if (!lport || lport > 65535) { - log_err("fio: bad server port %u\n", port); + log_err("fio: bad server port %u\n", lport); return 1; } }