parse: ensure that an option has a category
[fio.git] / server.c
index 73e36a506641af0dcff0b4ae812c48a3e742513a..899b230274b1133dd54c3aacdb0020acf70056ba 100644 (file)
--- a/server.c
+++ b/server.c
@@ -55,6 +55,7 @@ static const char *fio_server_ops[FIO_NET_CMD_NR] = {
        "SERVER_START",
        "ADD_JOB",
        "CMD_RUN"
+       "CMD_IOLOG",
 };
 
 const char *fio_server_op(unsigned int op)
@@ -279,7 +280,7 @@ struct fio_net_cmd *fio_net_recv_cmd(int sk)
        return cmdret;
 }
 
-void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, void *pdu)
+void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu)
 {
        uint32_t pdu_len;
 
@@ -751,10 +752,11 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
        strcpy(p.ts.verror, ts->verror);
        strcpy(p.ts.description, ts->description);
 
-       p.ts.error      = cpu_to_le32(ts->error);
-       p.ts.groupid    = cpu_to_le32(ts->groupid);
-       p.ts.pid        = cpu_to_le32(ts->pid);
-       p.ts.members    = cpu_to_le32(ts->members);
+       p.ts.error              = cpu_to_le32(ts->error);
+       p.ts.thread_number      = cpu_to_le32(ts->thread_number);
+       p.ts.groupid            = cpu_to_le32(ts->groupid);
+       p.ts.pid                = cpu_to_le32(ts->pid);
+       p.ts.members            = cpu_to_le32(ts->members);
 
        for (i = 0; i < 2; i++) {
                convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]);
@@ -881,14 +883,35 @@ void fio_server_send_du(void)
        }
 }
 
+/*
+ * Send a command with a separate PDU, not inlined in the command
+ */
+static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
+                               off_t size, uint64_t tag, uint32_t flags)
+{
+       struct fio_net_cmd cmd;
+       struct iovec iov[2];
+
+       iov[0].iov_base = &cmd;
+       iov[0].iov_len = sizeof(cmd);
+       iov[1].iov_base = (void *) buf;
+       iov[1].iov_len = size;
+
+       __fio_init_net_cmd(&cmd, opcode, size, tag);
+       cmd.flags = __cpu_to_le32(flags);
+       fio_net_cmd_crc_pdu(&cmd, buf);
+
+       return fio_sendv_data(server_fd, iov, 2);
+}
+
 int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
 {
        struct cmd_iolog_pdu pdu;
-       struct fio_net_cmd cmd;
        z_stream stream;
        void *out_pdu;
-       int i;
+       int i, ret = 0;
 
+       pdu.thread_number = cpu_to_le32(td->thread_number);
        pdu.nr_samples = __cpu_to_le32(log->nr_samples);
        pdu.log_type = cpu_to_le32(log->log_type);
        strcpy((char *) pdu.name, name);
@@ -914,53 +937,58 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
        stream.opaque = Z_NULL;
 
        if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) {
-               free(out_pdu);
-               return 1;
+               ret = 1;
+               goto err;
        }
 
        /*
         * Send header first, it's not compressed.
         */
-       __fio_init_net_cmd(&cmd, FIO_NET_CMD_IOLOG, sizeof(pdu), 0);
-       cmd.flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
-       fio_net_cmd_crc_pdu(&cmd, &pdu);
-       fio_send_data(server_fd, &cmd, sizeof(cmd));
-       fio_send_data(server_fd, &pdu, sizeof(pdu));
+       ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu,
+                                       sizeof(pdu), 0, FIO_NET_CMD_F_MORE);
+       if (ret)
+               goto err_zlib;
 
        stream.next_in = (void *) log->log;
        stream.avail_in = log->nr_samples * sizeof(struct io_sample);
 
        do {
-               unsigned int this_len;
+               unsigned int this_len, flags = 0;
+               int ret;
 
                stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
                stream.next_out = out_pdu;
-               assert(deflate(&stream, Z_FINISH) == Z_OK);
+               ret = deflate(&stream, Z_FINISH);
+               /* may be Z_OK, or Z_STREAM_END */
+               if (ret < 0)
+                       goto err_zlib;
 
                this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
 
-               __fio_init_net_cmd(&cmd, FIO_NET_CMD_IOLOG, this_len, 0);
-
                if (stream.avail_in)
-                       cmd.flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
-
-               fio_net_cmd_crc_pdu(&cmd, out_pdu);
+                       flags = FIO_NET_CMD_F_MORE;
 
-               fio_send_data(server_fd, &cmd, sizeof(cmd));
-               fio_send_data(server_fd, out_pdu, this_len);
+               ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG,
+                                          out_pdu, this_len, 0, flags);
+               if (ret)
+                       goto err_zlib;
        } while (stream.avail_in);
 
-       free(out_pdu);
+err_zlib:
        deflateEnd(&stream);
-       return 0;
+err:
+       free(out_pdu);
+       return ret;
 }
 
-void fio_server_send_add_job(struct thread_options *o, const char *ioengine)
+void fio_server_send_add_job(struct thread_data *td)
 {
        struct cmd_add_job_pdu pdu;
 
        memset(&pdu, 0, sizeof(pdu));
-       convert_thread_options_to_net(&pdu.top, o);
+       pdu.thread_number = cpu_to_le32(td->thread_number);
+       pdu.groupid = cpu_to_le32(td->groupid);
+       convert_thread_options_to_net(&pdu.top, &td->o);
 
        fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), 0);
 }