server: make client connections fork off
authorJens Axboe <axboe@kernel.dk>
Mon, 19 Mar 2012 08:13:15 +0000 (09:13 +0100)
committerJens Axboe <axboe@kernel.dk>
Mon, 19 Mar 2012 08:13:15 +0000 (09:13 +0100)
Instead of handling connections inline from the server, fork
them off before running them. This makes the able to handle
the idle loop command send/reply directly, without being
invoked from the idle loop.

New connections are forked off automatically as well, leaving
the server free to accept new connections. Not sure we want that,
or at least it should be an option to return BUSY on attempted
new connections.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
backend.c
client.c
client.h
server.c
server.h

index 92c4648645a964ed4b3563b167c8e4036a934076..e65af5258af564e64594c65bcbe32f01bb6d9f35 100644 (file)
--- a/backend.c
+++ b/backend.c
@@ -969,6 +969,9 @@ static void *thread_main(void *data)
 
        dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);
 
 
        dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);
 
+       if (is_backend)
+               fio_server_send_start(td);
+
        INIT_FLIST_HEAD(&td->io_u_freelist);
        INIT_FLIST_HEAD(&td->io_u_busylist);
        INIT_FLIST_HEAD(&td->io_u_requeues);
        INIT_FLIST_HEAD(&td->io_u_freelist);
        INIT_FLIST_HEAD(&td->io_u_busylist);
        INIT_FLIST_HEAD(&td->io_u_requeues);
@@ -1555,21 +1558,13 @@ static void run_threads(void)
 
                reap_threads(&nr_running, &t_rate, &m_rate);
 
 
                reap_threads(&nr_running, &t_rate, &m_rate);
 
-               if (todo) {
-                       if (is_backend)
-                               fio_server_idle_loop();
-                       else
-                               usleep(100000);
-               }
+               if (todo)
+                       usleep(100000);
        }
 
        while (nr_running) {
                reap_threads(&nr_running, &t_rate, &m_rate);
        }
 
        while (nr_running) {
                reap_threads(&nr_running, &t_rate, &m_rate);
-
-               if (is_backend)
-                       fio_server_idle_loop();
-               else
-                       usleep(10000);
+               usleep(10000);
        }
 
        update_io_ticks();
        }
 
        update_io_ticks();
index 6230a66e032258f338c55fa8c05e72bf4d6ad833..b553b055a82275c718a393d2e7e8f9832cfad9b5 100644 (file)
--- a/client.c
+++ b/client.c
@@ -373,7 +373,7 @@ int fio_client_connect(struct fio_client *client)
 
 void fio_client_terminate(struct fio_client *client)
 {
 
 void fio_client_terminate(struct fio_client *client)
 {
-       fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_QUIT, 0, NULL);
+       fio_net_send_quit(client->fd);
 }
 
 void fio_clients_terminate(void)
 }
 
 void fio_clients_terminate(void)
@@ -1118,7 +1118,8 @@ int fio_handle_client(struct fio_client *client)
 
                convert_stop(cmd);
                client->state = Client_stopped;
 
                convert_stop(cmd);
                client->state = Client_stopped;
-               client->error = pdu->error;
+               client->error = le32_to_cpu(pdu->error);
+               client->signal = le32_to_cpu(pdu->signal);
                ops->stop(client, cmd);
                free(cmd);
                break;
                ops->stop(client, cmd);
                free(cmd);
                break;
index 30228161fc2f479e66340ff5f65e8674e87b4d30..fd60b467461ce0a84027af880234acfc2f2a750f 100644 (file)
--- a/client.h
+++ b/client.h
@@ -43,6 +43,7 @@ struct fio_client {
        int disk_stats_shown;
        unsigned int jobs;
        int error;
        int disk_stats_shown;
        unsigned int jobs;
        int error;
+       int signal;
        int ipv6;
        int sent_job;
        uint32_t type;
        int ipv6;
        int sent_job;
        uint32_t type;
index 899b230274b1133dd54c3aacdb0020acf70056ba..d5733c4e05901a4f720e07e217320486d24e4138 100644 (file)
--- a/server.c
+++ b/server.c
@@ -34,9 +34,22 @@ static char *fio_server_arg;
 static char *bind_sock;
 static struct sockaddr_in saddr_in;
 static struct sockaddr_in6 saddr_in6;
 static char *bind_sock;
 static struct sockaddr_in saddr_in;
 static struct sockaddr_in6 saddr_in6;
-static int first_cmd_check;
 static int use_ipv6;
 
 static int use_ipv6;
 
+struct fio_fork_item {
+       struct flist_head list;
+       int exitval;
+       int signal;
+       int exited;
+       pid_t pid;
+};
+
+/* Created on fork on new connection */
+static FLIST_HEAD(conn_list);
+
+/* Created on job fork from connection */
+static FLIST_HEAD(job_list);
+
 static const char *fio_server_ops[FIO_NET_CMD_NR] = {
        "",
        "QUIT",
 static const char *fio_server_ops[FIO_NET_CMD_NR] = {
        "",
        "QUIT",
@@ -375,26 +388,125 @@ int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag,
        return 0;
 }
 
        return 0;
 }
 
-static int fio_server_send_quit_cmd(void)
+int fio_net_send_quit(int sk)
 {
        dprint(FD_NET, "server: sending quit\n");
 {
        dprint(FD_NET, "server: sending quit\n");
+
        return fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_QUIT, 0, NULL);
 }
 
        return fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_QUIT, 0, NULL);
 }
 
