client/server: transparent handling of storing compressed logs
authorJens Axboe <axboe@fb.com>
Thu, 17 Dec 2015 21:54:15 +0000 (14:54 -0700)
committerJens Axboe <axboe@fb.com>
Thu, 17 Dec 2015 21:54:15 +0000 (14:54 -0700)
Signed-off-by: Jens Axboe <axboe@fb.com>
client.c
client.h
configure
gclient.c
iolog.c
iolog.h
server.c
server.h

index 752b508..27a764d 100644 (file)
--- a/client.c
+++ b/client.c
@@ -32,7 +32,6 @@ static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd);
 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
-static void handle_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu);
 
 struct client_ops fio_client_ops = {
        .text           = handle_text,
@@ -43,7 +42,6 @@ struct client_ops fio_client_ops = {
        .start          = handle_start,
        .eta            = display_thread_status,
        .probe          = handle_probe,
-       .iolog          = handle_iolog,
        .eta_msec       = FIO_CLIENT_DEF_ETA_MSEC,
        .client_type    = FIO_CLIENT_TYPE_CLI,
 };
@@ -72,6 +70,8 @@ static int error_clients;
 #define FIO_CLIENT_HASH_MASK   (FIO_CLIENT_HASH_SZ - 1)
 static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
 
+static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *, bool *);
+
 static void fio_client_add_hash(struct fio_client *client)
 {
        int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
@@ -1226,21 +1226,44 @@ static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
        fio_client_dec_jobs_eta(eta, client->ops->eta);
 }
 
-static void handle_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu)
+void fio_client_handle_iolog(struct fio_client *client, struct fio_net_cmd *cmd)
 {
-       FILE *f;
-
-       printf("got log compressed; %d\n", pdu->compressed);
+       struct cmd_iolog_pdu *pdu;
+       bool store_direct;
 
-       f = fopen((const char *) pdu->name, "w");
-       if (!f) {
-               perror("fopen log");
+       pdu = convert_iolog(cmd, &store_direct);
+       if (!pdu)
                return;
-       }
 
-       flush_samples(f, pdu->samples,
-                       pdu->nr_samples * sizeof(struct io_sample));
-       fclose(f);
+       if (store_direct) {
+               ssize_t ret;
+               size_t sz;
+               int fd;
+
+               fd = open((const char *) pdu->name,
+                               O_WRONLY | O_CREAT | O_TRUNC, 0644);
+               if (fd < 0) {
+                       perror("open log");
+                       return;
+               }
+               sz = cmd->pdu_len - sizeof(*pdu);
+               ret = write(fd, pdu->samples, sz);
+               if (ret != sz)
+                       log_err("fio: short write on compressed log\n");
+               close(fd);
+       } else {
+               FILE *f;
+
+               f = fopen((const char *) pdu->name, "w");
+               if (!f) {
+                       perror("fopen log");
+                       return;
+               }
+
+               flush_samples(f, pdu->samples,
+                               pdu->nr_samples * sizeof(struct io_sample));
+               fclose(f);
+       }
 }
 
 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
@@ -1383,27 +1406,36 @@ err:
  * This has been compressed on the server side, since it can be big.
  * Uncompress here.
  */
