SK_F_FREE = 1,
SK_F_COPY = 2,
SK_F_SIMPLE = 4,
+ SK_F_VEC = 8,
};
struct sk_entry {
off_t size;
uint64_t *tagptr;
int flags;
+ struct flist_head next;
};
struct sk_out {
if (!total_len)
return 0;
- if (errno)
- return -errno;
-
return 1;
}
return ret;
}
-static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size,
- uint64_t *tagptr, int flags)
+struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size,
+ uint64_t *tagptr, int flags)
{
- struct sk_out *sk_out = pthread_getspecific(sk_out_key);
struct sk_entry *entry;
entry = smalloc(sizeof(*entry));
+ INIT_FLIST_HEAD(&entry->next);
entry->opcode = opcode;
if (flags & SK_F_COPY) {
entry->buf = smalloc(size);
entry->tagptr = tagptr;
entry->flags = flags;
+ return entry;
+}
+
+static void fio_net_queue_entry(struct sk_entry *entry)
+{
+ struct sk_out *sk_out = pthread_getspecific(sk_out_key);
+
sk_lock(sk_out);
flist_add_tail(&entry->list, &sk_out->list);
sk_unlock(sk_out);
fio_mutex_up(sk_out->wait);
+}
+static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size,
+ uint64_t *tagptr, int flags)
+{
+ struct sk_entry *entry;
+
+ entry = fio_net_prep_cmd(opcode, buf, size, tagptr, flags);
+ fio_net_queue_entry(entry);
return 0;
}
return ret;
}
-static void handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry)
+/*
+ * Send a command with a separate PDU, not inlined in the command
+ */
+static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
+ off_t size, uint64_t tag, uint32_t flags)
{
- if (entry->flags & SK_F_SIMPLE) {
- uint64_t tag = 0;
+ struct fio_net_cmd cmd;
+ struct iovec iov[2];
- if (entry->tagptr)
- tag = *entry->tagptr;
+ iov[0].iov_base = (void *) &cmd;
+ iov[0].iov_len = sizeof(cmd);
+ iov[1].iov_base = (void *) buf;
+ iov[1].iov_len = size;
- fio_net_send_simple_cmd(sk_out->sk, entry->opcode, tag, NULL);
- } else
- fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
+ __fio_init_net_cmd(&cmd, opcode, size, tag);
+ cmd.flags = __cpu_to_le32(flags);
+ fio_net_cmd_crc_pdu(&cmd, buf);
+ return fio_sendv_data(sk, iov, 2);
+}
+
+static void finish_entry(struct sk_entry *entry)
+{
if (entry->flags & SK_F_FREE)
free(entry->buf);
else if (entry->flags & SK_F_COPY)
sfree(entry);
}
-static void handle_xmits(struct sk_out *sk_out)
+static void entry_set_flags_tag(struct sk_entry *entry, struct flist_head *list,
+ unsigned int *flags, uint64_t *tag)
+{
+ if (!flist_empty(list))
+ *flags = FIO_NET_CMD_F_MORE;
+ else
+ *flags = 0;
+
+ if (entry->tagptr)
+ *tag = *entry->tagptr;
+ else
+ *tag = 0;
+}
+
+static int send_vec_entry(struct sk_out *sk_out, struct sk_entry *first)
+{
+ unsigned int flags;
+ uint64_t tag;
+ int ret;
+
+ entry_set_flags_tag(first, &first->next, &flags, &tag);
+
+ ret = fio_send_cmd_ext_pdu(sk_out->sk, first->opcode, first->buf, first->size, tag, flags);
+
+ while (!flist_empty(&first->next)) {
+ struct sk_entry *next;
+
+ next = flist_first_entry(&first->next, struct sk_entry, list);
+ flist_del_init(&next->list);
+
+ entry_set_flags_tag(next, &first->next, &flags, &tag);
+
+ ret += fio_send_cmd_ext_pdu(sk_out->sk, next->opcode, next->buf, next->size, tag, flags);
+ finish_entry(next);
+ }
+
+ return ret;
+}
+
+static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry)
+{
+ int ret;
+
+ if (entry->flags & SK_F_VEC)
+ ret = send_vec_entry(sk_out, entry);
+ if (entry->flags & SK_F_SIMPLE) {
+ uint64_t tag = 0;
+
+ if (entry->tagptr)
+ tag = *entry->tagptr;
+
+ ret = fio_net_send_simple_cmd(sk_out->sk, entry->opcode, tag, NULL);
+ } else
+ ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
+
+ if (ret)
+ log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode));
+
+ finish_entry(entry);
+ return ret;
+}
+
+static int handle_xmits(struct sk_out *sk_out)
{
struct sk_entry *entry;
FLIST_HEAD(list);
+ int ret = 0;
sk_lock(sk_out);
if (flist_empty(&sk_out->list)) {
sk_unlock(sk_out);
- return;
+ return 0;
}
flist_splice_init(&sk_out->list, &list);
while (!flist_empty(&list)) {
entry = flist_entry(list.next, struct sk_entry, list);
flist_del(&entry->list);
- handle_sk_entry(sk_out, entry);
+ ret += handle_sk_entry(sk_out, entry);
}
+
+ return ret;
}
static int handle_connection(struct sk_out *sk_out)
}
}
-/*
- * Send a command with a separate PDU, not inlined in the command
- */
-static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
- off_t size, uint64_t tag, uint32_t flags)
-{
- struct fio_net_cmd cmd;
- struct iovec iov[2];
-
- iov[0].iov_base = (void *) &cmd;
- iov[0].iov_len = sizeof(cmd);
- iov[1].iov_base = (void *) buf;
- iov[1].iov_len = size;
-
- __fio_init_net_cmd(&cmd, opcode, size, tag);
- cmd.flags = __cpu_to_le32(flags);
- fio_net_cmd_crc_pdu(&cmd, buf);
-
- return fio_sendv_data(sk, iov, 2);
-}
-
-static int fio_send_iolog_gz(struct sk_out *sk_out, struct cmd_iolog_pdu *pdu,
- struct io_log *log)
+static int fio_send_iolog_gz(struct sk_entry *first, struct io_log *log)
{
int ret = 0;
#ifdef CONFIG_ZLIB
+ struct sk_entry *entry;
z_stream stream;
void *out_pdu;
stream.avail_in = log->nr_samples * log_entry_sz(log);
do {
- unsigned int this_len, flags = 0;
+ unsigned int this_len;
stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
stream.next_out = out_pdu;
this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
- if (stream.avail_in)
- flags = FIO_NET_CMD_F_MORE;
-
- ret = fio_send_cmd_ext_pdu(sk_out->sk, FIO_NET_CMD_IOLOG,
- out_pdu, this_len, 0, flags);
- if (ret)
- goto err_zlib;
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
+ NULL, SK_F_FREE | SK_F_VEC);
+ flist_add_tail(&entry->list, &first->next);
} while (stream.avail_in);
err_zlib:
int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
{
- struct sk_out *sk_out = pthread_getspecific(sk_out_key);
struct cmd_iolog_pdu pdu;
+ struct sk_entry *first;
int i, ret = 0;
pdu.nr_samples = cpu_to_le64(log->nr_samples);
}
/*
- * Send header first, it's not compressed.
+ * Assemble header entry first
*/
- ret = fio_send_cmd_ext_pdu(sk_out->sk, FIO_NET_CMD_IOLOG, &pdu,
- sizeof(pdu), 0, FIO_NET_CMD_F_MORE);
- if (ret)
- return ret;
+ first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_COPY | SK_F_VEC);
/*
- * Now send actual log, compress if we can, otherwise just plain
+ * Now append actual log entries. Compress if we can, otherwise just
+ * plain text output.
*/
if (use_zlib)
- ret = fio_send_iolog_gz(sk_out, &pdu, log);
+ ret = fio_send_iolog_gz(first, log);
else {
- ret = fio_send_cmd_ext_pdu(sk_out->sk, FIO_NET_CMD_IOLOG,
- log->log, log->nr_samples * log_entry_sz(log),
- 0, 0);
+ struct sk_entry *entry;
+
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log,
+ log->nr_samples * log_entry_sz(log),
+ NULL, SK_F_FREE | SK_F_VEC);
+ flist_add_tail(&entry->list, &first->next);
}
return ret;