server: rewrite message handling
authorJens Axboe <axboe@fb.com>
Thu, 10 Dec 2015 22:32:15 +0000 (15:32 -0700)
committerJens Axboe <axboe@fb.com>
Thu, 10 Dec 2015 22:32:15 +0000 (15:32 -0700)
Currently we don't do any synchronization when we send out
network messages, this can cause a variety of issues. Instead
of adding serialization for transmits, let the messages queue
up in the network backend, and let the backend send them out.

Work in progress, not everything has been tested with this.

Signed-off-by: Jens Axboe <axboe@fb.com>
iolog.c
server.c
server.h

diff --git a/iolog.c b/iolog.c
index e674171e5195f71b74b6c03514a33d597ad4312e..650a21b0beb9143c46916a07675df1f4c6ded915 100644 (file)
--- a/iolog.c
+++ b/iolog.c
@@ -1164,6 +1164,8 @@ int iolog_flush(struct io_log *log, int wait)
 {
        struct iolog_flush_data *data;
 
+       io_u_quiesce(log->td);
+
        data = malloc(sizeof(*data));
        if (!data)
                return 1;
index 27ea282fa735dd702ddd9f40b8f5cac277bcc57d..0942e631730388bf0e225f4d20673bb3adc50df6 100644 (file)
--- a/server.c
+++ b/server.c
@@ -32,7 +32,28 @@ int fio_net_port = FIO_NET_PORT;
 
 int exit_backend = 0;
 
-static int server_fd = -1;
+enum {
+       SK_F_FREE       = 1,
+       SK_F_COPY       = 2,
+       SK_F_SIMPLE     = 4,
+};
+
+struct sk_entry {
+       struct flist_head list;
+       int opcode;
+       void *buf;
+       off_t size;
+       uint64_t *tagptr;
+       int flags;
+};
+
+struct sk_out {
+       int sk;
+       struct fio_mutex *lock;
+       struct flist_head list;
+       struct fio_mutex *wait;
+};
+
 static char *fio_server_arg;
 static char *bind_sock;
 static struct sockaddr_in saddr_in;
@@ -46,6 +67,8 @@ static unsigned int has_zlib = 0;
 static unsigned int use_zlib;
 static char me[128];
 
+static pthread_key_t sk_out_key;
+
 struct fio_fork_item {
        struct flist_head list;
        int exitval;
@@ -86,6 +109,16 @@ static const char *fio_server_ops[FIO_NET_CMD_NR] = {
        "SENDFILE",
 };
 
+static void sk_lock(struct sk_out *sk_out)
+{
+       fio_mutex_down(sk_out->lock);
+}
+
+static void sk_unlock(struct sk_out *sk_out)
+{
+       fio_mutex_up(sk_out->lock);
+}
+
 const char *fio_server_op(unsigned int op)
 {
        static char buf[32];
@@ -416,6 +449,32 @@ int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size,
        return ret;
 }
 
+static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size,
+                            uint64_t *tagptr, int flags)
+{
+       struct sk_out *sk_out = pthread_getspecific(sk_out_key);
+       struct sk_entry *entry;
+
+       entry = smalloc(sizeof(*entry));
+       entry->opcode = opcode;
+       if (flags & SK_F_COPY) {
+               entry->buf = smalloc(size);
+               memcpy(entry->buf, buf, size);
+       } else
+               entry->buf = buf;
+       entry->size = size;
+       entry->tagptr = tagptr;
+       entry->flags = flags;
+
+       sk_lock(sk_out);
+       flist_add_tail(&entry->list, &sk_out->list);
+       sk_unlock(sk_out);
+
+       fio_mutex_up(sk_out->wait);
+
+       return 0;
+}
+
 static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag)
 {
        struct fio_net_cmd cmd;
@@ -452,6 +511,13 @@ int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag,
        return 0;
 }
 
+static int fio_net_queue_quit(void)
+{
+       dprint(FD_NET, "server: sending quit\n");
+
+       return fio_net_queue_cmd(FIO_NET_CMD_QUIT, NULL, 0, 0, SK_F_SIMPLE);
+}
+
 int fio_net_send_quit(int sk)
 {
        dprint(FD_NET, "server: sending quit\n");
@@ -459,8 +525,7 @@ int fio_net_send_quit(int sk)
        return fio_net_send_simple_cmd(sk, FIO_NET_CMD_QUIT, 0, NULL);
 }
 
