client/server: various bug fixes
authorJens Axboe <axboe@fb.com>
Thu, 17 Dec 2015 17:24:03 +0000 (10:24 -0700)
committerJens Axboe <axboe@fb.com>
Thu, 17 Dec 2015 17:24:03 +0000 (10:24 -0700)
- Better handling of vectored commands
- Improve sk_out mutex handling (don't alloc separately)
- Add support for sync sending of network data
- Prep for network xmit of logs

Signed-off-by: Jens Axboe <axboe@fb.com>
client.c
iolog.c
iolog.h
server.c

index f4b95d325d9f83839ce951da4967390d0f0932b8..752b508c73802013f215b0c1fff5627021b6c288 100644 (file)
--- a/client.c
+++ b/client.c
@@ -32,6 +32,7 @@ static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd);
 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
+static void handle_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu);
 
 struct client_ops fio_client_ops = {
        .text           = handle_text,
 
 struct client_ops fio_client_ops = {
        .text           = handle_text,
@@ -42,6 +43,7 @@ struct client_ops fio_client_ops = {
        .start          = handle_start,
        .eta            = display_thread_status,
        .probe          = handle_probe,
        .start          = handle_start,
        .eta            = display_thread_status,
        .probe          = handle_probe,
+       .iolog          = handle_iolog,
        .eta_msec       = FIO_CLIENT_DEF_ETA_MSEC,
        .client_type    = FIO_CLIENT_TYPE_CLI,
 };
        .eta_msec       = FIO_CLIENT_DEF_ETA_MSEC,
        .client_type    = FIO_CLIENT_TYPE_CLI,
 };
@@ -1224,6 +1226,23 @@ static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
        fio_client_dec_jobs_eta(eta, client->ops->eta);
 }
 
        fio_client_dec_jobs_eta(eta, client->ops->eta);
 }
 
+static void handle_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu)
+{
+       FILE *f;
+
+       printf("got log compressed; %d\n", pdu->compressed);
+
+       f = fopen((const char *) pdu->name, "w");
+       if (!f) {
+               perror("fopen log");
+               return;
+       }
+
+       flush_samples(f, pdu->samples,
+                       pdu->nr_samples * sizeof(struct io_sample));
+       fclose(f);
+}
+
 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
 {
        struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
 {
        struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
diff --git a/iolog.c b/iolog.c
index d4a101766076f7a83b2da1ea0ac33a455bbd5ea2..2f84c8280240eab3e3ca3d5bdb70f87777017216 100644 (file)
--- a/iolog.c
+++ b/iolog.c
@@ -634,7 +634,7 @@ void free_log(struct io_log *log)
        free(log);
 }
 
        free(log);
 }
 
-static void flush_samples(FILE *f, void *samples, uint64_t sample_size)
+void flush_samples(FILE *f, void *samples, uint64_t sample_size)
 {
        struct io_sample *s;
        int log_offset;
 {
        struct io_sample *s;
        int log_offset;
@@ -984,6 +984,12 @@ static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
        } else
                fio_lock_file(log->filename);
 
        } else
                fio_lock_file(log->filename);
 
+       /*
+        * We should do this for any networked client. Will enable when
+        * the kinks are ironed out.
+        *
+        * if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backed)
+        */
        if (td->client_type == FIO_CLIENT_TYPE_GUI)
                fio_send_iolog(td, log, log->filename);
        else
        if (td->client_type == FIO_CLIENT_TYPE_GUI)
                fio_send_iolog(td, log, log->filename);
        else
