static char *bind_sock;
static struct sockaddr_in saddr_in;
static struct sockaddr_in6 saddr_in6;
-static int first_cmd_check;
static int use_ipv6;
+struct fio_fork_item {
+ struct flist_head list;
+ int exitval;
+ int signal;
+ int exited;
+ pid_t pid;
+};
+
+/* Created on fork on new connection */
+static FLIST_HEAD(conn_list);
+
+/* Created on job fork from connection */
+static FLIST_HEAD(job_list);
+
static const char *fio_server_ops[FIO_NET_CMD_NR] = {
"",
"QUIT",
"SERVER_START",
"ADD_JOB",
"CMD_RUN"
+ "CMD_IOLOG",
};
const char *fio_server_op(unsigned int op)
return buf;
}
-int fio_send_data(int sk, const void *p, unsigned int len)
+static ssize_t iov_total_len(const struct iovec *iov, int count)
{
- assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU);
+ ssize_t ret = 0;
- do {
- int ret = send(sk, p, len, 0);
+ while (count--) {
+ ret += iov->iov_len;
+ iov++;
+ }
+ return ret;
+}
+
+static int fio_sendv_data(int sk, struct iovec *iov, int count)
+{
+ ssize_t total_len = iov_total_len(iov, count);
+ ssize_t ret;
+
+ do {
+ ret = writev(sk, iov, count);
if (ret > 0) {
- len -= ret;
- if (!len)
+ total_len -= ret;
+ if (!total_len)
break;
- p += ret;
- continue;
+
+ while (ret) {
+ if (ret >= iov->iov_len) {
+ ret -= iov->iov_len;
+ iov++;
+ continue;
+ }
+ iov->iov_base += ret;
+ iov->iov_len -= ret;
+ ret = 0;
+ }
} else if (!ret)
break;
else if (errno == EAGAIN || errno == EINTR)
break;
} while (!exit_backend);
- if (!len)
+ if (!total_len)
return 0;
if (errno)
return 1;
}
+int fio_send_data(int sk, const void *p, unsigned int len)
+{
+ struct iovec iov = { .iov_base = (void *) p, .iov_len = len };
+
+ assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU);
+
+ return fio_sendv_data(sk, &iov, 1);
+}
+
int fio_recv_data(int sk, void *p, unsigned int len)
{
do {
return cmdret;
}
-void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, void *pdu)
+void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu)
{
uint32_t pdu_len;
return 0;
}
-static int fio_server_send_quit_cmd(void)
+int fio_net_send_quit(int sk)
{
dprint(FD_NET, "server: sending quit\n");
- return fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_QUIT, 0, NULL);
+
+ return fio_net_send_simple_cmd(sk, FIO_NET_CMD_QUIT, 0, NULL);
}
-static int handle_run_cmd(struct fio_net_cmd *cmd)
+int fio_net_send_stop(int sk, int error, int signal)
{
struct cmd_end_pdu epdu;
+
+ dprint(FD_NET, "server: sending stop (%d, %d)\n", error, signal);
+
+ epdu.error = __cpu_to_le32(error);
+ epdu.signal = __cpu_to_le32(signal);
+ return fio_net_send_cmd(sk, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), 0);
+}
+
+static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
+{
+ struct fio_fork_item *ffi;
+
+ ffi = malloc(sizeof(*ffi));
+ ffi->exitval = 0;
+ ffi->signal = 0;
+ ffi->exited = 0;
+ ffi->pid = pid;
+ flist_add_tail(&ffi->list, list);
+}
+
+static void fio_server_add_conn_pid(pid_t pid)
+{
+ dprint(FD_NET, "server: forked off connection job (pid=%u)\n", pid);
+ fio_server_add_fork_item(pid, &conn_list);
+}
+
+static void fio_server_add_job_pid(pid_t pid)
+{
+ dprint(FD_NET, "server: forked off job job (pid=%u)\n", pid);
+ fio_server_add_fork_item(pid, &job_list);
+}
+
+static void fio_server_check_fork_item(struct fio_fork_item *ffi)
+{
+ int ret, status;
+
+ ret = waitpid(ffi->pid, &status, WNOHANG);
+ if (ret < 0) {
+ if (errno == ECHILD) {
+ log_err("fio: connection pid %u disappeared\n", ffi->pid);
+ ffi->exited = 1;
+ } else
+ log_err("fio: waitpid: %s\n", strerror(errno));
+ } else if (ret == ffi->pid) {
+ if (WIFSIGNALED(status)) {
+ ffi->signal = WTERMSIG(status);
+ ffi->exited = 1;
+ }
+ if (WIFEXITED(status)) {
+ if (WEXITSTATUS(status))
+ ffi->exitval = WEXITSTATUS(status);
+ ffi->exited = 1;
+ }
+ }
+}
+
+static void fio_server_fork_item_done(struct fio_fork_item *ffi)
+{
+ dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", ffi->pid, ffi->signal, ffi->exitval);
+
+ /*
+ * Fold STOP and QUIT...
+ */
+ fio_net_send_stop(server_fd, ffi->exitval, ffi->signal);
+ fio_net_send_quit(server_fd);
+ flist_del(&ffi->list);
+ free(ffi);
+}
+
+static void fio_server_check_fork_items(struct flist_head *list, int bla)
+{
+ struct flist_head *entry, *tmp;
+ struct fio_fork_item *ffi;
+
+ flist_for_each_safe(entry, tmp, list) {
+ ffi = flist_entry(entry, struct fio_fork_item, list);
+
+ fio_server_check_fork_item(ffi);
+
+ if (ffi->exited)
+ fio_server_fork_item_done(ffi);
+ }
+}
+
+static void fio_server_check_jobs(void)
+{
+ fio_server_check_fork_items(&job_list, 0);
+}
+
+static void fio_server_check_conns(void)
+{
+ fio_server_check_fork_items(&conn_list, 1);
+}
+
+static int handle_run_cmd(struct fio_net_cmd *cmd)
+{
+ pid_t pid;
int ret;
- ret = fio_backend();
+ set_genesis_time();
- epdu.error = ret;
- fio_net_send_cmd(server_fd, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), 0);
+ pid = fork();
+ if (pid) {
+ fio_server_add_job_pid(pid);
+ return 0;
+ }
- fio_server_send_quit_cmd();
- reset_fio_state();
- first_cmd_check = 0;
- return ret;
+ ret = fio_backend();
+ _exit(ret);
}
static int handle_job_cmd(struct fio_net_cmd *cmd)
pdu->client_type = le32_to_cpu(pdu->client_type);
if (parse_jobs_ini(buf, 1, 0, pdu->client_type)) {
- fio_server_send_quit_cmd();
+ fio_net_send_quit(server_fd);
return -1;
}
}
if (parse_cmd_line(clp->lines, argv, clp->client_type)) {
- fio_server_send_quit_cmd();
+ fio_net_send_quit(server_fd);
free(argv);
return -1;
}
return ret;
}
-static int handle_connection(int sk, int block)
+static int handle_connection(int sk)
{
struct fio_net_cmd *cmd = NULL;
int ret = 0;
+ reset_fio_state();
+ INIT_FLIST_HEAD(&job_list);
+ server_fd = sk;
+
/* read forever */
while (!exit_backend) {
struct pollfd pfd = {
log_err("fio: poll: %s\n", strerror(errno));
break;
} else if (!ret) {
- if (!block)
- return 0;
+ fio_server_check_jobs();
continue;
}
}
} while (!exit_backend);
+ fio_server_check_jobs();
+
if (ret < 0)
break;
if (cmd)
free(cmd);
- return ret;
-}
-
-void fio_server_idle_loop(void)
-{
- if (!first_cmd_check) {
- fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
- first_cmd_check = 1;
- }
- if (server_fd != -1)
- handle_connection(server_fd, 0);
+ close(sk);
+ _exit(ret);
}
static int accept_loop(int listen_sk)
struct sockaddr_in addr;
fio_socklen_t len = sizeof(addr);
struct pollfd pfd;
- int ret, sk, flags, exitval = 0;
+ int ret = 0, sk, flags, exitval = 0;
dprint(FD_NET, "server enter accept loop\n");
flags = fcntl(listen_sk, F_GETFL);
flags |= O_NONBLOCK;
fcntl(listen_sk, F_SETFL, flags);
-again:
- pfd.fd = listen_sk;
- pfd.events = POLLIN;
- do {
- ret = poll(&pfd, 1, 100);
- if (ret < 0) {
- if (errno == EINTR)
- break;
- log_err("fio: poll: %s\n", strerror(errno));
- goto out;
- } else if (!ret)
- continue;
- if (pfd.revents & POLLIN)
- break;
- } while (!exit_backend);
+ while (!exit_backend) {
+ pid_t pid;
- if (exit_backend)
- goto out;
+ pfd.fd = listen_sk;
+ pfd.events = POLLIN;
+ do {
+ ret = poll(&pfd, 1, 100);
+ if (ret < 0) {
+ if (errno == EINTR)
+ break;
+ log_err("fio: poll: %s\n", strerror(errno));
+ break;
+ } else if (!ret) {
+ fio_server_check_conns();
+ continue;
+ }
- sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
- if (sk < 0) {
- log_err("fio: accept: %s\n", strerror(errno));
- return -1;
- }
+ if (pfd.revents & POLLIN)
+ break;
+ } while (!exit_backend);
- dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr));
+ fio_server_check_conns();
- server_fd = sk;
+ if (exit_backend || ret < 0)
+ break;
- exitval = handle_connection(sk, 1);
+ sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
+ if (sk < 0) {
+ log_err("fio: accept: %s\n", strerror(errno));
+ return -1;
+ }
- server_fd = -1;
- close(sk);
+ dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr));
- if (!exit_backend)
- goto again;
+ pid = fork();
+ if (pid) {
+ close(sk);
+ fio_server_add_conn_pid(pid);
+ continue;
+ }
+
+ /* exits */
+ handle_connection(sk);
+ }
-out:
return exitval;
}
strcpy(p.ts.verror, ts->verror);
strcpy(p.ts.description, ts->description);
- p.ts.error = cpu_to_le32(ts->error);
- p.ts.groupid = cpu_to_le32(ts->groupid);
- p.ts.pid = cpu_to_le32(ts->pid);
- p.ts.members = cpu_to_le32(ts->members);
+ p.ts.error = cpu_to_le32(ts->error);
+ p.ts.thread_number = cpu_to_le32(ts->thread_number);
+ p.ts.groupid = cpu_to_le32(ts->groupid);
+ p.ts.pid = cpu_to_le32(ts->pid);
+ p.ts.members = cpu_to_le32(ts->members);
for (i = 0; i < 2; i++) {
convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]);
}
}
-int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
+/*
+ * Send a command with a separate PDU, not inlined in the command
+ */
+static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
+ off_t size, uint64_t tag, uint32_t flags)
{
- struct cmd_iolog_pdu *pdu;
struct fio_net_cmd cmd;
+ struct iovec iov[2];
+
+ iov[0].iov_base = &cmd;
+ iov[0].iov_len = sizeof(cmd);
+ iov[1].iov_base = (void *) buf;
+ iov[1].iov_len = size;
+
+ __fio_init_net_cmd(&cmd, opcode, size, tag);
+ cmd.flags = __cpu_to_le32(flags);
+ fio_net_cmd_crc_pdu(&cmd, buf);
+
+ return fio_sendv_data(sk, iov, 2);
+}
+
+int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
+{
+ struct cmd_iolog_pdu pdu;
z_stream stream;
void *out_pdu;
- size_t p_size;
- int i;
-
- p_size = sizeof(*pdu) + log->nr_samples * sizeof(struct io_sample);
- pdu = malloc(p_size);
+ int i, ret = 0;
- pdu->nr_samples = __cpu_to_le32(log->nr_samples);
- pdu->log_type = cpu_to_le32(log->log_type);
- strcpy((char *) pdu->name, name);
+ pdu.thread_number = cpu_to_le32(td->thread_number);
+ pdu.nr_samples = __cpu_to_le32(log->nr_samples);
+ pdu.log_type = cpu_to_le32(log->log_type);
+ strcpy((char *) pdu.name, name);
for (i = 0; i < log->nr_samples; i++) {
- struct io_sample *s = &pdu->samples[i];
+ struct io_sample *s = &log->log[i];
- s->time = cpu_to_le64(log->log[i].time);
- s->val = cpu_to_le64(log->log[i].val);
- s->ddir = cpu_to_le32(log->log[i].ddir);
- s->bs = cpu_to_le32(log->log[i].bs);
+ s->time = cpu_to_le64(s->time);
+ s->val = cpu_to_le64(s->val);
+ s->ddir = cpu_to_le32(s->ddir);
+ s->bs = cpu_to_le32(s->bs);
}
/*
stream.opaque = Z_NULL;
if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) {
- free(out_pdu);
- free(pdu);
- return 1;
+ ret = 1;
+ goto err;
}
/*
- * Don't compress the nr samples entry, we want to know on the
- * client side how much data to allocate before starting inflate.
+ * Send header first, it's not compressed.
*/
- __fio_init_net_cmd(&cmd, FIO_NET_CMD_IOLOG, sizeof(pdu->nr_samples), 0);
- cmd.flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
- fio_net_cmd_crc_pdu(&cmd, pdu);
- fio_send_data(server_fd, &cmd, sizeof(cmd));
- fio_send_data(server_fd, pdu, sizeof(pdu->nr_samples));
+ ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu,
+ sizeof(pdu), 0, FIO_NET_CMD_F_MORE);
+ if (ret)
+ goto err_zlib;
- stream.next_in = (void *) pdu + sizeof(pdu->nr_samples);
- stream.avail_in = p_size - sizeof(pdu->nr_samples);
+ stream.next_in = (void *) log->log;
+ stream.avail_in = log->nr_samples * sizeof(struct io_sample);
do {
- unsigned int this_len;
+ unsigned int this_len, flags = 0;
+ int ret;
stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
stream.next_out = out_pdu;
- deflate(&stream, Z_FINISH);
+ ret = deflate(&stream, Z_FINISH);
+ /* may be Z_OK, or Z_STREAM_END */
+ if (ret < 0)
+ goto err_zlib;
this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
- __fio_init_net_cmd(&cmd, FIO_NET_CMD_IOLOG, this_len, 0);
-
if (stream.avail_in)
- cmd.flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
-
- fio_net_cmd_crc_pdu(&cmd, out_pdu);
+ flags = FIO_NET_CMD_F_MORE;
- fio_send_data(server_fd, &cmd, sizeof(cmd));
- fio_send_data(server_fd, out_pdu, this_len);
+ ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG,
+ out_pdu, this_len, 0, flags);
+ if (ret)
+ goto err_zlib;
} while (stream.avail_in);
- free(pdu);
- free(out_pdu);
+err_zlib:
deflateEnd(&stream);
- return 0;
+err:
+ free(out_pdu);
+ return ret;
}
-void fio_server_send_add_job(struct thread_options *o, const char *ioengine)
+void fio_server_send_add_job(struct thread_data *td)
{
struct cmd_add_job_pdu pdu;
- convert_thread_options_to_net(&pdu.top, o);
+ memset(&pdu, 0, sizeof(pdu));
+ pdu.thread_number = cpu_to_le32(td->thread_number);
+ pdu.groupid = cpu_to_le32(td->groupid);
+ convert_thread_options_to_net(&pdu.top, &td->o);
fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), 0);
}
+void fio_server_send_start(struct thread_data *td)
+{
+ assert(server_fd != -1);
+
+ fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
+}
+
static int fio_init_server_ip(void)
{
struct sockaddr *addr;