-static int fio_net_send_ack(int sk, struct fio_net_cmd *cmd, int error,
-                           int signal)
+static int fio_net_send_ack(struct fio_net_cmd *cmd, int error, int signal)
 {
        struct cmd_end_pdu epdu;
        uint64_t tag = 0;
@@ -470,13 +535,13 @@ static int fio_net_send_ack(int sk, struct fio_net_cmd *cmd, int error,
 
        epdu.error = __cpu_to_le32(error);
        epdu.signal = __cpu_to_le32(signal);
-       return fio_net_send_cmd(sk, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), &tag, NULL);
+       return fio_net_queue_cmd(FIO_NET_CMD_STOP, &epdu, sizeof(epdu), &tag, SK_F_COPY);
 }
 
-int fio_net_send_stop(int sk, int error, int signal)
+static int fio_net_queue_stop(int error, int signal)
 {
        dprint(FD_NET, "server: sending stop (%d, %d)\n", error, signal);
-       return fio_net_send_ack(sk, NULL, error, signal);
+       return fio_net_send_ack(NULL, error, signal);
 }
 
 static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
@@ -527,20 +592,23 @@ static void fio_server_check_fork_item(struct fio_fork_item *ffi)
        }
 }
 
-static void fio_server_fork_item_done(struct fio_fork_item *ffi)
+static void fio_server_fork_item_done(struct fio_fork_item *ffi, bool stop)
 {
        dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval);
 
        /*
         * Fold STOP and QUIT...
         */
-       fio_net_send_stop(server_fd, ffi->exitval, ffi->signal);
-       fio_net_send_quit(server_fd);
+       if (stop) {
+               fio_net_queue_stop(ffi->exitval, ffi->signal);
+               fio_net_queue_quit();
+       }
+
        flist_del(&ffi->list);
        free(ffi);
 }
 
-static void fio_server_check_fork_items(struct flist_head *list)
+static void fio_server_check_fork_items(struct flist_head *list, bool stop)
 {
        struct flist_head *entry, *tmp;
        struct fio_fork_item *ffi;
@@ -551,18 +619,18 @@ static void fio_server_check_fork_items(struct flist_head *list)
                fio_server_check_fork_item(ffi);
 
                if (ffi->exited)
-                       fio_server_fork_item_done(ffi);
+                       fio_server_fork_item_done(ffi, stop);
        }
 }
 
 static void fio_server_check_jobs(struct flist_head *job_list)
 {
-       fio_server_check_fork_items(job_list);
+       fio_server_check_fork_items(job_list, true);
 }
 
 static void fio_server_check_conns(struct flist_head *conn_list)
 {
-       fio_server_check_fork_items(conn_list);
+       fio_server_check_fork_items(conn_list, false);
 }
 
 static int handle_load_file_cmd(struct fio_net_cmd *cmd)
@@ -577,18 +645,20 @@ static int handle_load_file_cmd(struct fio_net_cmd *cmd)
        pdu->client_type = le16_to_cpu(pdu->client_type);
 
        if (parse_jobs_ini(file_name, 0, 0, pdu->client_type)) {
-               fio_net_send_quit(server_fd);
+               fio_net_queue_quit();
                return -1;
        }
 
        spdu.jobs = cpu_to_le32(thread_number);
        spdu.stat_outputs = cpu_to_le32(stat_number);
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
+       fio_net_queue_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, SK_F_COPY);
        return 0;
 }
 
-static int handle_run_cmd(struct flist_head *job_list, struct fio_net_cmd *cmd)
+static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
+                         struct fio_net_cmd *cmd)
 {
+       struct backend_data data;
        pid_t pid;
        int ret;
 
@@ -601,7 +671,10 @@ static int handle_run_cmd(struct flist_head *job_list, struct fio_net_cmd *cmd)
                return 0;
        }
 
-       ret = fio_backend(NULL);
+       data.key = sk_out_key;
+       data.ptr = sk_out;
+       //pthread_setspecific(sk_out_key, sk_out);
+       ret = fio_backend(&data);
        free_threads_shm();
        _exit(ret);
 }
@@ -616,13 +689,14 @@ static int handle_job_cmd(struct fio_net_cmd *cmd)
        pdu->client_type = le32_to_cpu(pdu->client_type);
 
        if (parse_jobs_ini(buf, 1, 0, pdu->client_type)) {
-               fio_net_send_quit(server_fd);
+               fio_net_queue_quit();
                return -1;
        }
 
        spdu.jobs = cpu_to_le32(thread_number);
        spdu.stat_outputs = cpu_to_le32(stat_number);
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
+
+       fio_net_queue_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, SK_F_COPY);
        return 0;
 }
 