-static int handle_run_cmd(struct fio_net_cmd *cmd)
+int fio_net_send_stop(int sk, int error, int signal)
 {
        struct cmd_end_pdu epdu;
 {
        struct cmd_end_pdu epdu;
+
+       dprint(FD_NET, "server: sending stop (%d, %d)\n", error, signal);
+
+       epdu.error = __cpu_to_le32(error);
+       epdu.signal = __cpu_to_le32(signal);
+       return fio_net_send_cmd(server_fd, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), 0);
+}
+
+static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
+{
+       struct fio_fork_item *ffi;
+
+       ffi = malloc(sizeof(*ffi));
+       ffi->exitval = 0;
+       ffi->signal = 0;
+       ffi->exited = 0;
+       ffi->pid = pid;
+       flist_add_tail(&ffi->list, list);
+}
+
+static void fio_server_add_conn_pid(pid_t pid)
+{
+       dprint(FD_NET, "server: forked off connection job (pid=%u)\n", pid);
+       fio_server_add_fork_item(pid, &conn_list);
+}
+
+static void fio_server_add_job_pid(pid_t pid)
+{
+       dprint(FD_NET, "server: forked off job job (pid=%u)\n", pid);
+       fio_server_add_fork_item(pid, &job_list);
+}
+
+static void fio_server_check_fork_item(struct fio_fork_item *ffi)
+{
+       int ret, status;
+
+       ret = waitpid(ffi->pid, &status, WNOHANG);
+       if (ret < 0) {
+               if (errno == ECHILD) {
+                       log_err("fio: connection pid %u disappeared\n", ffi->pid);
+                       ffi->exited = 1;
+               } else
+                       log_err("fio: waitpid: %s\n", strerror(errno));
+       } else if (ret == ffi->pid) {
+               if (WIFSIGNALED(status)) {
+                       ffi->signal = WTERMSIG(status);
+                       ffi->exited = 1;
+               }
+               if (WIFEXITED(status)) {
+                       if (WEXITSTATUS(status))
+                               ffi->exitval = WEXITSTATUS(status);
+                       ffi->exited = 1;
+               }
+       }
+}
+
+static void fio_server_fork_item_done(struct fio_fork_item *ffi)
+{
+       dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", ffi->pid, ffi->signal, ffi->exitval);
+
+       /*
+        * Fold STOP and QUIT...
+        */
+       fio_net_send_stop(server_fd, ffi->exitval, ffi->signal);
+       fio_net_send_quit(server_fd);
+       flist_del(&ffi->list);
+       free(ffi);
+}
+
+static void fio_server_check_fork_items(struct flist_head *list, int bla)
+{
+       struct flist_head *entry, *tmp;
+       struct fio_fork_item *ffi;
+
+       flist_for_each_safe(entry, tmp, list) {
+               ffi = flist_entry(entry, struct fio_fork_item, list);
+
+               fio_server_check_fork_item(ffi);
+
+               if (ffi->exited)
+                       fio_server_fork_item_done(ffi);
+       }
+}
+
+static void fio_server_check_jobs(void)
+{
+       fio_server_check_fork_items(&job_list, 0);
+}
+
+static void fio_server_check_conns(void)
+{
+       fio_server_check_fork_items(&conn_list, 1);
+}
+
+static int handle_run_cmd(struct fio_net_cmd *cmd)
+{
+       pid_t pid;
        int ret;
 
        int ret;
 
-       ret = fio_backend();
+       set_genesis_time();
 
 
-       epdu.error = ret;
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), 0);
+       pid = fork();
+       if (pid) {
+               fio_server_add_job_pid(pid);
+               return 0;
+       }
 
 
-       fio_server_send_quit_cmd();
-       reset_fio_state();
-       first_cmd_check = 0;
-       return ret;
+       ret = fio_backend();
+       _exit(ret);
 }
 
 static int handle_job_cmd(struct fio_net_cmd *cmd)
 }
 
 static int handle_job_cmd(struct fio_net_cmd *cmd)
