From: Jens Axboe Date: Thu, 10 Dec 2015 23:07:32 +0000 (-0700) Subject: server: make the io log transmit use the new infrastructure X-Git-Tag: fio-2.3~33^2~10 X-Git-Url: https://git.kernel.dk/?a=commitdiff_plain;h=02594e378911ba260a7efddd695d3a281bdb392a;p=fio.git server: make the io log transmit use the new infrastructure Add support for vectored commands, and pass them on the backend pipe as well. Signed-off-by: Jens Axboe --- diff --git a/server.c b/server.c index 0942e631..d7dca0e4 100644 --- a/server.c +++ b/server.c @@ -36,6 +36,7 @@ enum { SK_F_FREE = 1, SK_F_COPY = 2, SK_F_SIMPLE = 4, + SK_F_VEC = 8, }; struct sk_entry { @@ -45,6 +46,7 @@ struct sk_entry { off_t size; uint64_t *tagptr; int flags; + struct flist_head next; }; struct sk_out { @@ -449,13 +451,13 @@ int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size, return ret; } -static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size, - uint64_t *tagptr, int flags) +struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size, + uint64_t *tagptr, int flags) { - struct sk_out *sk_out = pthread_getspecific(sk_out_key); struct sk_entry *entry; entry = smalloc(sizeof(*entry)); + INIT_FLIST_HEAD(&entry->next); entry->opcode = opcode; if (flags & SK_F_COPY) { entry->buf = smalloc(size); @@ -466,12 +468,27 @@ static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size, entry->tagptr = tagptr; entry->flags = flags; + return entry; +} + +static void fio_net_queue_entry(struct sk_entry *entry) +{ + struct sk_out *sk_out = pthread_getspecific(sk_out_key); + sk_lock(sk_out); flist_add_tail(&entry->list, &sk_out->list); sk_unlock(sk_out); fio_mutex_up(sk_out->wait); +} + +static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size, + uint64_t *tagptr, int flags) +{ + struct sk_entry *entry; + entry = fio_net_prep_cmd(opcode, buf, size, tagptr, flags); + fio_net_queue_entry(entry); return 0; } @@ -949,8 +966,76 @@ static int handle_command(struct sk_out *sk_out, struct flist_head *job_list, return ret; } +/* + * Send a command with a separate PDU, not inlined in the command + */ +static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf, + off_t size, uint64_t tag, uint32_t flags) +{ + struct fio_net_cmd cmd; + struct iovec iov[2]; + + iov[0].iov_base = (void *) &cmd; + iov[0].iov_len = sizeof(cmd); + iov[1].iov_base = (void *) buf; + iov[1].iov_len = size; + + __fio_init_net_cmd(&cmd, opcode, size, tag); + cmd.flags = __cpu_to_le32(flags); + fio_net_cmd_crc_pdu(&cmd, buf); + + return fio_sendv_data(sk, iov, 2); +} + +static void finish_entry(struct sk_entry *entry) +{ + if (entry->flags & SK_F_FREE) + free(entry->buf); + else if (entry->flags & SK_F_COPY) + sfree(entry->buf); + + sfree(entry); +} + +static void send_vec_entry(struct sk_out *sk_out, struct sk_entry *first) +{ + uint64_t tag; + int flags; + + if (!flist_empty(&first->next)) + flags = FIO_NET_CMD_F_MORE; + else + flags = 0; + + if (first->tagptr) + tag = *first->tagptr; + else + tag = 0; + + fio_send_cmd_ext_pdu(sk_out->sk, first->opcode, first->buf, first->size, tag, flags); + + while (!flist_empty(&first->next)) { + struct sk_entry *next; + + next = flist_first_entry(&first->next, struct sk_entry, list); + flist_del_init(&next->list); + if (flist_empty(&first->next)) + flags = 0; + + if (next->tagptr) + tag = *next->tagptr; + else + tag = 0; + + fio_send_cmd_ext_pdu(sk_out->sk, next->opcode, next->buf, next->size, tag, flags); + finish_entry(next); + } +} + static void handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry) { + if (entry->flags & SK_F_VEC) + send_vec_entry(sk_out, entry); if (entry->flags & SK_F_SIMPLE) { uint64_t tag = 0; @@ -961,12 +1046,7 @@ static void handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry) } else fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL); - if (entry->flags & SK_F_FREE) - free(entry->buf); - else if (entry->flags & SK_F_COPY) - sfree(entry->buf); - - sfree(entry); + finish_entry(entry); } static void handle_xmits(struct sk_out *sk_out) @@ -1416,32 +1496,11 @@ void fio_server_send_du(void) } } -/* - * Send a command with a separate PDU, not inlined in the command - */ -static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf, - off_t size, uint64_t tag, uint32_t flags) -{ - struct fio_net_cmd cmd; - struct iovec iov[2]; - - iov[0].iov_base = (void *) &cmd; - iov[0].iov_len = sizeof(cmd); - iov[1].iov_base = (void *) buf; - iov[1].iov_len = size; - - __fio_init_net_cmd(&cmd, opcode, size, tag); - cmd.flags = __cpu_to_le32(flags); - fio_net_cmd_crc_pdu(&cmd, buf); - - return fio_sendv_data(sk, iov, 2); -} - -static int fio_send_iolog_gz(struct sk_out *sk_out, struct cmd_iolog_pdu *pdu, - struct io_log *log) +static int fio_send_iolog_gz(struct sk_entry *first, struct io_log *log) { int ret = 0; #ifdef CONFIG_ZLIB + struct sk_entry *entry; z_stream stream; void *out_pdu; @@ -1465,7 +1524,7 @@ static int fio_send_iolog_gz(struct sk_out *sk_out, struct cmd_iolog_pdu *pdu, stream.avail_in = log->nr_samples * log_entry_sz(log); do { - unsigned int this_len, flags = 0; + unsigned int this_len; stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU; stream.next_out = out_pdu; @@ -1476,13 +1535,9 @@ static int fio_send_iolog_gz(struct sk_out *sk_out, struct cmd_iolog_pdu *pdu, this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out; - if (stream.avail_in) - flags = FIO_NET_CMD_F_MORE; - - ret = fio_send_cmd_ext_pdu(sk_out->sk, FIO_NET_CMD_IOLOG, - out_pdu, this_len, 0, flags); - if (ret) - goto err_zlib; + entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len, + NULL, SK_F_FREE | SK_F_VEC); + flist_add_tail(&entry->list, &first->next); } while (stream.avail_in); err_zlib: @@ -1495,8 +1550,8 @@ err: int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) { - struct sk_out *sk_out = pthread_getspecific(sk_out_key); struct cmd_iolog_pdu pdu; + struct sk_entry *first; int i, ret = 0; pdu.nr_samples = cpu_to_le64(log->nr_samples); @@ -1523,22 +1578,23 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) } /* - * Send header first, it's not compressed. + * Assemble header entry first */ - ret = fio_send_cmd_ext_pdu(sk_out->sk, FIO_NET_CMD_IOLOG, &pdu, - sizeof(pdu), 0, FIO_NET_CMD_F_MORE); - if (ret) - return ret; + first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_COPY | SK_F_VEC); /* - * Now send actual log, compress if we can, otherwise just plain + * Now append actual log entries. Compress if we can, otherwise just + * plain text output. */ if (use_zlib) - ret = fio_send_iolog_gz(sk_out, &pdu, log); + ret = fio_send_iolog_gz(first, log); else { - ret = fio_send_cmd_ext_pdu(sk_out->sk, FIO_NET_CMD_IOLOG, - log->log, log->nr_samples * log_entry_sz(log), - 0, 0); + struct sk_entry *entry; + + entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log, + log->nr_samples * log_entry_sz(log), + NULL, SK_F_FREE | SK_F_VEC); + flist_add_tail(&entry->list, &first->next); } return ret;