-static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd)
+static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd,
+                                          bool *store_direct)
 {
        struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
        struct cmd_iolog_pdu *ret;
        uint64_t i;
+       int compressed;
        void *samples;
 
+       *store_direct = false;
+
        /*
         * Convert if compressed and we support it. If it's not
         * compressed, we need not do anything.
         */
-       if (le32_to_cpu(pdu->compressed)) {
+       compressed = le32_to_cpu(pdu->compressed);
+       if (compressed == XMIT_COMPRESSED) {
 #ifndef CONFIG_ZLIB
                log_err("fio: server sent compressed data by mistake\n");
                return NULL;
 #endif
                ret = convert_iolog_gz(cmd, pdu);
+               printf("compressed iolog, %p\n", ret);
                if (!ret) {
                        log_err("fio: failed decompressing log\n");
                        return NULL;
                }
+       } else if (compressed == STORE_COMPRESSED) {
+               *store_direct = true;
+               ret = pdu;
        } else
                ret = pdu;
 
@@ -1413,6 +1445,9 @@ static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd)
        ret->compressed         = le32_to_cpu(ret->compressed);
        ret->log_offset         = le32_to_cpu(ret->log_offset);
 
+       if (*store_direct)
+               return ret;
+
        samples = &ret->samples[0];
        for (i = 0; i < ret->nr_samples; i++) {
                struct io_sample *s;
@@ -1569,12 +1604,7 @@ int fio_handle_client(struct fio_client *client)
                break;
                }
        case FIO_NET_CMD_IOLOG:
-               if (ops->iolog) {
-                       struct cmd_iolog_pdu *pdu;
-
-                       pdu = convert_iolog(cmd);
-                       ops->iolog(client, pdu);
-               }
+               fio_client_handle_iolog(client, cmd);
                break;
        case FIO_NET_CMD_UPDATE_JOB:
                ops->update_job(client, cmd);
index 035e606..7fe09d1 100644 (file)
--- a/client.h
+++ b/client.h
@@ -76,12 +76,10 @@ struct fio_client {
        unsigned int nr_files;
 };
 
-struct cmd_iolog_pdu;
 typedef void (client_cmd_op)(struct fio_client *, struct fio_net_cmd *);
 typedef void (client_eta_op)(struct jobs_eta *je);
 typedef void (client_timed_out_op)(struct fio_client *);
 typedef void (client_jobs_eta_op)(struct fio_client *client, struct jobs_eta *je);
-typedef void (client_iolog_op)(struct fio_client *client, struct cmd_iolog_pdu *);
 
 struct client_ops {
        client_cmd_op           *text;
@@ -98,7 +96,6 @@ struct client_ops {
        client_cmd_op           *stop;
        client_cmd_op           *start;
        client_cmd_op           *job_start;
-       client_iolog_op         *iolog;
        client_timed_out_op     *removed;
 
        unsigned int eta_msec;
index af26165..770678a 100755 (executable)
--- a/configure
+++ b/configure
@@ -460,7 +460,7 @@ echo "Wordsize                      $wordsize"
 # zlib probe
 zlib="no"
 cat > $TMPC <<EOF
-#include <zlib.h>
+#include <zlib2.h>
 int main(void)
 {
   z_stream stream;
index 949ad42..9c32474 100644 (file)
--- a/gclient.c
+++ b/gclient.c
@@ -693,11 +693,6 @@ static void gfio_client_job_start(struct fio_client *client, struct fio_net_cmd
        gdk_threads_leave();
 }
 
-static void gfio_client_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu)
-{
-       printf("got iolog: name=%s, type=%u, entries=%lu\n", pdu->name, pdu->log_type, (unsigned long) pdu->nr_samples);
-}
-
 static void gfio_add_total_depths_tree(GtkListStore *model,
                                       struct thread_stat *ts, unsigned int len)
 {
@@ -1393,7 +1388,6 @@ struct client_ops gfio_client_ops = {
        .stop                   = gfio_client_stop,
        .start                  = gfio_client_start,
        .job_start              = gfio_client_job_start,
-       .iolog                  = gfio_client_iolog,
        .removed                = gfio_client_removed,
        .eta_msec               = FIO_CLIENT_DEF_ETA_MSEC,
        .stay_connected         = 1,
diff --git a/iolog.c b/iolog.c
index 2f84c82..feda9ed 100644 (file)
--- a/iolog.c
+++ b/iolog.c
@@ -682,13 +682,6 @@ struct iolog_flush_data {
        uint64_t nr_samples;
 };
 
-struct iolog_compress {
-       struct flist_head list;
-       void *buf;
-       size_t len;
-       unsigned int seq;
-};
-
 #define GZ_CHUNK       131072
 
 static struct iolog_compress *get_new_chunk(unsigned int seq)
@@ -984,13 +977,7 @@ static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
        } else
                fio_lock_file(log->filename);
 
-       /*
-        * We should do this for any networked client. Will enable when
-        * the kinks are ironed out.
-        *
-        * if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backed)
-        */
-       if (td->client_type == FIO_CLIENT_TYPE_GUI)
+       if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
                fio_send_iolog(td, log, log->filename);
        else
                flush_log(log, !td->o.per_job_logs);
@@ -1000,6 +987,26 @@ static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
        return 0;
 }
 
+size_t log_chunk_sizes(struct io_log *log)
+{
+       struct flist_head *entry;
+       size_t ret;
+
+       if (flist_empty(&log->chunk_list))
+               return 0;
+
+       ret = 0;
+       pthread_mutex_lock(&log->chunk_lock);
+       flist_for_each(entry, &log->chunk_list) {
+               struct iolog_compress *c;
+
+               c = flist_entry(entry, struct iolog_compress, list);
+               ret += c->len;
+       }
+       pthread_mutex_unlock(&log->chunk_lock);
+       return ret;
+}
+
 #ifdef CONFIG_ZLIB
 
 static void drop_data_unlock(struct iolog_flush_data *data)
diff --git a/iolog.h b/iolog.h
index 0c9a983..297daf5 100644 (file)
--- a/iolog.h
+++ b/iolog.h
@@ -186,6 +186,7 @@ extern void prune_io_piece_log(struct thread_data *);
 extern void write_iolog_close(struct thread_data *);
 extern int iolog_compress_init(struct thread_data *, struct sk_out *);
 extern void iolog_compress_exit(struct thread_data *);
+extern size_t log_chunk_sizes(struct io_log *);
 
 #ifdef CONFIG_ZLIB
 extern int iolog_file_inflate(const char *);
@@ -218,4 +219,11 @@ static inline void init_ipo(struct io_piece *ipo)
        INIT_FLIST_HEAD(&ipo->trim_list);
 }
 
+struct iolog_compress {
+       struct flist_head list;
+       void *buf;
+       size_t len;
+       unsigned int seq;
+};
+
 #endif
index 9a381f2..ae8643b 100644 (file)
--- a/server.c
+++ b/server.c
@@ -520,8 +520,9 @@ int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size,
        return ret;
 }
 
-static struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size,
-                                        uint64_t *tagptr, int flags)
+static struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf,
+                                        size_t size, uint64_t *tagptr,
+                                        int flags)
 {
        struct sk_entry *entry;
 
@@ -1676,20 +1677,52 @@ err:
        return ret;
 }
 