@@ -653,7 +727,7 @@ static int handle_jobline_cmd(struct fio_net_cmd *cmd)
        }
 
        if (parse_cmd_line(clp->lines, argv, clp->client_type)) {
-               fio_net_send_quit(server_fd);
+               fio_net_queue_quit();
                free(argv);
                return -1;
        }
@@ -662,7 +736,8 @@ static int handle_jobline_cmd(struct fio_net_cmd *cmd)
 
        spdu.jobs = cpu_to_le32(thread_number);
        spdu.stat_outputs = cpu_to_le32(stat_number);
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
+
+       fio_net_queue_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, SK_F_COPY);
        return 0;
 }
 
@@ -699,7 +774,7 @@ static int handle_probe_cmd(struct fio_net_cmd *cmd)
                use_zlib = 0;
        }
 
-       return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL);
+       return fio_net_queue_cmd(FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, SK_F_COPY);
 }
 
 static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
@@ -742,18 +817,17 @@ static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
                je->unit_base           = cpu_to_le32(je->unit_base);
        }
 
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, &tag, NULL);
-       free(je);
+       fio_net_queue_cmd(FIO_NET_CMD_ETA, je, size, &tag, SK_F_FREE);
        return 0;
 }
 
-static int send_update_job_reply(int fd, uint64_t __tag, int error)
+static int send_update_job_reply(uint64_t __tag, int error)
 {
        uint64_t tag = __tag;
        uint32_t pdu_error;
 
        pdu_error = __cpu_to_le32(error);
-       return fio_net_send_cmd(fd, FIO_NET_CMD_UPDATE_JOB, &pdu_error, sizeof(pdu_error), &tag, NULL);
+       return fio_net_queue_cmd(FIO_NET_CMD_UPDATE_JOB, &pdu_error, sizeof(pdu_error), &tag, SK_F_COPY);
 }
 
 static int handle_update_job_cmd(struct fio_net_cmd *cmd)
@@ -767,13 +841,13 @@ static int handle_update_job_cmd(struct fio_net_cmd *cmd)
        dprint(FD_NET, "server: updating options for job %u\n", tnumber);
 
        if (!tnumber || tnumber > thread_number) {
-               send_update_job_reply(server_fd, cmd->tag, ENODEV);
+               send_update_job_reply(cmd->tag, ENODEV);
                return 0;
        }
 
        td = &threads[tnumber - 1];
        convert_thread_options_to_cpu(&td->o, &pdu->top);
-       send_update_job_reply(server_fd, cmd->tag, 0);
+       send_update_job_reply(cmd->tag, 0);
        return 0;
 }
 
@@ -792,17 +866,16 @@ static int handle_trigger_cmd(struct fio_net_cmd *cmd)
                struct all_io_list state;
 
                state.threads = cpu_to_le64((uint64_t) 0);
-               fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, NULL);
-       } else {
-               fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, rep, sz, NULL, NULL);
-               free(rep);
-       }
+               fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, SK_F_COPY);
+       } else
+               fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, rep, sz, NULL, SK_F_FREE);
 
        exec_trigger(buf);
        return 0;
 }
 
-static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd)
+static int handle_command(struct sk_out *sk_out, struct flist_head *job_list,
+                         struct fio_net_cmd *cmd)
 {
        int ret;
 
@@ -813,7 +886,8 @@ static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd)
        switch (cmd->opcode) {
        case FIO_NET_CMD_QUIT:
                fio_terminate_threads(TERMINATE_ALL);
-               return -1;
+               ret = 0;
+               break;
        case FIO_NET_CMD_EXIT:
                exit_backend = 1;
                return -1;
@@ -833,7 +907,7 @@ static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd)
                ret = handle_send_eta_cmd(cmd);
                break;
        case FIO_NET_CMD_RUN:
-               ret = handle_run_cmd(job_list, cmd);
+               ret = handle_run_cmd(sk_out, job_list, cmd);
                break;
        case FIO_NET_CMD_UPDATE_JOB:
                ret = handle_update_job_cmd(cmd);
@@ -875,19 +949,59 @@ static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd)
        return ret;
 }
 
