server: make client connections fork off
[fio.git] / server.c
index 4e1f39be52568ae3f55ff7f8d7913a125b5e50dd..d5733c4e05901a4f720e07e217320486d24e4138 100644 (file)
--- a/server.c
+++ b/server.c
@@ -34,9 +34,22 @@ static char *fio_server_arg;
 static char *bind_sock;
 static struct sockaddr_in saddr_in;
 static struct sockaddr_in6 saddr_in6;
-static int first_cmd_check;
 static int use_ipv6;
 
+struct fio_fork_item {
+       struct flist_head list;
+       int exitval;
+       int signal;
+       int exited;
+       pid_t pid;
+};
+
+/* Created on fork on new connection */
+static FLIST_HEAD(conn_list);
+
+/* Created on job fork from connection */
+static FLIST_HEAD(job_list);
+
 static const char *fio_server_ops[FIO_NET_CMD_NR] = {
        "",
        "QUIT",
@@ -55,6 +68,7 @@ static const char *fio_server_ops[FIO_NET_CMD_NR] = {
        "SERVER_START",
        "ADD_JOB",
        "CMD_RUN"
+       "CMD_IOLOG",
 };
 
 const char *fio_server_op(unsigned int op)
@@ -68,19 +82,40 @@ const char *fio_server_op(unsigned int op)
        return buf;
 }
 
-int fio_send_data(int sk, const void *p, unsigned int len)
+static ssize_t iov_total_len(const struct iovec *iov, int count)
 {
-       assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU);
+       ssize_t ret = 0;
 
-       do {
-               int ret = send(sk, p, len, 0);
+       while (count--) {
+               ret += iov->iov_len;
+               iov++;
+       }
 
+       return ret;
+}
+
+static int fio_sendv_data(int sk, struct iovec *iov, int count)
+{
+       ssize_t total_len = iov_total_len(iov, count);
+       ssize_t ret;
+
+       do {
+               ret = writev(sk, iov, count);
                if (ret > 0) {
-                       len -= ret;
-                       if (!len)
+                       total_len -= ret;
+                       if (!total_len)
                                break;
-                       p += ret;
-                       continue;
+
+                       while (ret) {
+                               if (ret >= iov->iov_len) {
+                                       ret -= iov->iov_len;
+                                       iov++;
+                                       continue;
+                               }
+                               iov->iov_base += ret;
+                               iov->iov_len -= ret;
+                               ret = 0;
+                       }
                } else if (!ret)
                        break;
                else if (errno == EAGAIN || errno == EINTR)
@@ -89,7 +124,7 @@ int fio_send_data(int sk, const void *p, unsigned int len)
                        break;
        } while (!exit_backend);
 
-       if (!len)
+       if (!total_len)
                return 0;
 
        if (errno)
@@ -98,6 +133,15 @@ int fio_send_data(int sk, const void *p, unsigned int len)
        return 1;
 }
 
+int fio_send_data(int sk, const void *p, unsigned int len)
+{
+       struct iovec iov = { .iov_base = (void *) p, .iov_len = len };
+
+       assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU);
+
+       return fio_sendv_data(sk, &iov, 1);
+}
+
 int fio_recv_data(int sk, void *p, unsigned int len)
 {
        do {
@@ -249,7 +293,7 @@ struct fio_net_cmd *fio_net_recv_cmd(int sk)
        return cmdret;
 }
 
-void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, void *pdu)
+void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu)
 {
        uint32_t pdu_len;
 
@@ -344,26 +388,125 @@ int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag,
        return 0;
 }
 
-static int fio_server_send_quit_cmd(void)
+int fio_net_send_quit(int sk)
 {
        dprint(FD_NET, "server: sending quit\n");
+
        return fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_QUIT, 0, NULL);
 }
 