@@ -407,7 +519,7 @@ static int handle_job_cmd(struct fio_net_cmd *cmd)
        pdu->client_type = le32_to_cpu(pdu->client_type);
 
        if (parse_jobs_ini(buf, 1, 0, pdu->client_type)) {
        pdu->client_type = le32_to_cpu(pdu->client_type);
 
        if (parse_jobs_ini(buf, 1, 0, pdu->client_type)) {
-               fio_server_send_quit_cmd();
+               fio_net_send_quit(server_fd);
                return -1;
        }
 
                return -1;
        }
 
@@ -443,7 +555,7 @@ static int handle_jobline_cmd(struct fio_net_cmd *cmd)
        }
 
        if (parse_cmd_line(clp->lines, argv, clp->client_type)) {
        }
 
        if (parse_cmd_line(clp->lines, argv, clp->client_type)) {
-               fio_server_send_quit_cmd();
+               fio_net_send_quit(server_fd);
                free(argv);
                return -1;
        }
                free(argv);
                return -1;
        }
@@ -558,11 +670,15 @@ static int handle_command(struct fio_net_cmd *cmd)
        return ret;
 }
 
        return ret;
 }
 
-static int handle_connection(int sk, int block)
+static int handle_connection(int sk)
 {
        struct fio_net_cmd *cmd = NULL;
        int ret = 0;
 
 {
        struct fio_net_cmd *cmd = NULL;
        int ret = 0;
 
+       reset_fio_state();
+       INIT_FLIST_HEAD(&job_list);
+       server_fd = sk;
+
        /* read forever */
        while (!exit_backend) {
                struct pollfd pfd = {
        /* read forever */
        while (!exit_backend) {
                struct pollfd pfd = {
@@ -579,8 +695,7 @@ static int handle_connection(int sk, int block)
                                log_err("fio: poll: %s\n", strerror(errno));
                                break;
                        } else if (!ret) {
                                log_err("fio: poll: %s\n", strerror(errno));
                                break;
                        } else if (!ret) {
-                               if (!block)
-                                       return 0;
+                               fio_server_check_jobs();
                                continue;
                        }
 
                                continue;
                        }
 
@@ -592,6 +707,8 @@ static int handle_connection(int sk, int block)
                        }
                } while (!exit_backend);
 
                        }
                } while (!exit_backend);
 
+               fio_server_check_jobs();
+
                if (ret < 0)
                        break;
 
                if (ret < 0)
                        break;
 
@@ -612,17 +729,8 @@ static int handle_connection(int sk, int block)
        if (cmd)
                free(cmd);
 
        if (cmd)
                free(cmd);
 
-       return ret;
-}
-
-void fio_server_idle_loop(void)
-{
-       if (!first_cmd_check) {
-               fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
-               first_cmd_check = 1;
-       }
-       if (server_fd != -1)
-               handle_connection(server_fd, 0);
+       close(sk);
+       _exit(ret);
 }
 
 static int accept_loop(int listen_sk)
 }
 
 static int accept_loop(int listen_sk)
@@ -630,52 +738,59 @@ static int accept_loop(int listen_sk)
        struct sockaddr_in addr;
        fio_socklen_t len = sizeof(addr);
        struct pollfd pfd;
        struct sockaddr_in addr;
        fio_socklen_t len = sizeof(addr);
        struct pollfd pfd;
-       int ret, sk, flags, exitval = 0;
+       int ret = 0, sk, flags, exitval = 0;
 
        dprint(FD_NET, "server enter accept loop\n");
 
        flags = fcntl(listen_sk, F_GETFL);
        flags |= O_NONBLOCK;
        fcntl(listen_sk, F_SETFL, flags);
 
        dprint(FD_NET, "server enter accept loop\n");
 
        flags = fcntl(listen_sk, F_GETFL);
        flags |= O_NONBLOCK;
        fcntl(listen_sk, F_SETFL, flags);