-static int handle_connection(int sk)
+static void handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry)
+{
+       if (entry->flags & SK_F_SIMPLE) {
+               uint64_t tag = 0;
+
+               if (entry->tagptr)
+                       tag = *entry->tagptr;
+
+               fio_net_send_simple_cmd(sk_out->sk, entry->opcode, tag, NULL);
+       } else
+               fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
+
+       if (entry->flags & SK_F_FREE)
+               free(entry->buf);
+       else if (entry->flags & SK_F_COPY)
+               sfree(entry->buf);
+
+       sfree(entry);
+}
+
+static void handle_xmits(struct sk_out *sk_out)
+{
+       struct sk_entry *entry;
+       FLIST_HEAD(list);
+
+       sk_lock(sk_out);
+       if (flist_empty(&sk_out->list)) {
+               sk_unlock(sk_out);
+               return;
+       }
+
+       flist_splice_init(&sk_out->list, &list);
+       sk_unlock(sk_out);
+
+       while (!flist_empty(&list)) {
+               entry = flist_entry(list.next, struct sk_entry, list);
+               flist_del(&entry->list);
+               handle_sk_entry(sk_out, entry);
+       }
+}
+
+static int handle_connection(struct sk_out *sk_out)
 {
        struct fio_net_cmd *cmd = NULL;
        FLIST_HEAD(job_list);
        int ret = 0;
 
        reset_fio_state();
-       server_fd = sk;
 
        /* read forever */
        while (!exit_backend) {
                struct pollfd pfd = {
-                       .fd     = sk,
+                       .fd     = sk_out->sk,
                        .events = POLLIN,
                };
 
@@ -898,7 +1012,9 @@ static int handle_connection(int sk)
                        if (!flist_empty(&job_list))
                                timeout = 100;
 
-                       ret = poll(&pfd, 1, timeout);
+                       handle_xmits(sk_out);
+
+                       ret = poll(&pfd, 1, 0);
                        if (ret < 0) {
                                if (errno == EINTR)
                                        break;
@@ -906,6 +1022,7 @@ static int handle_connection(int sk)
                                break;
                        } else if (!ret) {
                                fio_server_check_jobs(&job_list);
+                               fio_mutex_down_timeout(sk_out->wait, timeout);
                                continue;
                        }
 
@@ -922,13 +1039,13 @@ static int handle_connection(int sk)
                if (ret < 0)
                        break;
 
-               cmd = fio_net_recv_cmd(sk);
+               cmd = fio_net_recv_cmd(sk_out->sk);
                if (!cmd) {
                        ret = -1;
                        break;
                }
 
-               ret = handle_command(&job_list, cmd);
+               ret = handle_command(sk_out, &job_list, cmd);
                if (ret)
                        break;
 
@@ -939,39 +1056,45 @@ static int handle_connection(int sk)
        if (cmd)
                free(cmd);
 
-       close(sk);
+       handle_xmits(sk_out);
+
+       close(sk_out->sk);
        _exit(ret);
 }
 
 /* get the address on this host bound by the input socket, 
  * whether it is ipv6 or ipv4 */
 
-int get_my_addr_str( int sk )
+int get_my_addr_str(int sk)
 {
-       int ret; 
-       struct sockaddr * sockaddr_p;
-       struct sockaddr_in myaddr4 = {0};
-       struct sockaddr_in6 myaddr6 = {0};
-       char * net_addr;
-       socklen_t len = use_ipv6 ? sizeof(myaddr6) : sizeof(myaddr4);
+       struct sockaddr_in6 myaddr6 = { 0, };
+       struct sockaddr_in myaddr4 = { 0, };
+       struct sockaddr *sockaddr_p;
+       char *net_addr;
+       socklen_t len;
+       int ret;
 
-       if (use_ipv6)
+       if (use_ipv6) {
+               len = sizeof(myaddr6);
                sockaddr_p = (struct sockaddr * )&myaddr6;
-       else
+               net_addr = (char * )&myaddr6.sin6_addr;
+       } else {
+               len = sizeof(myaddr4);
                sockaddr_p = (struct sockaddr * )&myaddr4;
+               net_addr = (char * )&myaddr4.sin_addr;
+       }
+
        ret = getsockname(sk, sockaddr_p, &len);
        if (ret) {
                log_err("fio: getsockaddr: %s\n", strerror(errno));
                return -1;
        }
-       if (use_ipv6)
-               net_addr = (char * )&myaddr6.sin6_addr;
-       else
-               net_addr = (char * )&myaddr4.sin_addr;
-       if (NULL == inet_ntop(use_ipv6?AF_INET6:AF_INET, net_addr, client_sockaddr_str, INET6_ADDRSTRLEN-1)) {
+
+       if (!inet_ntop(use_ipv6?AF_INET6:AF_INET, net_addr, client_sockaddr_str, INET6_ADDRSTRLEN - 1)) {
                log_err("inet_ntop: failed to convert addr to string\n");
                return -1;
        }
+
        dprint(FD_NET, "fio server bound to addr %s\n", client_sockaddr_str);
        return 0;
 }
@@ -983,12 +1106,20 @@ static int accept_loop(int listen_sk)
        socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr);
        struct pollfd pfd;
        int ret = 0, sk, exitval = 0;
+       struct sk_out *sk_out;
        FLIST_HEAD(conn_list);
 
        dprint(FD_NET, "server enter accept loop\n");
 
        fio_set_fd_nonblocking(listen_sk, "server");
 
+       sk_out = smalloc(sizeof(*sk_out));
+       INIT_FLIST_HEAD(&sk_out->list);
+       sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
+       sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
+
+       pthread_setspecific(sk_out_key, sk_out);
+
        while (!exit_backend) {
                const char *from;
                char buf[64];
@@ -1039,28 +1170,39 @@ static int accept_loop(int listen_sk)
 
                dprint(FD_NET, "server: connect from %s\n", from);
 
+               sk_out->sk = sk;
+
                pid = fork();
                if (pid) {
                        close(sk);
                        fio_server_add_conn_pid(&conn_list, pid);
+                       pthread_setspecific(sk_out_key, sk_out);
                        continue;
                }
 
                /* exits */
                get_my_addr_str(sk); /* if error, it's already logged, non-fatal */
-               handle_connection(sk);
+               handle_connection(sk_out);
        }
 
+#if 0
+       fio_mutex_remove(sk_out->lock);
+       fio_mutex_remove(sk_out->wait);
+       sfree(sk_out);
+       pthread_setspecific(sk_out_key, NULL);
+#endif
+
        return exitval;
 }
 
 int fio_server_text_output(int level, const char *buf, size_t len)
 {
+       struct sk_out *sk_out = pthread_getspecific(sk_out_key);
        struct cmd_text_pdu *pdu;
        unsigned int tlen;
        struct timeval tv;
 
-       if (server_fd == -1)
+       if (!sk_out || sk_out->sk == -1)
                return -1;
 
        tlen = sizeof(*pdu) + len;
@@ -1075,7 +1217,7 @@ int fio_server_text_output(int level, const char *buf, size_t len)
 
        memcpy(pdu->buf, buf, len);
 
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, NULL, NULL);
+       fio_net_queue_cmd(FIO_NET_CMD_TEXT, pdu, tlen, NULL, SK_F_COPY);
        free(pdu);
        return len;
 }
