+ stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
+ stream.next_out = out_pdu;
+ ret = deflate(&stream, Z_FINISH);
+ /* may be Z_OK, or Z_STREAM_END */
+ if (ret < 0)
+ goto err_zlib;
+
+ this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
+
+ if (stream.avail_in)
+ flags = FIO_NET_CMD_F_MORE;
+
+ ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG,
+ out_pdu, this_len, 0, flags);
+ if (ret)
+ goto err_zlib;
+ } while (stream.avail_in);
+
+err_zlib:
+ deflateEnd(&stream);
+err:
+ free(out_pdu);
+#endif
+ return ret;
+}
+
+int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
+{
+ struct cmd_iolog_pdu pdu;
+ int i, ret = 0;
+
+ pdu.nr_samples = cpu_to_le64(log->nr_samples);
+ pdu.thread_number = cpu_to_le32(td->thread_number);
+ pdu.log_type = cpu_to_le32(log->log_type);
+ pdu.compressed = cpu_to_le32(use_zlib);
+
+ strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX);
+ pdu.name[FIO_NET_NAME_MAX - 1] = '\0';
+
+ for (i = 0; i < log->nr_samples; i++) {
+ struct io_sample *s = get_sample(log, i);
+
+ s->time = cpu_to_le64(s->time);
+ s->val = cpu_to_le64(s->val);
+ s->__ddir = cpu_to_le32(s->__ddir);
+ s->bs = cpu_to_le32(s->bs);
+
+ if (log->log_offset) {
+ struct io_sample_offset *so = (void *) s;
+
+ so->offset = cpu_to_le64(so->offset);
+ }
+ }
+
+ /*
+ * Send header first, it's not compressed.
+ */
+ ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu,
+ sizeof(pdu), 0, FIO_NET_CMD_F_MORE);
+ if (ret)
+ return ret;
+
+ /*
+ * Now send actual log, compress if we can, otherwise just plain
+ */
+ if (use_zlib)
+ return fio_send_iolog_gz(&pdu, log);
+
+ return fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, log->log,
+ log->nr_samples * log_entry_sz(log), 0, 0);
+}
+
+void fio_server_send_add_job(struct thread_data *td)
+{
+ struct cmd_add_job_pdu pdu;
+
+ memset(&pdu, 0, sizeof(pdu));
+ pdu.thread_number = cpu_to_le32(td->thread_number);
+ pdu.groupid = cpu_to_le32(td->groupid);
+ convert_thread_options_to_net(&pdu.top, &td->o);
+
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, NULL);
+}
+
+void fio_server_send_start(struct thread_data *td)
+{
+ assert(server_fd != -1);
+
+ fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
+}
+
+int fio_server_get_verify_state(const char *name, int threadnumber,
+ void **datap, int *version)
+{
+ struct thread_io_list *s;
+ struct cmd_sendfile out;
+ struct cmd_reply *rep;
+ uint64_t tag;
+ void *data;
+
+ dprint(FD_NET, "server: request verify state\n");
+
+ rep = smalloc(sizeof(*rep));
+ if (!rep) {
+ log_err("fio: smalloc pool too small\n");
+ return 1;
+ }
+
+ __fio_mutex_init(&rep->lock, FIO_MUTEX_LOCKED);
+ rep->data = NULL;
+ rep->error = 0;
+
+ verify_state_gen_name((char *) out.path, sizeof(out.path), name, me,
+ threadnumber);
+ tag = (uint64_t) (uintptr_t) rep;
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_SENDFILE, &out, sizeof(out),
+ &tag, NULL);
+
+ /*
+ * Wait for the backend to receive the reply
+ */
+ if (fio_mutex_down_timeout(&rep->lock, 10)) {
+ log_err("fio: timed out waiting for reply\n");
+ goto fail;
+ }
+
+ if (rep->error) {
+ log_err("fio: failure on receiving state file: %s\n", strerror(rep->error));
+fail:
+ *datap = NULL;
+ sfree(rep);
+ fio_net_send_quit(server_fd);
+ return 1;
+ }
+
+ /*
+ * The format is verify_state_hdr, then thread_io_list. Verify
+ * the header, and the thread_io_list checksum
+ */
+ s = rep->data + sizeof(struct verify_state_hdr);
+ if (verify_state_hdr(rep->data, s, version))
+ goto fail;
+
+ /*
+ * Don't need the header from now, copy just the thread_io_list
+ */
+ rep->size -= sizeof(struct verify_state_hdr);
+ data = malloc(rep->size);
+ memcpy(data, s, rep->size);
+ *datap = data;
+
+ sfree(rep->data);
+ __fio_mutex_remove(&rep->lock);
+ sfree(rep);
+ return 0;