-static int handle_run_cmd(struct fio_net_cmd *cmd)
+int fio_net_send_stop(int sk, int error, int signal)
 {
        struct cmd_end_pdu epdu;
+
+       dprint(FD_NET, "server: sending stop (%d, %d)\n", error, signal);
+
+       epdu.error = __cpu_to_le32(error);
+       epdu.signal = __cpu_to_le32(signal);
+       return fio_net_send_cmd(server_fd, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), 0);
+}
+
+static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
+{
+       struct fio_fork_item *ffi;
+
+       ffi = malloc(sizeof(*ffi));
+       ffi->exitval = 0;
+       ffi->signal = 0;
+       ffi->exited = 0;
+       ffi->pid = pid;
+       flist_add_tail(&ffi->list, list);
+}
+
+static void fio_server_add_conn_pid(pid_t pid)
+{
+       dprint(FD_NET, "server: forked off connection job (pid=%u)\n", pid);
+       fio_server_add_fork_item(pid, &conn_list);
+}
+
+static void fio_server_add_job_pid(pid_t pid)
+{
+       dprint(FD_NET, "server: forked off job job (pid=%u)\n", pid);
+       fio_server_add_fork_item(pid, &job_list);
+}
+
+static void fio_server_check_fork_item(struct fio_fork_item *ffi)
+{
+       int ret, status;
+
+       ret = waitpid(ffi->pid, &status, WNOHANG);
+       if (ret < 0) {
+               if (errno == ECHILD) {
+                       log_err("fio: connection pid %u disappeared\n", ffi->pid);
+                       ffi->exited = 1;
+               } else
+                       log_err("fio: waitpid: %s\n", strerror(errno));
+       } else if (ret == ffi->pid) {
+               if (WIFSIGNALED(status)) {
+                       ffi->signal = WTERMSIG(status);
+                       ffi->exited = 1;
+               }
+               if (WIFEXITED(status)) {
+                       if (WEXITSTATUS(status))
+                               ffi->exitval = WEXITSTATUS(status);
+                       ffi->exited = 1;
+               }
+       }
+}
+
+static void fio_server_fork_item_done(struct fio_fork_item *ffi)
+{
+       dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", ffi->pid, ffi->signal, ffi->exitval);
+
+       /*
+        * Fold STOP and QUIT...
+        */
+       fio_net_send_stop(server_fd, ffi->exitval, ffi->signal);
+       fio_net_send_quit(server_fd);
+       flist_del(&ffi->list);
+       free(ffi);
+}
+
+static void fio_server_check_fork_items(struct flist_head *list, int bla)
+{
+       struct flist_head *entry, *tmp;
+       struct fio_fork_item *ffi;
+
+       flist_for_each_safe(entry, tmp, list) {
+               ffi = flist_entry(entry, struct fio_fork_item, list);
+
+               fio_server_check_fork_item(ffi);
+
+               if (ffi->exited)
+                       fio_server_fork_item_done(ffi);
+       }
+}
+
+static void fio_server_check_jobs(void)
+{
+       fio_server_check_fork_items(&job_list, 0);
+}
+
+static void fio_server_check_conns(void)
+{
+       fio_server_check_fork_items(&conn_list, 1);
+}
+
+static int handle_run_cmd(struct fio_net_cmd *cmd)
+{
+       pid_t pid;
        int ret;
 
-       ret = fio_backend();
+       set_genesis_time();
 
-       epdu.error = ret;
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), 0);
+       pid = fork();
+       if (pid) {
+               fio_server_add_job_pid(pid);
+               return 0;
+       }
 
-       fio_server_send_quit_cmd();
-       reset_fio_state();
-       first_cmd_check = 0;
-       return ret;
+       ret = fio_backend();
+       _exit(ret);
 }
 
 static int handle_job_cmd(struct fio_net_cmd *cmd)
@@ -376,7 +519,7 @@ static int handle_job_cmd(struct fio_net_cmd *cmd)
        pdu->client_type = le32_to_cpu(pdu->client_type);
 
        if (parse_jobs_ini(buf, 1, 0, pdu->client_type)) {
-               fio_server_send_quit_cmd();
+               fio_net_send_quit(server_fd);
                return -1;
        }
 
@@ -412,7 +555,7 @@ static int handle_jobline_cmd(struct fio_net_cmd *cmd)
        }
 
        if (parse_cmd_line(clp->lines, argv, clp->client_type)) {
-               fio_server_send_quit_cmd();
+               fio_net_send_quit(server_fd);
                free(argv);
                return -1;
        }
@@ -527,11 +670,15 @@ static int handle_command(struct fio_net_cmd *cmd)
        return ret;
 }
 
-static int handle_connection(int sk, int block)
+static int handle_connection(int sk)
 {
        struct fio_net_cmd *cmd = NULL;
        int ret = 0;
 
+       reset_fio_state();
+       INIT_FLIST_HEAD(&job_list);
+       server_fd = sk;
+
        /* read forever */
        while (!exit_backend) {
                struct pollfd pfd = {
@@ -548,8 +695,7 @@ static int handle_connection(int sk, int block)
                                log_err("fio: poll: %s\n", strerror(errno));
                                break;
                        } else if (!ret) {
-                               if (!block)
-                                       return 0;
+                               fio_server_check_jobs();
                                continue;
                        }
 
@@ -561,6 +707,8 @@ static int handle_connection(int sk, int block)
                        }
                } while (!exit_backend);
 
+               fio_server_check_jobs();
+
                if (ret < 0)
                        break;
 
