From bead01d7dcd76467f0aba0d32b173442bbdaa020 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 17 Dec 2015 10:24:03 -0700 Subject: [PATCH] client/server: various bug fixes - Better handling of vectored commands - Improve sk_out mutex handling (don't alloc separately) - Add support for sync sending of network data - Prep for network xmit of logs Signed-off-by: Jens Axboe --- client.c | 19 +++++++++++++++++++ iolog.c | 8 +++++++- iolog.h | 1 + server.c | 52 ++++++++++++++++++++++++++++++++++------------------ 4 files changed, 61 insertions(+), 19 deletions(-) diff --git a/client.c b/client.c index f4b95d32..752b508c 100644 --- a/client.c +++ b/client.c @@ -32,6 +32,7 @@ static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd); static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd); static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd); static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd); +static void handle_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu); struct client_ops fio_client_ops = { .text = handle_text, @@ -42,6 +43,7 @@ struct client_ops fio_client_ops = { .start = handle_start, .eta = display_thread_status, .probe = handle_probe, + .iolog = handle_iolog, .eta_msec = FIO_CLIENT_DEF_ETA_MSEC, .client_type = FIO_CLIENT_TYPE_CLI, }; @@ -1224,6 +1226,23 @@ static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd) fio_client_dec_jobs_eta(eta, client->ops->eta); } +static void handle_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu) +{ + FILE *f; + + printf("got log compressed; %d\n", pdu->compressed); + + f = fopen((const char *) pdu->name, "w"); + if (!f) { + perror("fopen log"); + return; + } + + flush_samples(f, pdu->samples, + pdu->nr_samples * sizeof(struct io_sample)); + fclose(f); +} + static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd) { struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload; diff --git a/iolog.c b/iolog.c index d4a10176..2f84c828 100644 --- a/iolog.c +++ b/iolog.c @@ -634,7 +634,7 @@ void free_log(struct io_log *log) free(log); } -static void flush_samples(FILE *f, void *samples, uint64_t sample_size) +void flush_samples(FILE *f, void *samples, uint64_t sample_size) { struct io_sample *s; int log_offset; @@ -984,6 +984,12 @@ static int finish_log(struct thread_data *td, struct io_log *log, int trylock) } else fio_lock_file(log->filename); + /* + * We should do this for any networked client. Will enable when + * the kinks are ironed out. + * + * if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backed) + */ if (td->client_type == FIO_CLIENT_TYPE_GUI) fio_send_iolog(td, log, log->filename); else diff --git a/iolog.h b/iolog.h index b99329a4..0c9a983c 100644 --- a/iolog.h +++ b/iolog.h @@ -207,6 +207,7 @@ struct log_params { extern void finalize_logs(struct thread_data *td); extern void setup_log(struct io_log **, struct log_params *, const char *); extern void flush_log(struct io_log *, int); +extern void flush_samples(FILE *, void *, uint64_t); extern void free_log(struct io_log *); extern void fio_writeout_logs(struct thread_data *); extern int iolog_flush(struct io_log *, int); diff --git a/server.c b/server.c index f11e9727..9a381f24 100644 --- a/server.c +++ b/server.c @@ -37,6 +37,7 @@ enum { SK_F_COPY = 2, SK_F_SIMPLE = 4, SK_F_VEC = 8, + SK_F_INLINE = 16, }; struct sk_entry { @@ -54,9 +55,10 @@ struct sk_out { * protected by below ->lock */ int sk; /* socket fd to talk to client */ - struct fio_mutex *lock; /* protects ref and below list */ + struct fio_mutex lock; /* protects ref and below list */ struct flist_head list; /* list of pending transmit work */ - struct fio_mutex *wait; /* wake backend when items added to list */ + struct fio_mutex wait; /* wake backend when items added to list */ + struct fio_mutex xmit; /* held while sending data */ }; static char *fio_server_arg; @@ -116,12 +118,12 @@ static const char *fio_server_ops[FIO_NET_CMD_NR] = { static void sk_lock(struct sk_out *sk_out) { - fio_mutex_down(sk_out->lock); + fio_mutex_down(&sk_out->lock); } static void sk_unlock(struct sk_out *sk_out) { - fio_mutex_up(sk_out->lock); + fio_mutex_up(&sk_out->lock); } void sk_out_assign(struct sk_out *sk_out) @@ -137,8 +139,9 @@ void sk_out_assign(struct sk_out *sk_out) static void sk_out_free(struct sk_out *sk_out) { - fio_mutex_remove(sk_out->lock); - fio_mutex_remove(sk_out->wait); + __fio_mutex_remove(&sk_out->lock); + __fio_mutex_remove(&sk_out->wait); + __fio_mutex_remove(&sk_out->xmit); sfree(sk_out); } @@ -530,22 +533,28 @@ static struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size, memcpy(entry->buf, buf, size); } else entry->buf = buf; + entry->size = size; entry->tagptr = tagptr; entry->flags = flags; - return entry; } +static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry); + static void fio_net_queue_entry(struct sk_entry *entry) { struct sk_out *sk_out = pthread_getspecific(sk_out_key); - sk_lock(sk_out); - flist_add_tail(&entry->list, &sk_out->list); - sk_unlock(sk_out); + if (entry->flags & SK_F_INLINE) + handle_sk_entry(sk_out, entry); + else { + sk_lock(sk_out); + flist_add_tail(&entry->list, &sk_out->list); + sk_unlock(sk_out); - fio_mutex_up(sk_out->wait); + fio_mutex_up(&sk_out->wait); + } } static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size, @@ -1102,9 +1111,11 @@ static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry) { int ret; + fio_mutex_down(&sk_out->xmit); + if (entry->flags & SK_F_VEC) ret = send_vec_entry(sk_out, entry); - if (entry->flags & SK_F_SIMPLE) { + else if (entry->flags & SK_F_SIMPLE) { uint64_t tag = 0; if (entry->tagptr) @@ -1114,6 +1125,8 @@ static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry) } else ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL); + fio_mutex_up(&sk_out->xmit); + if (ret) log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode)); @@ -1177,7 +1190,7 @@ static int handle_connection(struct sk_out *sk_out) break; } else if (!ret) { fio_server_check_jobs(&job_list); - fio_mutex_down_timeout(sk_out->wait, timeout); + fio_mutex_down_timeout(&sk_out->wait, timeout); continue; } @@ -1323,8 +1336,9 @@ static int accept_loop(int listen_sk) sk_out = smalloc(sizeof(*sk_out)); sk_out->sk = sk; INIT_FLIST_HEAD(&sk_out->list); - sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED); - sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED); + __fio_mutex_init(&sk_out->lock, FIO_MUTEX_UNLOCKED); + __fio_mutex_init(&sk_out->wait, FIO_MUTEX_LOCKED); + __fio_mutex_init(&sk_out->xmit, FIO_MUTEX_UNLOCKED); pid = fork(); if (pid) { @@ -1649,7 +1663,8 @@ static int fio_send_iolog_gz(struct sk_entry *first, struct io_log *log) this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out; entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len, - NULL, SK_F_FREE | SK_F_VEC); + NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE); + out_pdu = NULL; flist_add_tail(&entry->list, &first->next); } while (stream.avail_in); @@ -1693,7 +1708,7 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) /* * Assemble header entry first */ - first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_COPY | SK_F_VEC); + first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_VEC | SK_F_INLINE | SK_F_COPY); /* * Now append actual log entries. Compress if we can, otherwise just @@ -1706,10 +1721,11 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log, log->nr_samples * log_entry_sz(log), - NULL, SK_F_FREE | SK_F_VEC); + NULL, SK_F_VEC | SK_F_INLINE); flist_add_tail(&entry->list, &first->next); } + fio_net_queue_entry(first); return ret; } -- 2.25.1