"SERVER_START",
"ADD_JOB",
"CMD_RUN"
+ "CMD_IOLOG",
};
const char *fio_server_op(unsigned int op)
return buf;
}
-int fio_send_data(int sk, const void *p, unsigned int len)
+static ssize_t iov_total_len(const struct iovec *iov, int count)
{
- assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU);
+ ssize_t ret = 0;
- do {
- int ret = send(sk, p, len, 0);
+ while (count--) {
+ ret += iov->iov_len;
+ iov++;
+ }
+
+ return ret;
+}
+
+static int fio_sendv_data(int sk, struct iovec *iov, int count)
+{
+ ssize_t total_len = iov_total_len(iov, count);
+ ssize_t ret;
+ do {
+ ret = writev(sk, iov, count);
if (ret > 0) {
- len -= ret;
- if (!len)
+ total_len -= ret;
+ if (!total_len)
break;
- p += ret;
- continue;
+
+ while (ret) {
+ if (ret >= iov->iov_len) {
+ ret -= iov->iov_len;
+ iov++;
+ continue;
+ }
+ iov->iov_base += ret;
+ iov->iov_len -= ret;
+ ret = 0;
+ }
} else if (!ret)
break;
else if (errno == EAGAIN || errno == EINTR)
break;
} while (!exit_backend);
- if (!len)
+ if (!total_len)
return 0;
if (errno)
return 1;
}
+int fio_send_data(int sk, const void *p, unsigned int len)
+{
+ struct iovec iov = { .iov_base = (void *) p, .iov_len = len };
+
+ assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU);
+
+ return fio_sendv_data(sk, &iov, 1);
+}
+
int fio_recv_data(int sk, void *p, unsigned int len)
{
do {
return cmdret;
}
-void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, void *pdu)
+void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu)
{
uint32_t pdu_len;
strcpy(p.ts.verror, ts->verror);
strcpy(p.ts.description, ts->description);
- p.ts.error = cpu_to_le32(ts->error);
- p.ts.groupid = cpu_to_le32(ts->groupid);
- p.ts.pid = cpu_to_le32(ts->pid);
- p.ts.members = cpu_to_le32(ts->members);
+ p.ts.error = cpu_to_le32(ts->error);
+ p.ts.thread_number = cpu_to_le32(ts->thread_number);
+ p.ts.groupid = cpu_to_le32(ts->groupid);
+ p.ts.pid = cpu_to_le32(ts->pid);
+ p.ts.members = cpu_to_le32(ts->members);
for (i = 0; i < 2; i++) {
convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]);
}
}
-int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
+/*
+ * Send a command with a separate PDU, not inlined in the command
+ */
+static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
+ off_t size, uint64_t tag, uint32_t flags)
{
- struct cmd_iolog_pdu *pdu;
struct fio_net_cmd cmd;
+ struct iovec iov[2];
+
+ iov[0].iov_base = &cmd;
+ iov[0].iov_len = sizeof(cmd);
+ iov[1].iov_base = (void *) buf;
+ iov[1].iov_len = size;
+
+ __fio_init_net_cmd(&cmd, opcode, size, tag);
+ cmd.flags = __cpu_to_le32(flags);
+ fio_net_cmd_crc_pdu(&cmd, buf);
+
+ return fio_sendv_data(server_fd, iov, 2);
+}
+
+int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
+{
+ struct cmd_iolog_pdu pdu;
z_stream stream;
void *out_pdu;
- size_t p_size;
- int i;
-
- p_size = sizeof(*pdu) + log->nr_samples * sizeof(struct io_sample);
- pdu = malloc(p_size);
+ int i, ret = 0;
- pdu->nr_samples = __cpu_to_le32(log->nr_samples);
- pdu->log_type = cpu_to_le32(log->log_type);
- strcpy((char *) pdu->name, name);
+ pdu.thread_number = cpu_to_le32(td->thread_number);
+ pdu.nr_samples = __cpu_to_le32(log->nr_samples);
+ pdu.log_type = cpu_to_le32(log->log_type);
+ strcpy((char *) pdu.name, name);
for (i = 0; i < log->nr_samples; i++) {
- struct io_sample *s = &pdu->samples[i];
+ struct io_sample *s = &log->log[i];
- s->time = cpu_to_le64(log->log[i].time);
- s->val = cpu_to_le64(log->log[i].val);
- s->ddir = cpu_to_le32(log->log[i].ddir);
- s->bs = cpu_to_le32(log->log[i].bs);
+ s->time = cpu_to_le64(s->time);
+ s->val = cpu_to_le64(s->val);
+ s->ddir = cpu_to_le32(s->ddir);
+ s->bs = cpu_to_le32(s->bs);
}
/*
stream.opaque = Z_NULL;
if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) {
- free(out_pdu);
- free(pdu);
- return 1;
+ ret = 1;
+ goto err;
}
/*
- * Don't compress the nr samples entry, we want to know on the
- * client side how much data to allocate before starting inflate.
+ * Send header first, it's not compressed.
*/
- __fio_init_net_cmd(&cmd, FIO_NET_CMD_IOLOG, sizeof(pdu->nr_samples), 0);
- cmd.flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
- fio_net_cmd_crc_pdu(&cmd, pdu);
- fio_send_data(server_fd, &cmd, sizeof(cmd));
- fio_send_data(server_fd, pdu, sizeof(pdu->nr_samples));
+ ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu,
+ sizeof(pdu), 0, FIO_NET_CMD_F_MORE);
+ if (ret)
+ goto err_zlib;
- stream.next_in = (void *) pdu + sizeof(pdu->nr_samples);
- stream.avail_in = p_size - sizeof(pdu->nr_samples);
+ stream.next_in = (void *) log->log;
+ stream.avail_in = log->nr_samples * sizeof(struct io_sample);
do {
- unsigned int this_len;
+ unsigned int this_len, flags = 0;
+ int ret;
stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
stream.next_out = out_pdu;
- deflate(&stream, Z_FINISH);
+ ret = deflate(&stream, Z_FINISH);
+ /* may be Z_OK, or Z_STREAM_END */
+ if (ret < 0)
+ goto err_zlib;
this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
- __fio_init_net_cmd(&cmd, FIO_NET_CMD_IOLOG, this_len, 0);
-
if (stream.avail_in)
- cmd.flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
-
- fio_net_cmd_crc_pdu(&cmd, out_pdu);
+ flags = FIO_NET_CMD_F_MORE;
- fio_send_data(server_fd, &cmd, sizeof(cmd));
- fio_send_data(server_fd, out_pdu, this_len);
+ ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG,
+ out_pdu, this_len, 0, flags);
+ if (ret)
+ goto err_zlib;
} while (stream.avail_in);
- free(pdu);
- free(out_pdu);
+err_zlib:
deflateEnd(&stream);
- return 0;
+err:
+ free(out_pdu);
+ return ret;
}
-void fio_server_send_add_job(struct thread_options *o, const char *ioengine)
+void fio_server_send_add_job(struct thread_data *td)
{
struct cmd_add_job_pdu pdu;
- convert_thread_options_to_net(&pdu.top, o);
+ memset(&pdu, 0, sizeof(pdu));
+ pdu.thread_number = cpu_to_le32(td->thread_number);
+ pdu.groupid = cpu_to_le32(td->groupid);
+ convert_thread_options_to_net(&pdu.top, &td->o);
fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), 0);
}