@@ -1205,7 +1347,7 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
 
        convert_gs(&p.rs, rs);
 
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), NULL, NULL);
+       fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY);
 }
 
 void fio_server_send_gs(struct group_run_stats *rs)
@@ -1215,7 +1357,7 @@ void fio_server_send_gs(struct group_run_stats *rs)
        dprint(FD_NET, "server sending group run stats\n");
 
        convert_gs(&gs, rs);
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, NULL);
+       fio_net_queue_cmd(FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, SK_F_COPY);
 }
 
 static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
@@ -1270,7 +1412,7 @@ void fio_server_send_du(void)
                convert_dus(&pdu.dus, &du->dus);
                convert_agg(&pdu.agg, &du->agg);
 
-               fio_net_send_cmd(server_fd, FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, NULL);
+               fio_net_queue_cmd(FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, SK_F_COPY);
        }
 }
 
@@ -1295,7 +1437,8 @@ static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
        return fio_sendv_data(sk, iov, 2);
 }
 
-static int fio_send_iolog_gz(struct cmd_iolog_pdu *pdu, struct io_log *log)
+static int fio_send_iolog_gz(struct sk_out *sk_out, struct cmd_iolog_pdu *pdu,
+                            struct io_log *log)
 {
        int ret = 0;
 #ifdef CONFIG_ZLIB
@@ -1336,7 +1479,7 @@ static int fio_send_iolog_gz(struct cmd_iolog_pdu *pdu, struct io_log *log)
                if (stream.avail_in)
                        flags = FIO_NET_CMD_F_MORE;
 
-               ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG,
+               ret = fio_send_cmd_ext_pdu(sk_out->sk, FIO_NET_CMD_IOLOG,
                                           out_pdu, this_len, 0, flags);
                if (ret)
                        goto err_zlib;
@@ -1352,6 +1495,7 @@ err:
 
 int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
 {
+       struct sk_out *sk_out = pthread_getspecific(sk_out_key);
        struct cmd_iolog_pdu pdu;
        int i, ret = 0;
 
@@ -1381,7 +1525,7 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
        /*
         * Send header first, it's not compressed.
         */
-       ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu,
+       ret = fio_send_cmd_ext_pdu(sk_out->sk, FIO_NET_CMD_IOLOG, &pdu,
                                        sizeof(pdu), 0, FIO_NET_CMD_F_MORE);
        if (ret)
                return ret;
@@ -1390,10 +1534,14 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
         * Now send actual log, compress if we can, otherwise just plain
         */
        if (use_zlib)
-               return fio_send_iolog_gz(&pdu, log);
+               ret = fio_send_iolog_gz(sk_out, &pdu, log);
+       else {
+               ret = fio_send_cmd_ext_pdu(sk_out->sk, FIO_NET_CMD_IOLOG,
+                               log->log, log->nr_samples * log_entry_sz(log),
+                               0, 0);
+       }
 
-       return fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, log->log,
-                       log->nr_samples * log_entry_sz(log), 0, 0);
+       return ret;
 }
 
 void fio_server_send_add_job(struct thread_data *td)