@@ -581,17 +729,8 @@ static int handle_connection(int sk, int block)
        if (cmd)
                free(cmd);
 
-       return ret;
-}
-
-void fio_server_idle_loop(void)
-{
-       if (!first_cmd_check) {
-               fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
-               first_cmd_check = 1;
-       }
-       if (server_fd != -1)
-               handle_connection(server_fd, 0);
+       close(sk);
+       _exit(ret);
 }
 
 static int accept_loop(int listen_sk)
@@ -599,52 +738,59 @@ static int accept_loop(int listen_sk)
        struct sockaddr_in addr;
        fio_socklen_t len = sizeof(addr);
        struct pollfd pfd;
-       int ret, sk, flags, exitval = 0;
+       int ret = 0, sk, flags, exitval = 0;
 
        dprint(FD_NET, "server enter accept loop\n");
 
        flags = fcntl(listen_sk, F_GETFL);
        flags |= O_NONBLOCK;
        fcntl(listen_sk, F_SETFL, flags);
-again:
-       pfd.fd = listen_sk;
-       pfd.events = POLLIN;
-       do {
-               ret = poll(&pfd, 1, 100);
-               if (ret < 0) {
-                       if (errno == EINTR)
-                               break;
-                       log_err("fio: poll: %s\n", strerror(errno));
-                       goto out;
-               } else if (!ret)
-                       continue;
 
-               if (pfd.revents & POLLIN)
-                       break;
-       } while (!exit_backend);
+       while (!exit_backend) {
+               pid_t pid;
 
-       if (exit_backend)
-               goto out;
+               pfd.fd = listen_sk;
+               pfd.events = POLLIN;
+               do {
+                       ret = poll(&pfd, 1, 100);
+                       if (ret < 0) {
+                               if (errno == EINTR)
+                                       break;
+                               log_err("fio: poll: %s\n", strerror(errno));
+                               break;
+                       } else if (!ret) {
+                               fio_server_check_conns();
+                               continue;
+                       }
 
-       sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
-       if (sk < 0) {
-               log_err("fio: accept: %s\n", strerror(errno));
-               return -1;
-       }
+                       if (pfd.revents & POLLIN)
+                               break;
+               } while (!exit_backend);
 
-       dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr));
+               fio_server_check_conns();
 
-       server_fd = sk;
+               if (exit_backend || ret < 0)
+                       break;
 
-       exitval = handle_connection(sk, 1);
+               sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
+               if (sk < 0) {
+                       log_err("fio: accept: %s\n", strerror(errno));
+                       return -1;
+               }
 
-       server_fd = -1;
-       close(sk);
+               dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr));
 
-       if (!exit_backend)
-               goto again;
+               pid = fork();
+               if (pid) {
+                       close(sk);
+                       fio_server_add_conn_pid(pid);
+                       continue;
+               }
+
+               /* exits */
+               handle_connection(sk);
+       }
 
-out:
        return exitval;
 }
 
@@ -721,10 +867,11 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
        strcpy(p.ts.verror, ts->verror);
        strcpy(p.ts.description, ts->description);
 
-       p.ts.error      = cpu_to_le32(ts->error);
-       p.ts.groupid    = cpu_to_le32(ts->groupid);
-       p.ts.pid        = cpu_to_le32(ts->pid);
-       p.ts.members    = cpu_to_le32(ts->members);
+       p.ts.error              = cpu_to_le32(ts->error);
+       p.ts.thread_number      = cpu_to_le32(ts->thread_number);
+       p.ts.groupid            = cpu_to_le32(ts->groupid);
+       p.ts.pid                = cpu_to_le32(ts->pid);
+       p.ts.members            = cpu_to_le32(ts->members);
 
        for (i = 0; i < 2; i++) {
                convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]);
@@ -851,29 +998,46 @@ void fio_server_send_du(void)
        }
 }
 
