Fixup and improve per-thread data
authorJens Axboe <axboe@fb.com>
Fri, 11 Dec 2015 20:25:24 +0000 (13:25 -0700)
committerJens Axboe <axboe@fb.com>
Fri, 11 Dec 2015 20:25:24 +0000 (13:25 -0700)
- Get rid of backend_data, just use sk_out
- Add reference counting to sk_out and proper assign/drop helpers
- Ensure we pass it to other threads created
- Fix leaks of sk_out

This should get us closer to the new model actually working as
intended.

Signed-off-by: Jens Axboe <axboe@fb.com>
backend.c
fio.h
iolog.c
iolog.h
rate-submit.c
rate-submit.h
server.c
workqueue.c
workqueue.h

index c8554bc..c9875f4 100644 (file)
--- a/backend.c
+++ b/backend.c
@@ -1358,19 +1358,29 @@ static uint64_t do_dry_run(struct thread_data *td)
        return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
 }
 
+struct fork_data {
+       struct thread_data *td;
+       struct sk_out *sk_out;
+};
+
 /*
  * Entry point for the thread based jobs. The process based jobs end up
  * here as well, after a little setup.
  */
 static void *thread_main(void *data)
 {
+       struct fork_data *fd = data;
        unsigned long long elapsed_us[DDIR_RWDIR_CNT] = { 0, };
-       struct thread_data *td = data;
+       struct thread_data *td = fd->td;
        struct thread_options *o = &td->o;
+       struct sk_out *sk_out = fd->sk_out;
        pthread_condattr_t attr;
        int clear_state;
        int ret;
 
+       sk_out_assign(sk_out);
+       free(fd);
+
        if (!o->use_thread) {
                setsid();
                td->pid = getpid();
@@ -1550,12 +1560,12 @@ static void *thread_main(void *data)
                        goto err;
        }
 
-       if (iolog_compress_init(td))
+       if (iolog_compress_init(td, sk_out))
                goto err;
 
        fio_verify_init(td);
 
-       if (rate_submit_init(td))
+       if (rate_submit_init(td, sk_out))
                goto err;
 
        fio_gettime(&td->epoch, NULL);
@@ -1702,6 +1712,7 @@ err:
         */
        check_update_rusage(td);
 
+       sk_out_drop();
        return (void *) (uintptr_t) td->error;
 }
 
@@ -1710,9 +1721,9 @@ err:
  * We cannot pass the td data into a forked process, so attach the td and
  * pass it to the thread worker.
  */
-static int fork_main(int shmid, int offset)
+static int fork_main(struct sk_out *sk_out, int shmid, int offset)
 {
-       struct thread_data *td;
+       struct fork_data *fd;
        void *data, *ret;
 
 #if !defined(__hpux) && !defined(CONFIG_NO_SHM)
@@ -1730,8 +1741,10 @@ static int fork_main(int shmid, int offset)
        data = threads;
 #endif
 
-       td = data + offset * sizeof(struct thread_data);
-       ret = thread_main(td);
+       fd = calloc(1, sizeof(*fd));
+       fd->td = data + offset * sizeof(struct thread_data);
+       fd->sk_out = sk_out;
+       ret = thread_main(fd);
        shmdt(data);
        return (int) (uintptr_t) ret;
 }
@@ -1956,7 +1969,7 @@ mounted:
 /*
  * Main function for kicking off and reaping jobs, as needed.
  */
-static void run_threads(void)
+static void run_threads(struct sk_out *sk_out)
 {
        struct thread_data *td;
        unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;
@@ -2090,14 +2103,20 @@ reap:
                        nr_started++;
 
                        if (td->o.use_thread) {
+                               struct fork_data *fd;
                                int ret;
 
+                               fd = calloc(1, sizeof(*fd));
+                               fd->td = td;
+                               fd->sk_out = sk_out;
+
                                dprint(FD_PROCESS, "will pthread_create\n");
                                ret = pthread_create(&td->thread, NULL,
-                                                       thread_main, td);
+                                                       thread_main, fd);
                                if (ret) {
                                        log_err("pthread_create: %s\n",
                                                        strerror(ret));
+                                       free(fd);
                                        nr_started--;
                                        break;
                                }
@@ -2110,7 +2129,7 @@ reap:
                                dprint(FD_PROCESS, "will fork\n");
                                pid = fork();
                                if (!pid) {
-                                       int ret = fork_main(shm_id, i);
+                                       int ret = fork_main(sk_out, shm_id, i);
 
                                        _exit(ret);
                                } else if (i == fio_debug_jobno)
@@ -2220,11 +2239,10 @@ static void free_disk_util(void)
 
 static void *helper_thread_main(void *data)
 {
-       struct backend_data *d = data;
+       struct sk_out *sk_out = data;
        int ret = 0;
 
-       if (d)
-               pthread_setspecific(d->key, d->ptr);
+       sk_out_assign(sk_out);
 
        fio_mutex_up(startup_mutex);
 
@@ -2256,10 +2274,11 @@ static void *helper_thread_main(void *data)
                        print_thread_status();
        }
 
+       sk_out_drop();
        return NULL;
 }
 