-again:
-       pfd.fd = listen_sk;
-       pfd.events = POLLIN;
-       do {
-               ret = poll(&pfd, 1, 100);
-               if (ret < 0) {
-                       if (errno == EINTR)
-                               break;
-                       log_err("fio: poll: %s\n", strerror(errno));
-                       goto out;
-               } else if (!ret)
-                       continue;
 
 
-               if (pfd.revents & POLLIN)
-                       break;
-       } while (!exit_backend);
+       while (!exit_backend) {
+               pid_t pid;
 
 
-       if (exit_backend)
-               goto out;
+               pfd.fd = listen_sk;
+               pfd.events = POLLIN;
+               do {
+                       ret = poll(&pfd, 1, 100);
+                       if (ret < 0) {
+                               if (errno == EINTR)
+                                       break;
+                               log_err("fio: poll: %s\n", strerror(errno));
+                               break;
+                       } else if (!ret) {
+                               fio_server_check_conns();
+                               continue;
+                       }
 
 
-       sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
-       if (sk < 0) {
-               log_err("fio: accept: %s\n", strerror(errno));
-               return -1;
-       }
+                       if (pfd.revents & POLLIN)
+                               break;
+               } while (!exit_backend);
 
 
-       dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr));
+               fio_server_check_conns();
 
 
-       server_fd = sk;
+               if (exit_backend || ret < 0)
+                       break;
 
 
-       exitval = handle_connection(sk, 1);
+               sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
+               if (sk < 0) {
+                       log_err("fio: accept: %s\n", strerror(errno));
+                       return -1;
+               }
 
 
-       server_fd = -1;
-       close(sk);
+               dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr));
 
 
-       if (!exit_backend)
-               goto again;
+               pid = fork();
+               if (pid) {
+                       close(sk);
+                       fio_server_add_conn_pid(pid);
+                       continue;
+               }
+
+               /* exits */
+               handle_connection(sk);
+       }
 
 
-out:
        return exitval;
 }
 
        return exitval;
 }
 
@@ -993,6 +1108,13 @@ void fio_server_send_add_job(struct thread_data *td)
        fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), 0);
 }
 
        fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), 0);
 }
 
+void fio_server_send_start(struct thread_data *td)
+{
+       assert(server_fd != -1);
+
+       fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
+}
+
 static int fio_init_server_ip(void)
 {
        struct sockaddr *addr;
 static int fio_init_server_ip(void)
 {
        struct sockaddr *addr;
index 3c66ecbee403c254fc1a8993372ca270f1c58181..422bfbf2b571617aeda1af3da8ce2ba86be0eeba 100644 (file)
--- a/server.h
+++ b/server.h
@@ -38,7 +38,7 @@ struct fio_net_int_cmd {
 };
 
 enum {
 };
 
 enum {
-       FIO_SERVER_VER                  = 13,
+       FIO_SERVER_VER                  = 14,
 
        FIO_SERVER_MAX_FRAGMENT_PDU     = 1024,
 
 
        FIO_SERVER_MAX_FRAGMENT_PDU     = 1024,
 
@@ -116,6 +116,7 @@ struct cmd_start_pdu {
 
 struct cmd_end_pdu {
        uint32_t error;
 
 struct cmd_end_pdu {
        uint32_t error;
+       uint32_t signal;
 };
 
 struct cmd_add_job_pdu {
 };
 
 struct cmd_add_job_pdu {
@@ -155,7 +156,6 @@ struct group_run_stats;
 extern void fio_server_send_ts(struct thread_stat *, struct group_run_stats *);
 extern void fio_server_send_gs(struct group_run_stats *);
 extern void fio_server_send_du(void);
 extern void fio_server_send_ts(struct thread_stat *, struct group_run_stats *);
 extern void fio_server_send_gs(struct group_run_stats *);
 extern void fio_server_send_du(void);
-extern void fio_server_idle_loop(void);
 
 extern int fio_recv_data(int sk, void *p, unsigned int len);
 extern int fio_send_data(int sk, const void *p, unsigned int len);
 
 extern int fio_recv_data(int sk, void *p, unsigned int len);
 extern int fio_send_data(int sk, const void *p, unsigned int len);
@@ -165,6 +165,9 @@ extern struct fio_net_cmd *fio_net_recv_cmd(int sk);
 
 extern int fio_send_iolog(struct thread_data *, struct io_log *, const char *);
 extern void fio_server_send_add_job(struct thread_data *);
 
 extern int fio_send_iolog(struct thread_data *, struct io_log *, const char *);
 extern void fio_server_send_add_job(struct thread_data *);
+extern void fio_server_send_start(struct thread_data *);
+extern int fio_net_send_stop(int sk, int error, int signal);
+extern int fio_net_send_quit(int sk);
 
 extern int exit_backend;
 extern int fio_net_port;
 
 extern int exit_backend;
 extern int fio_net_port;