+static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
+{
+ struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
+ struct client_eta *eta = (struct client_eta *) (uintptr_t) cmd->tag;
+
+ dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
+
+ assert(client->eta_in_flight == eta);
+
+ client->eta_in_flight = NULL;
+ flist_del_init(&client->eta_list);
+
+ if (client->ops->jobs_eta)
+ client->ops->jobs_eta(client, je);
+
+ fio_client_sum_jobs_eta(&eta->eta, je);
+ fio_client_dec_jobs_eta(eta, client->ops->eta);
+}
+
+static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
+{
+ struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
+ const char *os, *arch;
+ char bit[16];
+
+ os = fio_get_os_string(probe->os);
+ if (!os)
+ os = "unknown";
+
+ arch = fio_get_arch_string(probe->arch);
+ if (!arch)
+ os = "unknown";
+
+ sprintf(bit, "%d-bit", probe->bpp * 8);
+ probe->flags = le64_to_cpu(probe->flags);
+
+ log_info("hostname=%s, be=%u, %s, os=%s, arch=%s, fio=%s, flags=%lx\n",
+ probe->hostname, probe->bigendian, bit, os, arch,
+ probe->fio_version, (unsigned long) probe->flags);
+
+ if (!client->name)
+ client->name = strdup((char *) probe->hostname);
+}
+
+static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd)
+{
+ struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
+
+ client->state = Client_started;
+ client->jobs = le32_to_cpu(pdu->jobs);
+ client->nr_stat = le32_to_cpu(pdu->stat_outputs);
+
+ if (sum_stat_clients > 1)
+ do_output_all_clients = 1;
+
+ sum_stat_clients += client->nr_stat;
+}
+
+static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd)
+{
+ if (client->error)
+ log_info("client <%s>: exited with error %d\n", client->hostname, client->error);
+}
+
+static void convert_stop(struct fio_net_cmd *cmd)
+{
+ struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
+
+ pdu->error = le32_to_cpu(pdu->error);
+}
+
+static void convert_text(struct fio_net_cmd *cmd)
+{
+ struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
+
+ pdu->level = le32_to_cpu(pdu->level);
+ pdu->buf_len = le32_to_cpu(pdu->buf_len);
+ pdu->log_sec = le64_to_cpu(pdu->log_sec);
+ pdu->log_usec = le64_to_cpu(pdu->log_usec);
+}
+
+static struct cmd_iolog_pdu *convert_iolog_gz(struct fio_net_cmd *cmd,
+ struct cmd_iolog_pdu *pdu)
+{
+#ifdef CONFIG_ZLIB
+ struct cmd_iolog_pdu *ret;
+ z_stream stream;
+ uint32_t nr_samples;
+ size_t total;
+ void *p;
+
+ stream.zalloc = Z_NULL;
+ stream.zfree = Z_NULL;
+ stream.opaque = Z_NULL;
+ stream.avail_in = 0;
+ stream.next_in = Z_NULL;
+
+ if (inflateInit(&stream) != Z_OK)
+ return NULL;
+
+ /*
+ * Get header first, it's not compressed
+ */
+ nr_samples = le32_to_cpu(pdu->nr_samples);
+
+ total = nr_samples * sizeof(struct io_sample);
+ ret = malloc(total + sizeof(*pdu));
+ ret->nr_samples = nr_samples;
+
+ memcpy(ret, pdu, sizeof(*pdu));
+
+ p = (void *) ret + sizeof(*pdu);
+
+ stream.avail_in = cmd->pdu_len - sizeof(*pdu);
+ stream.next_in = (void *) pdu + sizeof(*pdu);
+ while (stream.avail_in) {
+ unsigned int this_chunk = 65536;
+ unsigned int this_len;
+ int err;
+
+ if (this_chunk > total)
+ this_chunk = total;
+
+ stream.avail_out = this_chunk;
+ stream.next_out = p;
+ err = inflate(&stream, Z_NO_FLUSH);
+ /* may be Z_OK, or Z_STREAM_END */
+ if (err < 0) {
+ log_err("fio: inflate error %d\n", err);
+ free(ret);
+ ret = NULL;
+ goto err;
+ }
+
+ this_len = this_chunk - stream.avail_out;
+ p += this_len;
+ total -= this_len;
+ }
+
+err:
+ inflateEnd(&stream);
+ return ret;
+#else
+ return NULL;
+#endif
+}
+
+/*
+ * This has been compressed on the server side, since it can be big.
+ * Uncompress here.
+ */
+static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd)
+{
+ struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
+ struct cmd_iolog_pdu *ret;
+ int i;
+
+ /*
+ * Convert if compressed and we support it. If it's not
+ * compressed, we need not do anything.
+ */
+ if (le32_to_cpu(pdu->compressed)) {
+#ifndef CONFIG_ZLIB
+ log_err("fio: server sent compressed data by mistake\n");
+ return NULL;
+#endif
+ ret = convert_iolog_gz(cmd, pdu);
+ if (!ret) {
+ log_err("fio: failed decompressing log\n");
+ return NULL;
+ }
+ } else
+ ret = pdu;
+
+ ret->thread_number = le32_to_cpu(ret->thread_number);
+ ret->nr_samples = le32_to_cpu(ret->nr_samples);
+ ret->log_type = le32_to_cpu(ret->log_type);
+ ret->compressed = le32_to_cpu(ret->compressed);
+
+ for (i = 0; i < ret->nr_samples; i++) {
+ struct io_sample *s = &ret->samples[i];
+
+ s->time = le64_to_cpu(s->time);
+ s->val = le64_to_cpu(s->val);
+ s->ddir = le32_to_cpu(s->ddir);
+ s->bs = le32_to_cpu(s->bs);
+ }
+
+ return ret;
+}
+
+int fio_handle_client(struct fio_client *client)
+{
+ struct client_ops *ops = client->ops;
+ struct fio_net_cmd *cmd;
+
+ dprint(FD_NET, "client: handle %s\n", client->hostname);
+
+ cmd = fio_net_recv_cmd(client->fd);
+ if (!cmd)
+ return 0;
+
+ dprint(FD_NET, "client: got cmd op %s from %s (pdu=%u)\n",
+ fio_server_op(cmd->opcode), client->hostname, cmd->pdu_len);
+
+ switch (cmd->opcode) {
+ case FIO_NET_CMD_QUIT:
+ if (ops->quit)
+ ops->quit(client, cmd);
+ remove_client(client);
+ free(cmd);
+ break;
+ case FIO_NET_CMD_TEXT:
+ convert_text(cmd);
+ ops->text(client, cmd);
+ free(cmd);
+ break;
+ case FIO_NET_CMD_DU: {
+ struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
+
+ convert_dus(&du->dus);
+ convert_agg(&du->agg);
+
+ ops->disk_util(client, cmd);
+ free(cmd);
+ break;
+ }
+ case FIO_NET_CMD_TS: {
+ struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
+
+ convert_ts(&p->ts, &p->ts);
+ convert_gs(&p->rs, &p->rs);
+
+ ops->thread_status(client, cmd);
+ free(cmd);
+ break;
+ }
+ case FIO_NET_CMD_GS: {
+ struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
+
+ convert_gs(gs, gs);
+
+ ops->group_stats(client, cmd);
+ free(cmd);
+ break;
+ }
+ case FIO_NET_CMD_ETA: {
+ struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
+
+ remove_reply_cmd(client, cmd);
+ convert_jobs_eta(je);
+ handle_eta(client, cmd);
+ free(cmd);
+ break;
+ }
+ case FIO_NET_CMD_PROBE:
+ remove_reply_cmd(client, cmd);
+ ops->probe(client, cmd);
+ free(cmd);
+ break;
+ case FIO_NET_CMD_SERVER_START:
+ client->state = Client_running;
+ if (ops->job_start)
+ ops->job_start(client, cmd);
+ free(cmd);
+ break;
+ case FIO_NET_CMD_START: {
+ struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
+
+ pdu->jobs = le32_to_cpu(pdu->jobs);
+ ops->start(client, cmd);
+ free(cmd);
+ break;
+ }
+ case FIO_NET_CMD_STOP: {
+ struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
+
+ convert_stop(cmd);
+ client->state = Client_stopped;
+ client->error = le32_to_cpu(pdu->error);
+ client->signal = le32_to_cpu(pdu->signal);
+ ops->stop(client, cmd);
+ free(cmd);
+ break;
+ }
+ case FIO_NET_CMD_ADD_JOB: {
+ struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
+
+ client->thread_number = le32_to_cpu(pdu->thread_number);
+ client->groupid = le32_to_cpu(pdu->groupid);
+
+ if (ops->add_job)
+ ops->add_job(client, cmd);
+ free(cmd);
+ break;
+ }
+ case FIO_NET_CMD_IOLOG:
+ if (ops->iolog) {
+ struct cmd_iolog_pdu *pdu;
+
+ pdu = convert_iolog(cmd);
+ ops->iolog(client, pdu);
+ }
+ free(cmd);
+ break;
+ case FIO_NET_CMD_UPDATE_JOB:
+ ops->update_job(client, cmd);
+ remove_reply_cmd(client, cmd);
+ free(cmd);
+ break;
+ default:
+ log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
+ free(cmd);
+ break;
+ }
+
+ return 1;
+}
+
+static void request_client_etas(struct client_ops *ops)