-static int create_helper_thread(struct backend_data *data)
+static int create_helper_thread(struct sk_out *sk_out)
 {
        int ret;
 
@@ -2268,7 +2287,7 @@ static int create_helper_thread(struct backend_data *data)
        pthread_cond_init(&helper_cond, NULL);
        pthread_mutex_init(&helper_lock, NULL);
 
-       ret = pthread_create(&helper_thread, NULL, helper_thread_main, data);
+       ret = pthread_create(&helper_thread, NULL, helper_thread_main, sk_out);
        if (ret) {
                log_err("Can't create helper thread: %s\n", strerror(ret));
                return 1;
@@ -2280,7 +2299,7 @@ static int create_helper_thread(struct backend_data *data)
        return 0;
 }
 
-int fio_backend(struct backend_data *data)
+int fio_backend(struct sk_out *sk_out)
 {
        struct thread_data *td;
        int i;
@@ -2310,12 +2329,12 @@ int fio_backend(struct backend_data *data)
 
        set_genesis_time();
        stat_init();
-       create_helper_thread(data);
+       create_helper_thread(sk_out);
 
        cgroup_list = smalloc(sizeof(*cgroup_list));
        INIT_FLIST_HEAD(cgroup_list);
 
-       run_threads();
+       run_threads(sk_out);
 
        wait_for_helper_thread_exit();
 
diff --git a/fio.h b/fio.h
index 63778b6..fb527da 100644 (file)
--- a/fio.h
+++ b/fio.h
@@ -111,10 +111,9 @@ enum {
  * Per-thread/process specific data. Only used for the network client
  * for now.
  */
-struct backend_data {
-       pthread_key_t key;
-       void *ptr;
-};
+struct sk_out;
+void sk_out_assign(struct sk_out *);
+void sk_out_drop(void);
 
 /*
  * This describes a single thread/process executing a fio job.
@@ -477,7 +476,7 @@ extern int __must_check fio_init_options(void);
 extern int __must_check parse_options(int, char **);
 extern int parse_jobs_ini(char *, int, int, int);
 extern int parse_cmd_line(int, char **, int);
-extern int fio_backend(struct backend_data *);
+extern int fio_backend(struct sk_out *);
 extern void reset_fio_state(void);
 extern void clear_io_state(struct thread_data *, int);
 extern int fio_options_parse(struct thread_data *, char **, int, int);
diff --git a/iolog.c b/iolog.c
index 650a21b..d4a1017 100644 (file)
--- a/iolog.c
+++ b/iolog.c
@@ -1133,15 +1133,15 @@ static int gz_init_worker(struct submit_worker *sw)
 static struct workqueue_ops log_compress_wq_ops = {
        .fn             = gz_work,
        .init_worker_fn = gz_init_worker,
-       .nice   = 1,
+       .nice           = 1,
 };
 
-int iolog_compress_init(struct thread_data *td)
+int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
 {
        if (!(td->flags & TD_F_COMPRESS_LOG))
                return 0;
 
-       workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1);
+       workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
        return 0;
 }
 
@@ -1207,7 +1207,7 @@ int iolog_flush(struct io_log *log, int wait)
        return 1;
 }
 
-int iolog_compress_init(struct thread_data *td)
+int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
 {
        return 0;
 }
diff --git a/iolog.h b/iolog.h
index 6f027ca..b99329a 100644 (file)
--- a/iolog.h
+++ b/iolog.h
@@ -184,7 +184,7 @@ extern void trim_io_piece(struct thread_data *, const struct io_u *);
 extern void queue_io_piece(struct thread_data *, struct io_piece *);
 extern void prune_io_piece_log(struct thread_data *);
 extern void write_iolog_close(struct thread_data *);
-extern int iolog_compress_init(struct thread_data *);
+extern int iolog_compress_init(struct thread_data *, struct sk_out *);
 extern void iolog_compress_exit(struct thread_data *);
 
 #ifdef CONFIG_ZLIB
index 39b552d..92cb622 100644 (file)
@@ -7,6 +7,7 @@
 #include "fio.h"
 #include "ioengine.h"
 #include "lib/getrusage.h"
+#include "rate-submit.h"
 
 static int io_workqueue_fn(struct submit_worker *sw,
                           struct workqueue_work *work)
@@ -235,12 +236,12 @@ static struct workqueue_ops rated_wq_ops = {
        .exit_worker_fn         = io_workqueue_exit_worker_fn,
 };
 
-int rate_submit_init(struct thread_data *td)
+int rate_submit_init(struct thread_data *td, struct sk_out *sk_out)
 {
        if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
                return 0;
 
-       return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth);
+       return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth, sk_out);
 }
 
 void rate_submit_exit(struct thread_data *td)
index b4ca129..19fde3a 100644 (file)
@@ -1,7 +1,7 @@
 #ifndef FIO_RATE_SUBMIT
 #define FIO_RATE_SUBMIT
 
-int rate_submit_init(struct thread_data *);
+int rate_submit_init(struct thread_data *, struct sk_out *);
 void rate_submit_exit(struct thread_data *);
 
 #endif
index efdf51f..b0ff1dc 100644 (file)
--- a/server.c
+++ b/server.c
@@ -50,6 +50,8 @@ struct sk_entry {
 };
 
 struct sk_out {
+       unsigned int refs;
+
        int sk;
        struct fio_mutex *lock;
        struct flist_head list;
@@ -121,6 +123,42 @@ static void sk_unlock(struct sk_out *sk_out)
        fio_mutex_up(sk_out->lock);
 }
 
+void sk_out_assign(struct sk_out *sk_out)
+{
+       if (!sk_out)
+               return;
+
+       sk_lock(sk_out);
+       sk_out->refs++;
+       sk_unlock(sk_out);
+       pthread_setspecific(sk_out_key, sk_out);
+}
+
+static void __sk_out_drop(struct sk_out *sk_out)
+{
+       fio_mutex_remove(sk_out->lock);
+       fio_mutex_remove(sk_out->wait);
+       sfree(sk_out);
+}
+
+void sk_out_drop(void)
+{
+       struct sk_out *sk_out = pthread_getspecific(sk_out_key);
+
+       if (sk_out) {
+               int refs;
+
+               sk_lock(sk_out);
+               refs = --sk_out->refs;
+               sk_unlock(sk_out);
+
+               if (!refs)
+                       __sk_out_drop(sk_out);
+
+               pthread_setspecific(sk_out_key, NULL);
+       }
+}
+
 const char *fio_server_op(unsigned int op)
 {
        static char buf[32];
@@ -672,7 +710,6 @@ static int handle_load_file_cmd(struct fio_net_cmd *cmd)
 static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
                          struct fio_net_cmd *cmd)
 {
-       struct backend_data data;
        pid_t pid;
        int ret;
 
@@ -685,10 +722,7 @@ static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
                return 0;
        }
 
-       data.key = sk_out_key;
-       data.ptr = sk_out;
-       //pthread_setspecific(sk_out_key, sk_out);
-       ret = fio_backend(&data);
+       ret = fio_backend(sk_out);
        free_threads_shm();
        _exit(ret);
 }
@@ -1189,27 +1223,19 @@ int get_my_addr_str(int sk)
        return 0;
 }
 
-static int accept_loop(int listen_sk)
+static int accept_loop(struct sk_out *sk_out, int listen_sk)
 {
        struct sockaddr_in addr;
        struct sockaddr_in6 addr6;
        socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr);
        struct pollfd pfd;
        int ret = 0, sk, exitval = 0;
-       struct sk_out *sk_out;
        FLIST_HEAD(conn_list);
 
        dprint(FD_NET, "server enter accept loop\n");
 
        fio_set_fd_nonblocking(listen_sk, "server");
 
-       sk_out = smalloc(sizeof(*sk_out));
-       INIT_FLIST_HEAD(&sk_out->list);
-       sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
-       sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
-
-       pthread_setspecific(sk_out_key, sk_out);
-
        while (!exit_backend) {
                const char *from;
                char buf[64];
@@ -1275,13 +1301,6 @@ static int accept_loop(int listen_sk)
                handle_connection(sk_out);
        }
 
-#if 0
-       fio_mutex_remove(sk_out->lock);
-       fio_mutex_remove(sk_out->wait);
-       sfree(sk_out);
-       pthread_setspecific(sk_out_key, NULL);
-#endif
-
        return exitval;
 }
 
@@ -2020,6 +2039,7 @@ static void set_sig_handlers(void)
 
 static int fio_server(void)
 {
+       struct sk_out *sk_out;
        int sk, ret;
 
        dprint(FD_NET, "starting server\n");
@@ -2036,7 +2056,14 @@ static int fio_server(void)
        if (pthread_key_create(&sk_out_key, NULL))
                log_err("fio: can't create sk_out backend key\n");
 
-       ret = accept_loop(sk);
+       sk_out = smalloc(sizeof(*sk_out));
+       INIT_FLIST_HEAD(&sk_out->list);
+       sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
+       sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
+
+       sk_out_assign(sk_out);
+
+       ret = accept_loop(sk_out, sk);
 
        close(sk);
 
@@ -2047,6 +2074,8 @@ static int fio_server(void)
        if (bind_sock)
                free(bind_sock);
 
+       sk_out_drop();
+
        return ret;
 }
 
index 5fd95b9..6e67f3e 100644 (file)
@@ -133,6 +133,8 @@ static void *worker_thread(void *data)
        unsigned int eflags = 0, ret = 0;
        FLIST_HEAD(local_list);
 
+       sk_out_assign(sw->sk_out);
+
        if (wq->ops.nice) {
                if (nice(wq->ops.nice) < 0) {
                        log_err("workqueue: nice %s\n", strerror(errno));
@@ -206,6 +208,7 @@ done:
        pthread_mutex_lock(&sw->lock);
        sw->flags |= (SW_F_EXITED | eflags);
        pthread_mutex_unlock(&sw->lock);
+       sk_out_drop();
        return NULL;
 }
 
@@ -267,7 +270,8 @@ void workqueue_exit(struct workqueue *wq)
        pthread_mutex_destroy(&wq->stat_lock);
 }
 
-static int start_worker(struct workqueue *wq, unsigned int index)
+static int start_worker(struct workqueue *wq, unsigned int index,
+                       struct sk_out *sk_out)
 {
        struct submit_worker *sw = &wq->workers[index];
        int ret;
@@ -277,6 +281,7 @@ static int start_worker(struct workqueue *wq, unsigned int index)
        pthread_mutex_init(&sw->lock, NULL);
        sw->wq = wq;
        sw->index = index;
+       sw->sk_out = sk_out;
 
        if (wq->ops.alloc_worker_fn) {
                ret = wq->ops.alloc_worker_fn(sw);
@@ -297,12 +302,13 @@ static int start_worker(struct workqueue *wq, unsigned int index)
 }
 
 int workqueue_init(struct thread_data *td, struct workqueue *wq,
-                  struct workqueue_ops *ops, unsigned max_pending)
+                  struct workqueue_ops *ops, unsigned int max_workers,
+                  struct sk_out *sk_out)
 {
        unsigned int running;
        int i, error;
 
-       wq->max_workers = max_pending;
+       wq->max_workers = max_workers;
        wq->td = td;
        wq->ops = *ops;
        wq->work_seq = 0;
@@ -314,7 +320,7 @@ int workqueue_init(struct thread_data *td, struct workqueue *wq,
        wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker));
 
        for (i = 0; i < wq->max_workers; i++)
-               if (start_worker(wq, i))
+               if (start_worker(wq, i, sk_out))
                        break;
 
        wq->max_workers = i;
index 46a3979..1961b2a 100644 (file)
@@ -17,6 +17,7 @@ struct submit_worker {
        uint64_t seq;
        struct workqueue *wq;
        void *private;
+       struct sk_out *sk_out;
 };
 
 typedef int (workqueue_work_fn)(struct submit_worker *, struct workqueue_work *);
@@ -60,7 +61,7 @@ struct workqueue {
        volatile int wake_idle;
 };
 
-int workqueue_init(struct thread_data *td, struct workqueue *wq, struct workqueue_ops *ops, unsigned int max_workers);
+int workqueue_init(struct thread_data *td, struct workqueue *wq, struct workqueue_ops *ops, unsigned int max_workers, struct sk_out *sk_out);
 void workqueue_exit(struct workqueue *wq);
 
 void workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work);