@@ -1405,14 +1553,16 @@ void fio_server_send_add_job(struct thread_data *td)
        pdu.groupid = cpu_to_le32(td->groupid);
        convert_thread_options_to_net(&pdu.top, &td->o);
 
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, NULL);
+       fio_net_queue_cmd(FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, SK_F_COPY);
 }
 
 void fio_server_send_start(struct thread_data *td)
 {
-       assert(server_fd != -1);
+       struct sk_out *sk_out = pthread_getspecific(sk_out_key);
+
+       assert(sk_out->sk != -1);
 
-       fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
+       fio_net_queue_cmd(FIO_NET_CMD_SERVER_START, NULL, 0, 0, SK_F_SIMPLE);
 }
 
 int fio_server_get_verify_state(const char *name, int threadnumber,
@@ -1439,8 +1589,7 @@ int fio_server_get_verify_state(const char *name, int threadnumber,
        verify_state_gen_name((char *) out.path, sizeof(out.path), name, me,
                                threadnumber);
        tag = (uint64_t) (uintptr_t) rep;
-       fio_net_send_cmd(server_fd, FIO_NET_CMD_SENDFILE, &out, sizeof(out),
-                               &tag, NULL);
+       fio_net_queue_cmd(FIO_NET_CMD_SENDFILE, &out, sizeof(out), &tag, SK_F_COPY);
 
        /*
         * Wait for the backend to receive the reply
@@ -1455,7 +1604,7 @@ int fio_server_get_verify_state(const char *name, int threadnumber,
 fail:
                *datap = NULL;
                sfree(rep);
-               fio_net_send_quit(server_fd);
+               fio_net_queue_quit();
                return 1;
        }
 
@@ -1818,6 +1967,9 @@ static int fio_server(void)
 
        set_sig_handlers();
 
+       if (pthread_key_create(&sk_out_key, NULL))
+               log_err("fio: can't create sk_out backend key\n");
+
        ret = accept_loop(sk);
 
        close(sk);
@@ -1834,8 +1986,12 @@ static int fio_server(void)
 
 void fio_server_got_signal(int signal)
 {
+       struct sk_out *sk_out = pthread_getspecific(sk_out_key);
+
+       assert(sk_out);
+
        if (signal == SIGPIPE)
-               server_fd = -1;
+               sk_out->sk = -1;
        else {
                log_info("\nfio: terminating on signal %d\n", signal);
                exit_backend = 1;
index 6370c5042eb6bd7a8c1fc8b3c5b596bb1f7e9cc3..cf3855b7d35335a87345a49c37fe191d371bf995 100644 (file)
--- a/server.h
+++ b/server.h
@@ -208,7 +208,6 @@ extern struct fio_net_cmd *fio_net_recv_cmd(int sk);
 extern int fio_send_iolog(struct thread_data *, struct io_log *, const char *);
 extern void fio_server_send_add_job(struct thread_data *);
 extern void fio_server_send_start(struct thread_data *);
-extern int fio_net_send_stop(int sk, int error, int signal);
 extern int fio_net_send_quit(int sk);
 
 extern int exit_backend;