diff --git a/iolog.h b/iolog.h
index b99329a47d5d6be07f8a2f2e29b75d83cfe0ae88..0c9a983cdd2481275ea159bcb4c09a1354706226 100644 (file)
--- a/iolog.h
+++ b/iolog.h
@@ -207,6 +207,7 @@ struct log_params {
 extern void finalize_logs(struct thread_data *td);
 extern void setup_log(struct io_log **, struct log_params *, const char *);
 extern void flush_log(struct io_log *, int);
 extern void finalize_logs(struct thread_data *td);
 extern void setup_log(struct io_log **, struct log_params *, const char *);
 extern void flush_log(struct io_log *, int);
+extern void flush_samples(FILE *, void *, uint64_t);
 extern void free_log(struct io_log *);
 extern void fio_writeout_logs(struct thread_data *);
 extern int iolog_flush(struct io_log *, int);
 extern void free_log(struct io_log *);
 extern void fio_writeout_logs(struct thread_data *);
 extern int iolog_flush(struct io_log *, int);
index f11e97278b20f52614269087e37393b2e94b3bf1..9a381f24d1ee84bdcbc5f01742d682afbd719231 100644 (file)
--- a/server.c
+++ b/server.c
@@ -37,6 +37,7 @@ enum {
        SK_F_COPY       = 2,
        SK_F_SIMPLE     = 4,
        SK_F_VEC        = 8,
        SK_F_COPY       = 2,
        SK_F_SIMPLE     = 4,
        SK_F_VEC        = 8,
+       SK_F_INLINE     = 16,
 };
 
 struct sk_entry {
 };
 
 struct sk_entry {
@@ -54,9 +55,10 @@ struct sk_out {
                                 * protected by below ->lock */
 
        int sk;                 /* socket fd to talk to client */
                                 * protected by below ->lock */
 
        int sk;                 /* socket fd to talk to client */
-       struct fio_mutex *lock; /* protects ref and below list */
+       struct fio_mutex lock;  /* protects ref and below list */
        struct flist_head list; /* list of pending transmit work */
        struct flist_head list; /* list of pending transmit work */
-       struct fio_mutex *wait; /* wake backend when items added to list */
+       struct fio_mutex wait;  /* wake backend when items added to list */
+       struct fio_mutex xmit;  /* held while sending data */
 };
 
 static char *fio_server_arg;
 };
 
 static char *fio_server_arg;
@@ -116,12 +118,12 @@ static const char *fio_server_ops[FIO_NET_CMD_NR] = {
 
 static void sk_lock(struct sk_out *sk_out)
 {
 
 static void sk_lock(struct sk_out *sk_out)
 {
-       fio_mutex_down(sk_out->lock);
+       fio_mutex_down(&sk_out->lock);
 }
 
 static void sk_unlock(struct sk_out *sk_out)
 {
 }
 
 static void sk_unlock(struct sk_out *sk_out)
 {
-       fio_mutex_up(sk_out->lock);
+       fio_mutex_up(&sk_out->lock);
 }
 
 void sk_out_assign(struct sk_out *sk_out)
 }
 
 void sk_out_assign(struct sk_out *sk_out)
@@ -137,8 +139,9 @@ void sk_out_assign(struct sk_out *sk_out)
 
 static void sk_out_free(struct sk_out *sk_out)
 {
 
 static void sk_out_free(struct sk_out *sk_out)
 {
-       fio_mutex_remove(sk_out->lock);
-       fio_mutex_remove(sk_out->wait);
+       __fio_mutex_remove(&sk_out->lock);
+       __fio_mutex_remove(&sk_out->wait);
+       __fio_mutex_remove(&sk_out->xmit);
        sfree(sk_out);
 }
 
        sfree(sk_out);
 }
 
@@ -530,22 +533,28 @@ static struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size,
                memcpy(entry->buf, buf, size);
        } else
                entry->buf = buf;
                memcpy(entry->buf, buf, size);
        } else
                entry->buf = buf;
+
        entry->size = size;
        entry->tagptr = tagptr;
        entry->flags = flags;
        entry->size = size;
        entry->tagptr = tagptr;
        entry->flags = flags;
-
        return entry;
 }
 
        return entry;
 }
 
