From 1b42725f06f8906b9b99381da3490484f59df28a Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 14 Mar 2012 15:03:03 +0100 Subject: [PATCH] gfio: Add support for sending logs over the network This is for things like bw log, latency logs, etc. We add a new depedency, zlib, for this. The logs can be quite big and the network is not necessarily super fast, so we compress them using zlib. Signed-off-by: Jens Axboe --- Makefile | 2 +- client.c | 71 +++++++++++++++++++++++++++++++++++++++++++ client.h | 3 ++ gfio.c | 6 ++++ iolog.c | 4 +++ iolog.h | 10 +++--- server.c | 92 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- server.h | 35 ++++++++++++++++----- 8 files changed, 206 insertions(+), 17 deletions(-) diff --git a/Makefile b/Makefile index 762bd7d4..4a9e7189 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ CPPFLAGS= -D_GNU_SOURCE -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 \ $(DEBUGFLAGS) OPTFLAGS= -O3 -fno-omit-frame-pointer -g $(EXTFLAGS) CFLAGS = -std=gnu99 -Wwrite-strings -Wall $(OPTFLAGS) -LIBS = -lm $(EXTLIBS) +LIBS = -lm -lz $(EXTLIBS) PROGS = fio SCRIPTS = fio_generate_plots UNAME := $(shell uname) diff --git a/client.c b/client.c index 6371c2be..c49e9a19 100644 --- a/client.c +++ b/client.c @@ -14,6 +14,7 @@ #include #include #include +#include #include "fio.h" #include "client.h" @@ -950,6 +951,67 @@ static void convert_text(struct fio_net_cmd *cmd) pdu->log_usec = le64_to_cpu(pdu->log_usec); } +/* + * This has been compressed on the server side, since it can be big. + * Uncompress here. + */ +static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd) +{ + struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload; + struct cmd_iolog_pdu *ret; + uint32_t nr_samples; + unsigned long total; + z_stream stream; + void *p; + + stream.zalloc = Z_NULL; + stream.zfree = Z_NULL; + stream.opaque = Z_NULL; + stream.avail_in = 0; + stream.next_in = Z_NULL; + + if (inflateInit(&stream) != Z_OK) + return NULL; + + /* + * Everything beyond the first entry is compressed. + */ + nr_samples = le32_to_cpu(pdu->nr_samples); + + total = sizeof(*pdu) + nr_samples * sizeof(struct io_sample); + ret = malloc(total); + ret->nr_samples = nr_samples; + p = (void *) ret + sizeof(pdu->nr_samples); + + stream.avail_in = cmd->pdu_len - sizeof(pdu->nr_samples); + stream.next_in = (void *) pdu + sizeof(pdu->nr_samples); + while (stream.avail_in) { + unsigned int this_chunk = 65536; + unsigned int this_len; + int err; + + if (this_chunk > total) + this_chunk = total; + + stream.avail_out = this_chunk; + stream.next_out = p; + err = inflate(&stream, Z_NO_FLUSH); + if (err != Z_OK) { + log_err("fio: inflate error %d\n", err); + goto out; + } + + this_len = this_chunk - stream.avail_out; + p += this_len; + total -= this_len; + } + + ret->log_type = cpu_to_le32(ret->log_type); +out: + inflateEnd(&stream); + return ret; +} + int fio_handle_client(struct fio_client *client) { struct client_ops *ops = client->ops; @@ -1048,6 +1110,15 @@ int fio_handle_client(struct fio_client *client) ops->add_job(client, cmd); free(cmd); break; + case FIO_NET_CMD_IOLOG: + if (ops->iolog) { + struct cmd_iolog_pdu *pdu; + + pdu = convert_iolog(cmd); + ops->iolog(client, pdu); + } + free(cmd); + break; default: log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode)); free(cmd); diff --git a/client.h b/client.h index 5b9c2770..30228161 100644 --- a/client.h +++ b/client.h @@ -59,10 +59,12 @@ struct fio_client { void *client_data; }; +struct cmd_iolog_pdu; typedef void (client_cmd_op)(struct fio_client *, struct fio_net_cmd *); typedef void (client_eta_op)(struct jobs_eta *je); typedef void (client_timed_out_op)(struct fio_client *); typedef void (client_jobs_eta_op)(struct fio_client *client, struct jobs_eta *je); +typedef void (client_iolog_op)(struct fio_client *client, struct cmd_iolog_pdu *); struct client_ops { client_cmd_op *text; @@ -78,6 +80,7 @@ struct client_ops { client_cmd_op *stop; client_cmd_op *start; client_cmd_op *job_start; + client_iolog_op *iolog; unsigned int eta_msec; int stay_connected; diff --git a/gfio.c b/gfio.c index 73aeb276..068f229e 100644 --- a/gfio.c +++ b/gfio.c @@ -1636,6 +1636,11 @@ static void gfio_client_job_start(struct fio_client *client, struct fio_net_cmd gdk_threads_leave(); } +static void gfio_client_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu) +{ + free(pdu); +} + struct client_ops gfio_client_ops = { .text = gfio_text_op, .disk_util = gfio_disk_util_op, @@ -1650,6 +1655,7 @@ struct client_ops gfio_client_ops = { .stop = gfio_client_stop, .start = gfio_client_start, .job_start = gfio_client_job_start, + .iolog = gfio_client_iolog, .eta_msec = FIO_CLIENT_DEF_ETA_MSEC, .stay_connected = 1, .client_type = FIO_CLIENT_TYPE_GUI, diff --git a/iolog.c b/iolog.c index 5d7c5ab7..88adbf7f 100644 --- a/iolog.c +++ b/iolog.c @@ -535,6 +535,10 @@ void finish_log_named(struct thread_data *td, struct io_log *log, snprintf(file_name, 200, "%s_%s.log", prefix, postfix); p = basename(file_name); + + if (td->client_type == FIO_CLIENT_TYPE_GUI) + fio_send_iolog(td, log, p); + __finish_log(log, p); } diff --git a/iolog.h b/iolog.h index d00d00c4..18d3c6c5 100644 --- a/iolog.h +++ b/iolog.h @@ -21,10 +21,10 @@ struct io_stat { * A single data sample */ struct io_sample { - unsigned long time; - unsigned long val; - enum fio_ddir ddir; - unsigned int bs; + uint64_t time; + uint64_t val; + uint32_t ddir; + uint32_t bs; }; enum { @@ -46,7 +46,7 @@ struct io_log { unsigned long max_samples; struct io_sample *log; - int log_type; + unsigned int log_type; /* * Windowed average, for logging single entries average over some diff --git a/server.c b/server.c index 89422e96..4e1f39be 100644 --- a/server.c +++ b/server.c @@ -16,6 +16,7 @@ #include #include #include +#include #include "fio.h" #include "server.h" @@ -248,15 +249,19 @@ struct fio_net_cmd *fio_net_recv_cmd(int sk) return cmdret; } -void fio_net_cmd_crc(struct fio_net_cmd *cmd) +void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, void *pdu) { uint32_t pdu_len; cmd->cmd_crc16 = __cpu_to_le16(fio_crc16(cmd, FIO_NET_CMD_CRC_SZ)); pdu_len = le32_to_cpu(cmd->pdu_len); - if (pdu_len) - cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(cmd->payload, pdu_len)); + cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(pdu, pdu_len)); +} + +void fio_net_cmd_crc(struct fio_net_cmd *cmd) +{ + fio_net_cmd_crc_pdu(cmd, cmd->payload); } int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size, @@ -846,6 +851,87 @@ void fio_server_send_du(void) } } +int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) +{ + struct cmd_iolog_pdu *pdu; + struct fio_net_cmd cmd; + z_stream stream; + void *out_pdu; + size_t p_size; + int i; + + p_size = sizeof(*pdu) + log->nr_samples * sizeof(struct io_sample); + pdu = malloc(p_size); + + pdu->nr_samples = __cpu_to_le32(log->nr_samples); + pdu->log_type = cpu_to_le32(log->log_type); + strcpy((char *) pdu->name, name); + + for (i = 0; i < log->nr_samples; i++) { + struct io_sample *s = &pdu->samples[i]; + + s->time = cpu_to_le64(log->log[i].time); + s->val = cpu_to_le64(log->log[i].val); + s->ddir = cpu_to_le32(log->log[i].ddir); + s->bs = cpu_to_le32(log->log[i].bs); + } + + /* + * Dirty - since the log is potentially huge, compress it into + * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving + * side defragment it. + */ + out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU); + + stream.zalloc = Z_NULL; + stream.zfree = Z_NULL; + stream.opaque = Z_NULL; + + if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) { + free(out_pdu); + free(pdu); + return 1; + } + + /* + * Don't compress the nr samples entry, we want to know on the + * client side how much data to allocate before starting inflate. + */ + __fio_init_net_cmd(&cmd, FIO_NET_CMD_IOLOG, sizeof(pdu->nr_samples), 0); + cmd.flags = __cpu_to_le32(FIO_NET_CMD_F_MORE); + fio_net_cmd_crc_pdu(&cmd, pdu); + fio_send_data(server_fd, &cmd, sizeof(cmd)); + fio_send_data(server_fd, pdu, sizeof(pdu->nr_samples)); + + stream.next_in = (void *) pdu + sizeof(pdu->nr_samples); + stream.avail_in = p_size - sizeof(pdu->nr_samples); + + do { + unsigned int this_len; + + stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU; + stream.next_out = out_pdu; + deflate(&stream, Z_FINISH); + + this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out; + + __fio_init_net_cmd(&cmd, FIO_NET_CMD_IOLOG, this_len, 0); + + if (stream.avail_in) + cmd.flags = __cpu_to_le32(FIO_NET_CMD_F_MORE); + + fio_net_cmd_crc_pdu(&cmd, out_pdu); + + fio_send_data(server_fd, &cmd, sizeof(cmd)); + fio_send_data(server_fd, out_pdu, this_len); + } while (stream.avail_in); + + free(pdu); + free(out_pdu); + deflateEnd(&stream); + return 0; +} + void fio_server_send_add_job(struct thread_options *o, const char *ioengine) { struct cmd_add_job_pdu pdu; diff --git a/server.h b/server.h index 2235f3a5..76d24583 100644 --- a/server.h +++ b/server.h @@ -38,7 +38,7 @@ struct fio_net_int_cmd { }; enum { - FIO_SERVER_VER = 11, + FIO_SERVER_VER = 12, FIO_SERVER_MAX_FRAGMENT_PDU = 1024, @@ -58,7 +58,8 @@ enum { FIO_NET_CMD_SERVER_START = 14, FIO_NET_CMD_ADD_JOB = 15, FIO_NET_CMD_RUN = 16, - FIO_NET_CMD_NR = 17, + FIO_NET_CMD_IOLOG = 17, + FIO_NET_CMD_NR = 18, FIO_NET_CMD_F_MORE = 1UL << 0, @@ -66,6 +67,8 @@ enum { FIO_NET_CMD_CRC_SZ = sizeof(struct fio_net_cmd) - 2 * sizeof(uint16_t), + FIO_NET_NAME_MAX = 256, + FIO_NET_CLIENT_TIMEOUT = 5000, }; @@ -127,6 +130,13 @@ struct cmd_text_pdu { uint8_t buf[0]; }; +struct cmd_iolog_pdu { + uint32_t nr_samples; + uint32_t log_type; + uint8_t name[FIO_NET_NAME_MAX]; + struct io_sample samples[0]; +}; + extern int fio_start_server(char *); extern int fio_server_text_output(int, const char *, size_t); extern int fio_net_send_cmd(int, uint16_t, const void *, off_t, uint64_t); @@ -147,28 +157,37 @@ extern void fio_server_idle_loop(void); extern int fio_recv_data(int sk, void *p, unsigned int len); extern int fio_send_data(int sk, const void *p, unsigned int len); extern void fio_net_cmd_crc(struct fio_net_cmd *); +extern void fio_net_cmd_crc_pdu(struct fio_net_cmd *, void *); extern struct fio_net_cmd *fio_net_recv_cmd(int sk); +extern int fio_send_iolog(struct thread_data *, struct io_log *, const char *); + struct thread_options; extern void fio_server_send_add_job(struct thread_options *, const char *); extern int exit_backend; extern int fio_net_port; -static inline void fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode, - const void *pdu, uint32_t pdu_len, - uint64_t tag) +static inline void __fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode, + uint32_t pdu_len, uint64_t tag) { memset(cmd, 0, sizeof(*cmd)); cmd->version = __cpu_to_le16(FIO_SERVER_VER); cmd->opcode = cpu_to_le16(opcode); cmd->tag = cpu_to_le64(tag); + cmd->pdu_len = cpu_to_le32(pdu_len); +} + + +static inline void fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode, + const void *pdu, uint32_t pdu_len, + uint64_t tag) +{ + __fio_init_net_cmd(cmd, opcode, pdu_len, tag); - if (pdu) { - cmd->pdu_len = cpu_to_le32(pdu_len); + if (pdu) memcpy(&cmd->payload, pdu, pdu_len); - } } #endif -- 2.25.1