+ p = flist_entry(entry, struct print_option, list);
+ memset(&pdu, 0, sizeof(pdu));
+
+ if (gid == -1U) {
+ pdu.global = __cpu_to_le16(1);
+ pdu.groupid = 0;
+ } else {
+ pdu.global = 0;
+ pdu.groupid = cpu_to_le32(gid);
+ }
+ len = strlen(p->name);
+ if (len >= sizeof(pdu.name)) {
+ len = sizeof(pdu.name) - 1;
+ pdu.truncated = __cpu_to_le16(1);
+ }
+ memcpy(pdu.name, p->name, len);
+ if (p->value) {
+ len = strlen(p->value);
+ if (len >= sizeof(pdu.value)) {
+ len = sizeof(pdu.value) - 1;
+ pdu.truncated = __cpu_to_le16(1);
+ }
+ memcpy(pdu.value, p->value, len);
+ }
+ fio_net_queue_cmd(FIO_NET_CMD_JOB_OPT, &pdu, sizeof(pdu), NULL, SK_F_COPY);
+ }
+}
+
+/*
+ * Copy an aggregate disk-util block from 'src' to 'dst', converting
+ * every counter from host to little-endian wire order. The floating
+ * point max_util is transported as a uint64 image via
+ * fio_double_to_uint64().
+ */
+static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
+{
+ int i;
+
+ for (i = 0; i < 2; i++) {
+ dst->ios[i] = cpu_to_le64(src->ios[i]);
+ dst->merges[i] = cpu_to_le64(src->merges[i]);
+ dst->sectors[i] = cpu_to_le64(src->sectors[i]);
+ dst->ticks[i] = cpu_to_le64(src->ticks[i]);
+ }
+
+ dst->io_ticks = cpu_to_le64(src->io_ticks);
+ dst->time_in_queue = cpu_to_le64(src->time_in_queue);
+ dst->slavecount = cpu_to_le32(src->slavecount);
+ dst->max_util.u.i = cpu_to_le64(fio_double_to_uint64(src->max_util.u.f));
+}
+
+/*
+ * Copy a per-device disk-util stat block from 'src' to 'dst',
+ * converting counters to little-endian. The last name byte is set to
+ * NUL before the strncpy() of at most FIO_DU_NAME_SZ - 1 bytes, so
+ * dst->name is always NUL-terminated even if src->name is full length.
+ */
+static void convert_dus(struct disk_util_stat *dst, struct disk_util_stat *src)
+{
+ int i;
+
+ dst->name[FIO_DU_NAME_SZ - 1] = '\0';
+ strncpy((char *) dst->name, (char *) src->name, FIO_DU_NAME_SZ - 1);
+
+ for (i = 0; i < 2; i++) {
+ dst->s.ios[i] = cpu_to_le64(src->s.ios[i]);
+ dst->s.merges[i] = cpu_to_le64(src->s.merges[i]);
+ dst->s.sectors[i] = cpu_to_le64(src->s.sectors[i]);
+ dst->s.ticks[i] = cpu_to_le64(src->s.ticks[i]);
+ }
+
+ dst->s.io_ticks = cpu_to_le64(src->s.io_ticks);
+ dst->s.time_in_queue = cpu_to_le64(src->s.time_in_queue);
+ dst->s.msec = cpu_to_le64(src->s.msec);
+}
+
+/*
+ * Queue one FIO_NET_CMD_DU command per entry on the global disk_list,
+ * with all stats converted to little-endian first. The single stack
+ * pdu is reused for every iteration; SK_F_COPY presumably tells
+ * fio_net_queue_cmd() to take its own copy — verify against the
+ * queueing implementation.
+ */
+void fio_server_send_du(void)
+{
+ struct disk_util *du;
+ struct flist_head *entry;
+ struct cmd_du_pdu pdu;
+
+ dprint(FD_NET, "server: sending disk_util %d\n", !flist_empty(&disk_list));
+
+ memset(&pdu, 0, sizeof(pdu));
+
+ flist_for_each(entry, &disk_list) {
+ du = flist_entry(entry, struct disk_util, list);
+
+ convert_dus(&pdu.dus, &du->dus);
+ convert_agg(&pdu.agg, &du->agg);
+
+ fio_net_queue_cmd(FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, SK_F_COPY);
+ }
+}
+
+#ifdef CONFIG_ZLIB
+
+/*
+ * Wrap the filled portion of out_pdu (everything deflate consumed of
+ * the FIO_SERVER_MAX_FRAGMENT_PDU output window) in a
+ * FIO_NET_CMD_IOLOG entry and append it after 'first'. On allocation
+ * failure *last_entry is NULL and nothing is queued; callers check it.
+ */
+static inline void __fio_net_prep_tail(z_stream *stream, void *out_pdu,
+ struct sk_entry **last_entry,
+ struct sk_entry *first)
+{
+ unsigned int this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream->avail_out;
+
+ *last_entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
+ NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
+ if (*last_entry)
+ flist_add_tail(&(*last_entry)->list, &first->next);
+}
+
+/*
+ * Deflates the next input given, creating as many new packets in the
+ * linked list as necessary.
+ */
+/*
+ * Deflates the next input given, creating as many new packets in the
+ * linked list as necessary. Whenever the current output window fills,
+ * it is queued via __fio_net_prep_tail() and a fresh
+ * FIO_SERVER_MAX_FRAGMENT_PDU buffer is started in *out_pdu.
+ *
+ * Returns 0 on success, 1 on deflate or queueing failure.
+ *
+ * NOTE(review): the malloc() below is not checked for NULL before
+ * being handed to zlib — confirm whether OOM is handled elsewhere.
+ * NOTE(review): on the prep-tail failure path, *out_pdu has already
+ * been handed to a failed fio_net_prep_cmd() and is not freed here —
+ * looks like a leak; verify ownership in fio_net_prep_cmd().
+ */
+static int __deflate_pdu_buffer(void *next_in, unsigned int next_sz, void **out_pdu,
+ struct sk_entry **last_entry, z_stream *stream,
+ struct sk_entry *first)
+{
+ int ret;
+
+ stream->next_in = next_in;
+ stream->avail_in = next_sz;
+ do {
+ if (!stream->avail_out) {
+ __fio_net_prep_tail(stream, *out_pdu, last_entry, first);
+ if (*last_entry == NULL)
+ return 1;
+
+ *out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
+
+ stream->avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
+ stream->next_out = *out_pdu;
+ }
+
+ ret = deflate(stream, Z_BLOCK);
+
+ if (ret < 0) {
+ free(*out_pdu);
+ return 1;
+ }
+ } while (stream->avail_in);
+
+ return 0;
+}
+
+/*
+ * Compress a histogram log: for each sample, deflate the fixed-size
+ * sample record, then convert the sample's latency-plat entry to a
+ * delta against the previous entry and deflate that too. The previous
+ * entry is unlinked and freed once consumed. All compressed output is
+ * queued after 'first'. Returns non-zero on failure.
+ */
+static int __fio_append_iolog_gz_hist(struct sk_entry *first, struct io_log *log,
+ struct io_logs *cur_log, z_stream *stream)
+{
+ struct sk_entry *entry;
+ void *out_pdu;
+ int ret, i, j;
+ int sample_sz = log_entry_sz(log);
+
+ out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
+ stream->avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
+ stream->next_out = out_pdu;
+
+ for (i = 0; i < cur_log->nr_samples; i++) {
+ struct io_sample *s;
+ struct io_u_plat_entry *cur_plat_entry, *prev_plat_entry;
+ uint64_t *cur_plat, *prev_plat;
+
+ s = get_sample(log, cur_log, i);
+ ret = __deflate_pdu_buffer(s, sample_sz, &out_pdu, &entry, stream, first);
+ if (ret)
+ return ret;
+
+ /* Do the subtraction on server side so that client doesn't have to
+ * reconstruct our linked list from packets.
+ */
+ cur_plat_entry = s->data.plat_entry;
+ prev_plat_entry = flist_first_entry(&cur_plat_entry->list, struct io_u_plat_entry, list);
+ cur_plat = cur_plat_entry->io_u_plat;
+ prev_plat = prev_plat_entry->io_u_plat;
+
+ /* turn absolute bucket counts into deltas vs. previous sample */
+ for (j = 0; j < FIO_IO_U_PLAT_NR; j++) {
+ cur_plat[j] -= prev_plat[j];
+ }
+
+ flist_del(&prev_plat_entry->list);
+ free(prev_plat_entry);
+
+ ret = __deflate_pdu_buffer(cur_plat_entry, sizeof(*cur_plat_entry),
+ &out_pdu, &entry, stream, first);
+
+ if (ret)
+ return ret;
+ }
+
+ /* flush the final partial output window; entry is NULL on failure */
+ __fio_net_prep_tail(stream, out_pdu, &entry, first);
+ return entry == NULL;
+}
+
+/*
+ * Deflate one io_logs chunk and queue the compressed fragments after
+ * 'first'. Histogram logs need per-sample treatment and are handed to
+ * __fio_append_iolog_gz_hist(); everything else is a flat array of
+ * fixed-size samples and can be fed to deflate in one go. Returns
+ * non-zero on failure.
+ */
+static int __fio_append_iolog_gz(struct sk_entry *first, struct io_log *log,
+ struct io_logs *cur_log, z_stream *stream)
+{
+ unsigned int this_len;
+ void *out_pdu;
+ int ret;
+
+ if (log->log_type == IO_LOG_TYPE_HIST)
+ return __fio_append_iolog_gz_hist(first, log, cur_log, stream);
+
+ stream->next_in = (void *) cur_log->log;
+ stream->avail_in = cur_log->nr_samples * log_entry_sz(log);
+
+ do {
+ struct sk_entry *entry;
+
+ /*
+ * Dirty - since the log is potentially huge, compress it into
+ * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving
+ * side defragment it.
+ */
+ out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
+
+ stream->avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
+ stream->next_out = out_pdu;
+ ret = deflate(stream, Z_BLOCK);
+ /* may be Z_OK, or Z_STREAM_END */
+ if (ret < 0) {
+ free(out_pdu);
+ return 1;
+ }
+
+ this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream->avail_out;
+
+ /* SK_F_FREE: the queued entry owns out_pdu from here on */
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
+ NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
+ if (!entry) {
+ free(out_pdu);
+ return 1;
+ }
+ flist_add_tail(&entry->list, &first->next);
+ } while (stream->avail_in);
+
+ return 0;
+}
+
+/*
+ * Compress all chunks of 'log' with zlib and queue the compressed
+ * fragments as FIO_NET_CMD_IOLOG entries after 'first'. After the
+ * per-chunk pass, deflate is driven with Z_FINISH until the stream is
+ * fully flushed, each flush round going out as its own fragment.
+ * Returns 0 on success, 1 on any zlib or allocation failure.
+ */
+static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log)
+{
+ z_stream stream = {
+ .zalloc = Z_NULL,
+ .zfree = Z_NULL,
+ .opaque = Z_NULL,
+ };
+ int ret = 0;
+
+ if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK)
+ return 1;
+
+ while (!flist_empty(&log->io_logs)) {
+ struct io_logs *cur_log;
+
+ cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
+ flist_del_init(&cur_log->list);
+
+ ret = __fio_append_iolog_gz(first, log, cur_log, &stream);
+ if (ret)
+ break;
+ }
+
+ /* flush remaining compressor state, possibly over several fragments */
+ ret = deflate(&stream, Z_FINISH);
+
+ while (ret != Z_STREAM_END) {
+ struct sk_entry *entry;
+ unsigned int this_len;
+ void *out_pdu;
+
+ out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
+ stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
+ stream.next_out = out_pdu;
+
+ ret = deflate(&stream, Z_FINISH);
+ /* may be Z_OK, or Z_STREAM_END */
+ if (ret < 0) {
+ free(out_pdu);
+ break;
+ }
+
+ this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
+
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
+ NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
+ if (!entry) {
+ free(out_pdu);
+ break;
+ }
+ flist_add_tail(&entry->list, &first->next);
+ }
+
+ ret = deflateEnd(&stream);
+ if (ret == Z_OK)
+ return 0;
+
+ return 1;
+}
+#else
+/*
+ * Built without zlib: compressed iolog transmission is unavailable,
+ * so always report failure to the caller.
+ */
+static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log)
+{
+ return 1;
+}
+#endif
+
+/*
+ * Queue the log's pre-compressed chunks (built during the job) as
+ * FIO_NET_CMD_IOLOG entries after 'first', without SK_F_FREE — the
+ * chunk buffers stay owned by the log's chunk_list. The list is
+ * walked under chunk_lock. Returns non-zero on allocation failure.
+ */
+static int fio_append_gz_chunks(struct sk_entry *first, struct io_log *log)
+{
+ struct sk_entry *entry;
+ struct flist_head *node;
+ int ret = 0;
+
+ pthread_mutex_lock(&log->chunk_lock);
+ flist_for_each(node, &log->chunk_list) {
+ struct iolog_compress *c;
+
+ c = flist_entry(node, struct iolog_compress, list);
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, c->buf, c->len,
+ NULL, SK_F_VEC | SK_F_INLINE);
+ if (!entry) {
+ ret = 1;
+ break;
+ }
+ flist_add_tail(&entry->list, &first->next);
+ }
+ pthread_mutex_unlock(&log->chunk_lock);
+ return ret;
+}
+
+/*
+ * Queue the raw (uncompressed) sample arrays of each io_logs chunk as
+ * FIO_NET_CMD_IOLOG entries after 'first'. Chunks are unlinked from
+ * log->io_logs but not freed here; the sample buffer is sent in place
+ * (no SK_F_FREE/SK_F_COPY) — presumably freed once transmitted,
+ * verify against the sk_entry teardown path. Returns non-zero on
+ * allocation failure.
+ */
+static int fio_append_text_log(struct sk_entry *first, struct io_log *log)
+{
+ struct sk_entry *entry;
+ int ret = 0;
+
+ while (!flist_empty(&log->io_logs)) {
+ struct io_logs *cur_log;
+ size_t size;
+
+ cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
+ flist_del_init(&cur_log->list);
+
+ size = cur_log->nr_samples * log_entry_sz(log);
+
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, cur_log->log, size,
+ NULL, SK_F_VEC | SK_F_INLINE);
+ if (!entry) {
+ ret = 1;
+ break;
+ }
+ flist_add_tail(&entry->list, &first->next);
+ }
+
+ return ret;
+}
+
+/*
+ * Ship an entire io_log to the client: a header pdu describing the
+ * log, followed by the log payload — pre-compressed chunks if the job
+ * compressed on the fly, zlib-compressed on the wire if use_zlib is
+ * set, otherwise the raw samples. All samples are byte-swapped to
+ * little-endian IN PLACE before sending, so the local log is consumed
+ * by this call. Returns non-zero if appending the payload failed
+ * (the header is still queued in that case).
+ */
+int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
+{
+ struct cmd_iolog_pdu pdu = {
+ .nr_samples = cpu_to_le64(iolog_nr_samples(log)),
+ .thread_number = cpu_to_le32(td->thread_number),
+ .log_type = cpu_to_le32(log->log_type),
+ .log_hist_coarseness = cpu_to_le32(log->hist_coarseness),
+ };
+ struct sk_entry *first;
+ struct flist_head *entry;
+ int ret = 0;
+
+ if (!flist_empty(&log->chunk_list))
+ pdu.compressed = __cpu_to_le32(STORE_COMPRESSED);
+ else if (use_zlib)
+ pdu.compressed = __cpu_to_le32(XMIT_COMPRESSED);
+ else
+ pdu.compressed = 0;
+
+ /* explicit NUL keeps the name terminated even when truncated */
+ strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX);
+ pdu.name[FIO_NET_NAME_MAX - 1] = '\0';
+
+ /*
+ * We can't do this for a pre-compressed log, but for that case,
+ * log->nr_samples is zero anyway.
+ */
+ flist_for_each(entry, &log->io_logs) {
+ struct io_logs *cur_log;
+ int i;
+
+ cur_log = flist_entry(entry, struct io_logs, list);
+
+ for (i = 0; i < cur_log->nr_samples; i++) {
+ struct io_sample *s = get_sample(log, cur_log, i);
+
+ s->time = cpu_to_le64(s->time);
+ s->data.val = cpu_to_le64(s->data.val);
+ s->__ddir = cpu_to_le32(s->__ddir);
+ s->bs = cpu_to_le64(s->bs);
+
+ if (log->log_offset) {
+ struct io_sample_offset *so = (void *) s;
+
+ so->offset = cpu_to_le64(so->offset);
+ }
+ }
+ }
+
+ /*
+ * Assemble header entry first
+ */
+ first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_VEC | SK_F_INLINE | SK_F_COPY);
+ if (!first)
+ return 1;
+
+ /*
+ * Now append actual log entries. If log compression was enabled on
+ * the job, just send out the compressed chunks directly. If we
+ * have a plain log, compress if we can, then send. Otherwise, send
+ * the plain text output.
+ */
+ if (!flist_empty(&log->chunk_list))
+ ret = fio_append_gz_chunks(first, log);
+ else if (use_zlib)
+ ret = fio_append_iolog_gz(first, log);
+ else
+ ret = fio_append_text_log(first, log);
+
+ fio_net_queue_entry(first);
+ return ret;
+}
+
+/*
+ * Announce a new job to the client: queue an ADD_JOB pdu carrying the
+ * thread number, group id, and the full thread options converted to
+ * network (little-endian) representation.
+ */
+void fio_server_send_add_job(struct thread_data *td)
+{
+ struct cmd_add_job_pdu pdu = {
+ .thread_number = cpu_to_le32(td->thread_number),
+ .groupid = cpu_to_le32(td->groupid),
+ };
+
+ convert_thread_options_to_net(&pdu.top, &td->o);
+
+ fio_net_queue_cmd(FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL,
+ SK_F_COPY);
+}
+
+/*
+ * Queue a SERVER_START notification for this thread's client
+ * connection. The assert documents the invariant that the per-thread
+ * sk_out must already have a live socket; td itself is unused here.
+ */
+void fio_server_send_start(struct thread_data *td)
+{
+ struct sk_out *sk_out = pthread_getspecific(sk_out_key);
+
+ assert(sk_out->sk != -1);
+
+ fio_net_queue_cmd(FIO_NET_CMD_SERVER_START, NULL, 0, NULL, SK_F_SIMPLE);
+}
+
+int fio_server_get_verify_state(const char *name, int threadnumber,
+ void **datap)
+{
+ struct thread_io_list *s;
+ struct cmd_sendfile out;
+ struct cmd_reply *rep;
+ uint64_t tag;
+ void *data;
+ int ret;
+
+ dprint(FD_NET, "server: request verify state\n");
+
+ rep = smalloc(sizeof(*rep));
+ if (!rep)
+ return ENOMEM;
+
+ __fio_sem_init(&rep->lock, FIO_SEM_LOCKED);
+ rep->data = NULL;
+ rep->error = 0;
+
+ verify_state_gen_name((char *) out.path, sizeof(out.path), name, me,
+ threadnumber);
+ tag = (uint64_t) (uintptr_t) rep;
+ fio_net_queue_cmd(FIO_NET_CMD_SENDFILE, &out, sizeof(out), &tag,
+ SK_F_COPY);
+
+ /*
+ * Wait for the backend to receive the reply
+ */
+ if (fio_sem_down_timeout(&rep->lock, 10000)) {
+ log_err("fio: timed out waiting for reply\n");
+ ret = ETIMEDOUT;
+ goto fail;
+ }
+
+ if (rep->error) {
+ log_err("fio: failure on receiving state file %s: %s\n",
+ out.path, strerror(rep->error));
+ ret = rep->error;
+fail:
+ *datap = NULL;
+ sfree(rep);
+ fio_net_queue_quit();
+ return ret;
+ }
+
+ /*
+ * The format is verify_state_hdr, then thread_io_list. Verify
+ * the header, and the thread_io_list checksum
+ */
+ s = rep->data + sizeof(struct verify_state_hdr);
+ if (verify_state_hdr(rep->data, s)) {
+ ret = EILSEQ;
+ goto fail;
+ }
+
+ /*
+ * Don't need the header from now, copy just the thread_io_list
+ */
+ ret = 0;
+ rep->size -= sizeof(struct verify_state_hdr);
+ data = malloc(rep->size);
+ memcpy(data, s, rep->size);
+ *datap = data;
+
+ sfree(rep->data);
+ __fio_sem_remove(&rep->lock);
+ sfree(rep);
+ return ret;