From 2466096336bd0fbc1a94811aa338926af6baf42f Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Fri, 11 Dec 2015 13:25:24 -0700 Subject: [PATCH] Fixup and improve per-thread data - Get rid of backend_data, just use sk_out - Add reference counting to sk_out and proper assign/drop helpers - Ensure we pass it to other threads created - Fix leaks of sk_out This should get us closer to the new model actually working as intended. Signed-off-by: Jens Axboe --- backend.c | 55 +++++++++++++++++++++++++------------- fio.h | 9 +++---- iolog.c | 8 +++--- iolog.h | 2 +- rate-submit.c | 5 ++-- rate-submit.h | 2 +- server.c | 73 +++++++++++++++++++++++++++++++++++---------------- workqueue.c | 14 +++++++--- workqueue.h | 3 ++- 9 files changed, 113 insertions(+), 58 deletions(-) diff --git a/backend.c b/backend.c index c8554bc1..c9875f48 100644 --- a/backend.c +++ b/backend.c @@ -1358,19 +1358,29 @@ static uint64_t do_dry_run(struct thread_data *td) return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM]; } +struct fork_data { + struct thread_data *td; + struct sk_out *sk_out; +}; + /* * Entry point for the thread based jobs. The process based jobs end up * here as well, after a little setup. */ static void *thread_main(void *data) { + struct fork_data *fd = data; unsigned long long elapsed_us[DDIR_RWDIR_CNT] = { 0, }; - struct thread_data *td = data; + struct thread_data *td = fd->td; struct thread_options *o = &td->o; + struct sk_out *sk_out = fd->sk_out; pthread_condattr_t attr; int clear_state; int ret; + sk_out_assign(sk_out); + free(fd); + if (!o->use_thread) { setsid(); td->pid = getpid(); @@ -1550,12 +1560,12 @@ static void *thread_main(void *data) goto err; } - if (iolog_compress_init(td)) + if (iolog_compress_init(td, sk_out)) goto err; fio_verify_init(td); - if (rate_submit_init(td)) + if (rate_submit_init(td, sk_out)) goto err; fio_gettime(&td->epoch, NULL); @@ -1702,6 +1712,7 @@ err: */ check_update_rusage(td); + sk_out_drop(); return (void *) (uintptr_t) td->error; } @@ -1710,9 +1721,9 @@ err: * We cannot pass the td data into a forked process, so attach the td and * pass it to the thread worker. */ -static int fork_main(int shmid, int offset) +static int fork_main(struct sk_out *sk_out, int shmid, int offset) { - struct thread_data *td; + struct fork_data *fd; void *data, *ret; #if !defined(__hpux) && !defined(CONFIG_NO_SHM) @@ -1730,8 +1741,10 @@ static int fork_main(int shmid, int offset) data = threads; #endif - td = data + offset * sizeof(struct thread_data); - ret = thread_main(td); + fd = calloc(1, sizeof(*fd)); + fd->td = data + offset * sizeof(struct thread_data); + fd->sk_out = sk_out; + ret = thread_main(fd); shmdt(data); return (int) (uintptr_t) ret; } @@ -1956,7 +1969,7 @@ mounted: /* * Main function for kicking off and reaping jobs, as needed. */ -static void run_threads(void) +static void run_threads(struct sk_out *sk_out) { struct thread_data *td; unsigned int i, todo, nr_running, m_rate, t_rate, nr_started; @@ -2090,14 +2103,20 @@ reap: nr_started++; if (td->o.use_thread) { + struct fork_data *fd; int ret; + fd = calloc(1, sizeof(*fd)); + fd->td = td; + fd->sk_out = sk_out; + dprint(FD_PROCESS, "will pthread_create\n"); ret = pthread_create(&td->thread, NULL, - thread_main, td); + thread_main, fd); if (ret) { log_err("pthread_create: %s\n", strerror(ret)); + free(fd); nr_started--; break; } @@ -2110,7 +2129,7 @@ reap: dprint(FD_PROCESS, "will fork\n"); pid = fork(); if (!pid) { - int ret = fork_main(shm_id, i); + int ret = fork_main(sk_out, shm_id, i); _exit(ret); } else if (i == fio_debug_jobno) @@ -2220,11 +2239,10 @@ static void free_disk_util(void) static void *helper_thread_main(void *data) { - struct backend_data *d = data; + struct sk_out *sk_out = data; int ret = 0; - if (d) - pthread_setspecific(d->key, d->ptr); + sk_out_assign(sk_out); fio_mutex_up(startup_mutex); @@ -2256,10 +2274,11 @@ static void *helper_thread_main(void *data) print_thread_status(); } + sk_out_drop(); return NULL; } -static int create_helper_thread(struct backend_data *data) +static int create_helper_thread(struct sk_out *sk_out) { int ret; @@ -2268,7 +2287,7 @@ static int create_helper_thread(struct backend_data *data) pthread_cond_init(&helper_cond, NULL); pthread_mutex_init(&helper_lock, NULL); - ret = pthread_create(&helper_thread, NULL, helper_thread_main, data); + ret = pthread_create(&helper_thread, NULL, helper_thread_main, sk_out); if (ret) { log_err("Can't create helper thread: %s\n", strerror(ret)); return 1; @@ -2280,7 +2299,7 @@ static int create_helper_thread(struct backend_data *data) return 0; } -int fio_backend(struct backend_data *data) +int fio_backend(struct sk_out *sk_out) { struct thread_data *td; int i; @@ -2310,12 +2329,12 @@ int fio_backend(struct backend_data *data) set_genesis_time(); stat_init(); - create_helper_thread(data); + create_helper_thread(sk_out); cgroup_list = smalloc(sizeof(*cgroup_list)); INIT_FLIST_HEAD(cgroup_list); - run_threads(); + run_threads(sk_out); wait_for_helper_thread_exit(); diff --git a/fio.h b/fio.h index 63778b65..fb527dab 100644 --- a/fio.h +++ b/fio.h @@ -111,10 +111,9 @@ enum { * Per-thread/process specific data. Only used for the network client * for now. */ -struct backend_data { - pthread_key_t key; - void *ptr; -}; +struct sk_out; +void sk_out_assign(struct sk_out *); +void sk_out_drop(void); /* * This describes a single thread/process executing a fio job. @@ -477,7 +476,7 @@ extern int __must_check fio_init_options(void); extern int __must_check parse_options(int, char **); extern int parse_jobs_ini(char *, int, int, int); extern int parse_cmd_line(int, char **, int); -extern int fio_backend(struct backend_data *); +extern int fio_backend(struct sk_out *); extern void reset_fio_state(void); extern void clear_io_state(struct thread_data *, int); extern int fio_options_parse(struct thread_data *, char **, int, int); diff --git a/iolog.c b/iolog.c index 650a21b0..d4a10176 100644 --- a/iolog.c +++ b/iolog.c @@ -1133,15 +1133,15 @@ static int gz_init_worker(struct submit_worker *sw) static struct workqueue_ops log_compress_wq_ops = { .fn = gz_work, .init_worker_fn = gz_init_worker, - .nice = 1, + .nice = 1, }; -int iolog_compress_init(struct thread_data *td) +int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out) { if (!(td->flags & TD_F_COMPRESS_LOG)) return 0; - workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1); + workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out); return 0; } @@ -1207,7 +1207,7 @@ int iolog_flush(struct io_log *log, int wait) return 1; } -int iolog_compress_init(struct thread_data *td) +int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out) { return 0; } diff --git a/iolog.h b/iolog.h index 6f027ca8..b99329a4 100644 --- a/iolog.h +++ b/iolog.h @@ -184,7 +184,7 @@ extern void trim_io_piece(struct thread_data *, const struct io_u *); extern void queue_io_piece(struct thread_data *, struct io_piece *); extern void prune_io_piece_log(struct thread_data *); extern void write_iolog_close(struct thread_data *); -extern int iolog_compress_init(struct thread_data *); +extern int iolog_compress_init(struct thread_data *, struct sk_out *); extern void iolog_compress_exit(struct thread_data *); #ifdef CONFIG_ZLIB diff --git a/rate-submit.c b/rate-submit.c index 39b552d4..92cb6222 100644 --- a/rate-submit.c +++ b/rate-submit.c @@ -7,6 +7,7 @@ #include "fio.h" #include "ioengine.h" #include "lib/getrusage.h" +#include "rate-submit.h" static int io_workqueue_fn(struct submit_worker *sw, struct workqueue_work *work) @@ -235,12 +236,12 @@ static struct workqueue_ops rated_wq_ops = { .exit_worker_fn = io_workqueue_exit_worker_fn, }; -int rate_submit_init(struct thread_data *td) +int rate_submit_init(struct thread_data *td, struct sk_out *sk_out) { if (td->o.io_submit_mode != IO_MODE_OFFLOAD) return 0; - return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth); + return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth, sk_out); } void rate_submit_exit(struct thread_data *td) diff --git a/rate-submit.h b/rate-submit.h index b4ca1292..19fde3a9 100644 --- a/rate-submit.h +++ b/rate-submit.h @@ -1,7 +1,7 @@ #ifndef FIO_RATE_SUBMIT #define FIO_RATE_SUBMIT -int rate_submit_init(struct thread_data *); +int rate_submit_init(struct thread_data *, struct sk_out *); void rate_submit_exit(struct thread_data *); #endif diff --git a/server.c b/server.c index efdf51f3..b0ff1dcc 100644 --- a/server.c +++ b/server.c @@ -50,6 +50,8 @@ struct sk_entry { }; struct sk_out { + unsigned int refs; + int sk; struct fio_mutex *lock; struct flist_head list; @@ -121,6 +123,42 @@ static void sk_unlock(struct sk_out *sk_out) fio_mutex_up(sk_out->lock); } +void sk_out_assign(struct sk_out *sk_out) +{ + if (!sk_out) + return; + + sk_lock(sk_out); + sk_out->refs++; + sk_unlock(sk_out); + pthread_setspecific(sk_out_key, sk_out); +} + +static void __sk_out_drop(struct sk_out *sk_out) +{ + fio_mutex_remove(sk_out->lock); + fio_mutex_remove(sk_out->wait); + sfree(sk_out); +} + +void sk_out_drop(void) +{ + struct sk_out *sk_out = pthread_getspecific(sk_out_key); + + if (sk_out) { + int refs; + + sk_lock(sk_out); + refs = --sk_out->refs; + sk_unlock(sk_out); + + if (!refs) + __sk_out_drop(sk_out); + + pthread_setspecific(sk_out_key, NULL); + } +} + const char *fio_server_op(unsigned int op) { static char buf[32]; @@ -672,7 +710,6 @@ static int handle_load_file_cmd(struct fio_net_cmd *cmd) static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list, struct fio_net_cmd *cmd) { - struct backend_data data; pid_t pid; int ret; @@ -685,10 +722,7 @@ static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list, return 0; } - data.key = sk_out_key; - data.ptr = sk_out; - //pthread_setspecific(sk_out_key, sk_out); - ret = fio_backend(&data); + ret = fio_backend(sk_out); free_threads_shm(); _exit(ret); } @@ -1189,27 +1223,19 @@ int get_my_addr_str(int sk) return 0; } -static int accept_loop(int listen_sk) +static int accept_loop(struct sk_out *sk_out, int listen_sk) { struct sockaddr_in addr; struct sockaddr_in6 addr6; socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr); struct pollfd pfd; int ret = 0, sk, exitval = 0; - struct sk_out *sk_out; FLIST_HEAD(conn_list); dprint(FD_NET, "server enter accept loop\n"); fio_set_fd_nonblocking(listen_sk, "server"); - sk_out = smalloc(sizeof(*sk_out)); - INIT_FLIST_HEAD(&sk_out->list); - sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED); - sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED); - - pthread_setspecific(sk_out_key, sk_out); - while (!exit_backend) { const char *from; char buf[64]; @@ -1275,13 +1301,6 @@ static int accept_loop(int listen_sk) handle_connection(sk_out); } -#if 0 - fio_mutex_remove(sk_out->lock); - fio_mutex_remove(sk_out->wait); - sfree(sk_out); - pthread_setspecific(sk_out_key, NULL); -#endif - return exitval; } @@ -2020,6 +2039,7 @@ static void set_sig_handlers(void) static int fio_server(void) { + struct sk_out *sk_out; int sk, ret; dprint(FD_NET, "starting server\n"); @@ -2036,7 +2056,14 @@ static int fio_server(void) if (pthread_key_create(&sk_out_key, NULL)) log_err("fio: can't create sk_out backend key\n"); - ret = accept_loop(sk); + sk_out = smalloc(sizeof(*sk_out)); + INIT_FLIST_HEAD(&sk_out->list); + sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED); + sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED); + + sk_out_assign(sk_out); + + ret = accept_loop(sk_out, sk); close(sk); @@ -2047,6 +2074,8 @@ static int fio_server(void) if (bind_sock) free(bind_sock); + sk_out_drop(); + return ret; } diff --git a/workqueue.c b/workqueue.c index 5fd95b90..6e67f3e7 100644 --- a/workqueue.c +++ b/workqueue.c @@ -133,6 +133,8 @@ static void *worker_thread(void *data) unsigned int eflags = 0, ret = 0; FLIST_HEAD(local_list); + sk_out_assign(sw->sk_out); + if (wq->ops.nice) { if (nice(wq->ops.nice) < 0) { log_err("workqueue: nice %s\n", strerror(errno)); @@ -206,6 +208,7 @@ done: pthread_mutex_lock(&sw->lock); sw->flags |= (SW_F_EXITED | eflags); pthread_mutex_unlock(&sw->lock); + sk_out_drop(); return NULL; } @@ -267,7 +270,8 @@ void workqueue_exit(struct workqueue *wq) pthread_mutex_destroy(&wq->stat_lock); } -static int start_worker(struct workqueue *wq, unsigned int index) +static int start_worker(struct workqueue *wq, unsigned int index, + struct sk_out *sk_out) { struct submit_worker *sw = &wq->workers[index]; int ret; @@ -277,6 +281,7 @@ static int start_worker(struct workqueue *wq, unsigned int index) pthread_mutex_init(&sw->lock, NULL); sw->wq = wq; sw->index = index; + sw->sk_out = sk_out; if (wq->ops.alloc_worker_fn) { ret = wq->ops.alloc_worker_fn(sw); @@ -297,12 +302,13 @@ static int start_worker(struct workqueue *wq, unsigned int index) } int workqueue_init(struct thread_data *td, struct workqueue *wq, - struct workqueue_ops *ops, unsigned max_pending) + struct workqueue_ops *ops, unsigned int max_workers, + struct sk_out *sk_out) { unsigned int running; int i, error; - wq->max_workers = max_pending; + wq->max_workers = max_workers; wq->td = td; wq->ops = *ops; wq->work_seq = 0; @@ -314,7 +320,7 @@ int workqueue_init(struct thread_data *td, struct workqueue *wq, wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker)); for (i = 0; i < wq->max_workers; i++) - if (start_worker(wq, i)) + if (start_worker(wq, i, sk_out)) break; wq->max_workers = i; diff --git a/workqueue.h b/workqueue.h index 46a3979f..1961b2ae 100644 --- a/workqueue.h +++ b/workqueue.h @@ -17,6 +17,7 @@ struct submit_worker { uint64_t seq; struct workqueue *wq; void *private; + struct sk_out *sk_out; }; typedef int (workqueue_work_fn)(struct submit_worker *, struct workqueue_work *); @@ -60,7 +61,7 @@ struct workqueue { volatile int wake_idle; }; -int workqueue_init(struct thread_data *td, struct workqueue *wq, struct workqueue_ops *ops, unsigned int max_workers); +int workqueue_init(struct thread_data *td, struct workqueue *wq, struct workqueue_ops *ops, unsigned int max_workers, struct sk_out *sk_out); void workqueue_exit(struct workqueue *wq); void workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work); -- 2.25.1