+static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry);
+
 static void fio_net_queue_entry(struct sk_entry *entry)
 {
        struct sk_out *sk_out = pthread_getspecific(sk_out_key);
 
 static void fio_net_queue_entry(struct sk_entry *entry)
 {
        struct sk_out *sk_out = pthread_getspecific(sk_out_key);
 
-       sk_lock(sk_out);
-       flist_add_tail(&entry->list, &sk_out->list);
-       sk_unlock(sk_out);
+       if (entry->flags & SK_F_INLINE)
+               handle_sk_entry(sk_out, entry);
+       else {
+               sk_lock(sk_out);
+               flist_add_tail(&entry->list, &sk_out->list);
+               sk_unlock(sk_out);
 
 
-       fio_mutex_up(sk_out->wait);
+               fio_mutex_up(&sk_out->wait);
+       }
 }
 
 static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size,
 }
 
 static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size,
@@ -1102,9 +1111,11 @@ static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry)
 {
        int ret;
 
 {
        int ret;
 
+       fio_mutex_down(&sk_out->xmit);
+
        if (entry->flags & SK_F_VEC)
                ret = send_vec_entry(sk_out, entry);
        if (entry->flags & SK_F_VEC)
                ret = send_vec_entry(sk_out, entry);
-       if (entry->flags & SK_F_SIMPLE) {
+       else if (entry->flags & SK_F_SIMPLE) {
                uint64_t tag = 0;
 
                if (entry->tagptr)
                uint64_t tag = 0;
 
                if (entry->tagptr)
@@ -1114,6 +1125,8 @@ static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry)
        } else
                ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
 
        } else
                ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
 
+       fio_mutex_up(&sk_out->xmit);
+
        if (ret)
                log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode));
 
        if (ret)
                log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode));
 
@@ -1177,7 +1190,7 @@ static int handle_connection(struct sk_out *sk_out)
                                break;
                        } else if (!ret) {
                                fio_server_check_jobs(&job_list);
                                break;
                        } else if (!ret) {
                                fio_server_check_jobs(&job_list);
-                               fio_mutex_down_timeout(sk_out->wait, timeout);
+                               fio_mutex_down_timeout(&sk_out->wait, timeout);
                                continue;
                        }
 
                                continue;
                        }
 
@@ -1323,8 +1336,9 @@ static int accept_loop(int listen_sk)
                sk_out = smalloc(sizeof(*sk_out));
                sk_out->sk = sk;
                INIT_FLIST_HEAD(&sk_out->list);
                sk_out = smalloc(sizeof(*sk_out));
                sk_out->sk = sk;
                INIT_FLIST_HEAD(&sk_out->list);
-               sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
-               sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
+               __fio_mutex_init(&sk_out->lock, FIO_MUTEX_UNLOCKED);
+               __fio_mutex_init(&sk_out->wait, FIO_MUTEX_LOCKED);
+               __fio_mutex_init(&sk_out->xmit, FIO_MUTEX_UNLOCKED);
 
                pid = fork();
                if (pid) {
 
                pid = fork();
                if (pid) {
@@ -1649,7 +1663,8 @@ static int fio_send_iolog_gz(struct sk_entry *first, struct io_log *log)
                this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
 
                entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
                this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
 
                entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
-                                               NULL, SK_F_FREE | SK_F_VEC);
+                                               NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
+               out_pdu = NULL;
                flist_add_tail(&entry->list, &first->next);
        } while (stream.avail_in);
 
                flist_add_tail(&entry->list, &first->next);
        } while (stream.avail_in);
 
@@ -1693,7 +1708,7 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
        /*
         * Assemble header entry first
         */
        /*
         * Assemble header entry first
         */
-       first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_COPY | SK_F_VEC);
+       first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_VEC | SK_F_INLINE | SK_F_COPY);
 
        /*
         * Now append actual log entries. Compress if we can, otherwise just
 
        /*
         * Now append actual log entries. Compress if we can, otherwise just
@@ -1706,10 +1721,11 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
 
                entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log,
                                        log->nr_samples * log_entry_sz(log),
 
                entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log,
                                        log->nr_samples * log_entry_sz(log),
-                                       NULL, SK_F_FREE | SK_F_VEC);
+                                       NULL, SK_F_VEC | SK_F_INLINE);
                flist_add_tail(&entry->list, &first->next);
        }
 
                flist_add_tail(&entry->list, &first->next);
        }
 
+       fio_net_queue_entry(first);
        return ret;
 }
 
        return ret;
 }