+static int fio_send_gz_chunks(struct sk_entry *first, struct io_log *log)
+{
+       struct sk_entry *entry;
+       struct flist_head *node;
+
+       pthread_mutex_lock(&log->chunk_lock);
+       flist_for_each(node, &log->chunk_list) {
+               struct iolog_compress *c;
+
+               c = flist_entry(node, struct iolog_compress, list);
+               entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, c->buf, c->len,
+                                               NULL, SK_F_VEC | SK_F_INLINE);
+               flist_add_tail(&entry->list, &first->next);
+       }
+       pthread_mutex_unlock(&log->chunk_lock);
+
+       return 0;
+}
+
 int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
 {
        struct cmd_iolog_pdu pdu;
        struct sk_entry *first;
        int i, ret = 0;
 
+       if (!flist_empty(&log->chunk_list))
+               printf("log has chunks\n");
+
        pdu.nr_samples = cpu_to_le64(log->nr_samples);
        pdu.thread_number = cpu_to_le32(td->thread_number);
        pdu.log_type = cpu_to_le32(log->log_type);
-       pdu.compressed = cpu_to_le32(use_zlib);
+
+       if (!flist_empty(&log->chunk_list))
+               pdu.compressed = __cpu_to_le32(STORE_COMPRESSED);
+       else if (use_zlib)
+               pdu.compressed = __cpu_to_le32(XMIT_COMPRESSED);
+       else
+               pdu.compressed = 0;
 
        strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX);
        pdu.name[FIO_NET_NAME_MAX - 1] = '\0';
 
+       /*
+        * We can't do this for a pre-compressed log, but for that case,
+        * log->nr_samples is zero anyway.
+        */
        for (i = 0; i < log->nr_samples; i++) {
                struct io_sample *s = get_sample(log, i);
 
@@ -1714,14 +1747,16 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
         * Now append actual log entries. Compress if we can, otherwise just
         * plain text output.
         */
-       if (use_zlib)
+       if (!flist_empty(&log->chunk_list))
+               ret = fio_send_gz_chunks(first, log);
+       else if (use_zlib)
                ret = fio_send_iolog_gz(first, log);
        else {
                struct sk_entry *entry;
+               size_t size = log->nr_samples * log_entry_sz(log);
 
-               entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log,
-                                       log->nr_samples * log_entry_sz(log),
-                                       NULL, SK_F_VEC | SK_F_INLINE);
+               entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log, size,
+                                               NULL, SK_F_VEC | SK_F_INLINE);
                flist_add_tail(&entry->list, &first->next);
        }
 
index dc4a419..5a59d07 100644 (file)
--- a/server.h
+++ b/server.h
@@ -172,6 +172,11 @@ struct cmd_text_pdu {
        uint8_t buf[0];
 };
 
+enum {
+       XMIT_COMPRESSED         = 1U,
+       STORE_COMPRESSED        = 2U,
+};
+
 struct cmd_iolog_pdu {
        uint64_t nr_samples;
        uint32_t thread_number;