+ fio_net_queue_cmd(FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, SK_F_COPY);
+ }
+}
+
+#ifdef CONFIG_ZLIB
+static int __fio_append_iolog_gz(struct sk_entry *first, struct io_log *log,
+ struct io_logs *cur_log, z_stream *stream)
+{
+ unsigned int this_len;
+ void *out_pdu;
+ int ret;
+
+ stream->next_in = (void *) cur_log->log;
+ stream->avail_in = cur_log->nr_samples * log_entry_sz(log);
+
+ do {
+ struct sk_entry *entry;
+
+ /*
+ * Dirty - since the log is potentially huge, compress it into
+ * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving
+ * side defragment it.
+ */
+ out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
+
+ stream->avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
+ stream->next_out = out_pdu;
+ ret = deflate(stream, Z_BLOCK);
+ /* may be Z_OK, or Z_STREAM_END */
+ if (ret < 0) {
+ free(out_pdu);
+ return 1;
+ }
+
+ this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream->avail_out;
+
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
+ NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
+ flist_add_tail(&entry->list, &first->next);
+ } while (stream->avail_in);
+
+ return 0;
+}
+
+static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log)
+{
+ int ret = 0;
+ z_stream stream;
+
+ memset(&stream, 0, sizeof(stream));
+ stream.zalloc = Z_NULL;
+ stream.zfree = Z_NULL;
+ stream.opaque = Z_NULL;
+
+ if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK)
+ return 1;
+
+ while (!flist_empty(&log->io_logs)) {
+ struct io_logs *cur_log;
+
+ cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
+ flist_del_init(&cur_log->list);
+
+ ret = __fio_append_iolog_gz(first, log, cur_log, &stream);
+ if (ret)
+ break;
+ }
+
+ ret = deflate(&stream, Z_FINISH);
+
+ while (ret != Z_STREAM_END) {
+ struct sk_entry *entry;
+ unsigned int this_len;
+ void *out_pdu;
+
+ out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
+ stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
+ stream.next_out = out_pdu;
+
+ ret = deflate(&stream, Z_FINISH);
+ /* may be Z_OK, or Z_STREAM_END */
+ if (ret < 0) {
+ free(out_pdu);
+ break;
+ }
+
+ this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
+
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
+ NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
+ flist_add_tail(&entry->list, &first->next);
+ } while (ret != Z_STREAM_END);
+
+ ret = deflateEnd(&stream);
+ if (ret == Z_OK)
+ return 0;
+
+ return 1;
+}
+#else
+static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log)
+{
+ return 1;
+}
+#endif
+
+static int fio_append_gz_chunks(struct sk_entry *first, struct io_log *log)
+{
+ struct sk_entry *entry;
+ struct flist_head *node;
+
+ pthread_mutex_lock(&log->chunk_lock);
+ flist_for_each(node, &log->chunk_list) {
+ struct iolog_compress *c;
+
+ c = flist_entry(node, struct iolog_compress, list);
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, c->buf, c->len,
+ NULL, SK_F_VEC | SK_F_INLINE);
+ flist_add_tail(&entry->list, &first->next);
+ }
+ pthread_mutex_unlock(&log->chunk_lock);
+
+ return 0;
+}
+
+static int fio_append_text_log(struct sk_entry *first, struct io_log *log)
+{
+ struct sk_entry *entry;
+
+ while (!flist_empty(&log->io_logs)) {
+ struct io_logs *cur_log;
+ size_t size;
+
+ cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
+ flist_del_init(&cur_log->list);
+
+ size = cur_log->nr_samples * log_entry_sz(log);
+
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, cur_log->log, size,
+ NULL, SK_F_VEC | SK_F_INLINE);
+ flist_add_tail(&entry->list, &first->next);
+ }
+
+ return 0;
+}
+
+int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
+{
+ struct cmd_iolog_pdu pdu;
+ struct sk_entry *first;
+ struct flist_head *entry;
+ int ret = 0;
+
+ pdu.nr_samples = cpu_to_le64(iolog_nr_samples(log));
+ pdu.thread_number = cpu_to_le32(td->thread_number);
+ pdu.log_type = cpu_to_le32(log->log_type);
+
+ if (!flist_empty(&log->chunk_list))
+ pdu.compressed = __cpu_to_le32(STORE_COMPRESSED);
+ else if (use_zlib)
+ pdu.compressed = __cpu_to_le32(XMIT_COMPRESSED);
+ else
+ pdu.compressed = 0;
+
+ strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX);
+ pdu.name[FIO_NET_NAME_MAX - 1] = '\0';
+
+ /*
+ * We can't do this for a pre-compressed log, but for that case,
+ * log->nr_samples is zero anyway.
+ */
+ flist_for_each(entry, &log->io_logs) {
+ struct io_logs *cur_log;
+ int i;
+
+ cur_log = flist_entry(entry, struct io_logs, list);
+
+ for (i = 0; i < cur_log->nr_samples; i++) {
+ struct io_sample *s = get_sample(log, cur_log, i);
+
+ s->time = cpu_to_le64(s->time);
+ s->val = cpu_to_le64(s->val);
+ s->__ddir = cpu_to_le32(s->__ddir);
+ s->bs = cpu_to_le32(s->bs);
+
+ if (log->log_offset) {
+ struct io_sample_offset *so = (void *) s;
+
+ so->offset = cpu_to_le64(so->offset);
+ }
+ }