-int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
+/*
+ * Send a command with a separate PDU, not inlined in the command
+ */
+static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
+                               off_t size, uint64_t tag, uint32_t flags)
 {
-       struct cmd_iolog_pdu *pdu;
        struct fio_net_cmd cmd;
+       struct iovec iov[2];
+
+       iov[0].iov_base = &cmd;
+       iov[0].iov_len = sizeof(cmd);
+       iov[1].iov_base = (void *) buf;
+       iov[1].iov_len = size;
+
+       __fio_init_net_cmd(&cmd, opcode, size, tag);
+       cmd.flags = __cpu_to_le32(flags);
+       fio_net_cmd_crc_pdu(&cmd, buf);
+
+       return fio_sendv_data(server_fd, iov, 2);
+}
+
+int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
+{
+       struct cmd_iolog_pdu pdu;
        z_stream stream;
        void *out_pdu;
-       size_t p_size;
-       int i;
-
-       p_size = sizeof(*pdu) + log->nr_samples * sizeof(struct io_sample);
-       pdu = malloc(p_size);
+       int i, ret = 0;
 
-       pdu->nr_samples = __cpu_to_le32(log->nr_samples);
-       pdu->log_type = cpu_to_le32(log->log_type);
-       strcpy((char *) pdu->name, name);
+       pdu.thread_number = cpu_to_le32(td->thread_number);
+       pdu.nr_samples = __cpu_to_le32(log->nr_samples);
+       pdu.log_type = cpu_to_le32(log->log_type);
+       strcpy((char *) pdu.name, name);
 
        for (i = 0; i < log->nr_samples; i++) {
-               struct io_sample *s = &pdu->samples[i];
+               struct io_sample *s = &log->log[i];
 
-               s->time = cpu_to_le64(log->log[i].time);
-               s->val  = cpu_to_le64(log->log[i].val);
-               s->ddir = cpu_to_le32(log->log[i].ddir);
-               s->bs   = cpu_to_le32(log->log[i].bs);
+               s->time = cpu_to_le64(s->time);
+               s->val  = cpu_to_le64(s->val);
+               s->ddir = cpu_to_le32(s->ddir);
+               s->bs   = cpu_to_le32(s->bs);
        }
 
        /*
@@ -888,59 +1052,69 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
        stream.opaque = Z_NULL;
 
        if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) {
-               free(out_pdu);
-               free(pdu);
-               return 1;
+               ret = 1;
+               goto err;
        }
 
        /*
-        * Don't compress the nr samples entry, we want to know on the
-        * client side how much data to allocate before starting inflate.
+        * Send header first, it's not compressed.
         */
-       __fio_init_net_cmd(&cmd, FIO_NET_CMD_IOLOG, sizeof(pdu->nr_samples), 0);
-       cmd.flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
-       fio_net_cmd_crc_pdu(&cmd, pdu);
-       fio_send_data(server_fd, &cmd, sizeof(cmd));
-       fio_send_data(server_fd, pdu, sizeof(pdu->nr_samples));
+       ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu,
+                                       sizeof(pdu), 0, FIO_NET_CMD_F_MORE);
+       if (ret)
+               goto err_zlib;
 
-       stream.next_in = (void *) pdu + sizeof(pdu->nr_samples);
-       stream.avail_in = p_size - sizeof(pdu->nr_samples);
+       stream.next_in = (void *) log->log;
+       stream.avail_in = log->nr_samples * sizeof(struct io_sample);
 
        do {
-               unsigned int this_len;
+               unsigned int this_len, flags = 0;
+               int ret;
 
                stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
                stream.next_out = out_pdu;
-               deflate(&stream, Z_FINISH);
+               ret = deflate(&stream, Z_FINISH);
+               /* may be Z_OK, or Z_STREAM_END */
+               if (ret < 0)
+                       goto err_zlib;
 
                this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
 
-               __fio_init_net_cmd(&cmd, FIO_NET_CMD_IOLOG, this_len, 0);
-
                if (stream.avail_in)
-                       cmd.flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
-
-               fio_net_cmd_crc_pdu(&cmd, out_pdu);
+                       flags = FIO_NET_CMD_F_MORE;
 
-               fio_send_data(server_fd, &cmd, sizeof(cmd));
-               fio_send_data(server_fd, out_pdu, this_len);
+               ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG,
+                                          out_pdu, this_len, 0, flags);
+               if (ret)
+                       goto err_zlib;
        } while (stream.avail_in);
 
-       free(pdu);
-       free(out_pdu);
+err_zlib:
        deflateEnd(&stream);
-       return 0;
+err:
+       free(out_pdu);
+       return ret;
 }
 
-void fio_server_send_add_job(struct thread_options *o, const char *ioengine)
+void fio_server_send_add_job(struct thread_data *td)
 {
        struct cmd_add_job_pdu pdu;
 
-       convert_thread_options_to_net(&pdu.top, o);
+       memset(&pdu, 0, sizeof(pdu));
+       pdu.thread_number = cpu_to_le32(td->thread_number);
+       pdu.groupid = cpu_to_le32(td->groupid);
+       convert_thread_options_to_net(&pdu.top, &td->o);
 
        fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), 0);
 }
 
+void fio_server_send_start(struct thread_data *td)
+{
+       assert(server_fd != -1);
+
+       fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
+}
+
 static int fio_init_server_ip(void)
 {
        struct sockaddr *addr;