- Get rid of backend_data; just use sk_out
- Add reference counting to sk_out and proper assign/drop helpers
- Ensure we pass sk_out to any other threads created
- Fix leaks of sk_out
This should get us closer to the new model actually working as
intended.
Signed-off-by: Jens Axboe <axboe@fb.com>
return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
}
return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
}
+struct fork_data {
+ struct thread_data *td;
+ struct sk_out *sk_out;
+};
+
/*
* Entry point for the thread based jobs. The process based jobs end up
* here as well, after a little setup.
*/
static void *thread_main(void *data)
{
/*
* Entry point for the thread based jobs. The process based jobs end up
* here as well, after a little setup.
*/
static void *thread_main(void *data)
{
+ struct fork_data *fd = data;
unsigned long long elapsed_us[DDIR_RWDIR_CNT] = { 0, };
unsigned long long elapsed_us[DDIR_RWDIR_CNT] = { 0, };
- struct thread_data *td = data;
+ struct thread_data *td = fd->td;
struct thread_options *o = &td->o;
struct thread_options *o = &td->o;
+ struct sk_out *sk_out = fd->sk_out;
pthread_condattr_t attr;
int clear_state;
int ret;
pthread_condattr_t attr;
int clear_state;
int ret;
+ sk_out_assign(sk_out);
+ free(fd);
+
if (!o->use_thread) {
setsid();
td->pid = getpid();
if (!o->use_thread) {
setsid();
td->pid = getpid();
- if (iolog_compress_init(td))
+ if (iolog_compress_init(td, sk_out))
goto err;
fio_verify_init(td);
goto err;
fio_verify_init(td);
- if (rate_submit_init(td))
+ if (rate_submit_init(td, sk_out))
goto err;
fio_gettime(&td->epoch, NULL);
goto err;
fio_gettime(&td->epoch, NULL);
*/
check_update_rusage(td);
*/
check_update_rusage(td);
return (void *) (uintptr_t) td->error;
}
return (void *) (uintptr_t) td->error;
}
* We cannot pass the td data into a forked process, so attach the td and
* pass it to the thread worker.
*/
* We cannot pass the td data into a forked process, so attach the td and
* pass it to the thread worker.
*/
-static int fork_main(int shmid, int offset)
+static int fork_main(struct sk_out *sk_out, int shmid, int offset)
- struct thread_data *td;
void *data, *ret;
#if !defined(__hpux) && !defined(CONFIG_NO_SHM)
void *data, *ret;
#if !defined(__hpux) && !defined(CONFIG_NO_SHM)
- td = data + offset * sizeof(struct thread_data);
- ret = thread_main(td);
+ fd = calloc(1, sizeof(*fd));
+ fd->td = data + offset * sizeof(struct thread_data);
+ fd->sk_out = sk_out;
+ ret = thread_main(fd);
shmdt(data);
return (int) (uintptr_t) ret;
}
shmdt(data);
return (int) (uintptr_t) ret;
}
/*
* Main function for kicking off and reaping jobs, as needed.
*/
/*
* Main function for kicking off and reaping jobs, as needed.
*/
-static void run_threads(void)
+static void run_threads(struct sk_out *sk_out)
{
struct thread_data *td;
unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;
{
struct thread_data *td;
unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;
nr_started++;
if (td->o.use_thread) {
nr_started++;
if (td->o.use_thread) {
+ fd = calloc(1, sizeof(*fd));
+ fd->td = td;
+ fd->sk_out = sk_out;
+
dprint(FD_PROCESS, "will pthread_create\n");
ret = pthread_create(&td->thread, NULL,
dprint(FD_PROCESS, "will pthread_create\n");
ret = pthread_create(&td->thread, NULL,
if (ret) {
log_err("pthread_create: %s\n",
strerror(ret));
if (ret) {
log_err("pthread_create: %s\n",
strerror(ret));
dprint(FD_PROCESS, "will fork\n");
pid = fork();
if (!pid) {
dprint(FD_PROCESS, "will fork\n");
pid = fork();
if (!pid) {
- int ret = fork_main(shm_id, i);
+ int ret = fork_main(sk_out, shm_id, i);
_exit(ret);
} else if (i == fio_debug_jobno)
_exit(ret);
} else if (i == fio_debug_jobno)
static void *helper_thread_main(void *data)
{
static void *helper_thread_main(void *data)
{
- struct backend_data *d = data;
+ struct sk_out *sk_out = data;
- if (d)
- pthread_setspecific(d->key, d->ptr);
fio_mutex_up(startup_mutex);
fio_mutex_up(startup_mutex);
-static int create_helper_thread(struct backend_data *data)
+static int create_helper_thread(struct sk_out *sk_out)
pthread_cond_init(&helper_cond, NULL);
pthread_mutex_init(&helper_lock, NULL);
pthread_cond_init(&helper_cond, NULL);
pthread_mutex_init(&helper_lock, NULL);
- ret = pthread_create(&helper_thread, NULL, helper_thread_main, data);
+ ret = pthread_create(&helper_thread, NULL, helper_thread_main, sk_out);
if (ret) {
log_err("Can't create helper thread: %s\n", strerror(ret));
return 1;
if (ret) {
log_err("Can't create helper thread: %s\n", strerror(ret));
return 1;
-int fio_backend(struct backend_data *data)
+int fio_backend(struct sk_out *sk_out)
{
struct thread_data *td;
int i;
{
struct thread_data *td;
int i;
set_genesis_time();
stat_init();
set_genesis_time();
stat_init();
- create_helper_thread(data);
+ create_helper_thread(sk_out);
cgroup_list = smalloc(sizeof(*cgroup_list));
INIT_FLIST_HEAD(cgroup_list);
cgroup_list = smalloc(sizeof(*cgroup_list));
INIT_FLIST_HEAD(cgroup_list);
wait_for_helper_thread_exit();
wait_for_helper_thread_exit();
* Per-thread/process specific data. Only used for the network client
* for now.
*/
* Per-thread/process specific data. Only used for the network client
* for now.
*/
-struct backend_data {
- pthread_key_t key;
- void *ptr;
-};
+struct sk_out;
+void sk_out_assign(struct sk_out *);
+void sk_out_drop(void);
/*
* This describes a single thread/process executing a fio job.
/*
* This describes a single thread/process executing a fio job.
extern int __must_check parse_options(int, char **);
extern int parse_jobs_ini(char *, int, int, int);
extern int parse_cmd_line(int, char **, int);
extern int __must_check parse_options(int, char **);
extern int parse_jobs_ini(char *, int, int, int);
extern int parse_cmd_line(int, char **, int);
-extern int fio_backend(struct backend_data *);
+extern int fio_backend(struct sk_out *);
extern void reset_fio_state(void);
extern void clear_io_state(struct thread_data *, int);
extern int fio_options_parse(struct thread_data *, char **, int, int);
extern void reset_fio_state(void);
extern void clear_io_state(struct thread_data *, int);
extern int fio_options_parse(struct thread_data *, char **, int, int);
static struct workqueue_ops log_compress_wq_ops = {
.fn = gz_work,
.init_worker_fn = gz_init_worker,
static struct workqueue_ops log_compress_wq_ops = {
.fn = gz_work,
.init_worker_fn = gz_init_worker,
-int iolog_compress_init(struct thread_data *td)
+int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
if (!(td->flags & TD_F_COMPRESS_LOG))
return 0;
{
if (!(td->flags & TD_F_COMPRESS_LOG))
return 0;
- workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1);
+ workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
-int iolog_compress_init(struct thread_data *td)
+int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
extern void queue_io_piece(struct thread_data *, struct io_piece *);
extern void prune_io_piece_log(struct thread_data *);
extern void write_iolog_close(struct thread_data *);
extern void queue_io_piece(struct thread_data *, struct io_piece *);
extern void prune_io_piece_log(struct thread_data *);
extern void write_iolog_close(struct thread_data *);
-extern int iolog_compress_init(struct thread_data *);
+extern int iolog_compress_init(struct thread_data *, struct sk_out *);
extern void iolog_compress_exit(struct thread_data *);
#ifdef CONFIG_ZLIB
extern void iolog_compress_exit(struct thread_data *);
#ifdef CONFIG_ZLIB
#include "fio.h"
#include "ioengine.h"
#include "lib/getrusage.h"
#include "fio.h"
#include "ioengine.h"
#include "lib/getrusage.h"
+#include "rate-submit.h"
static int io_workqueue_fn(struct submit_worker *sw,
struct workqueue_work *work)
static int io_workqueue_fn(struct submit_worker *sw,
struct workqueue_work *work)
.exit_worker_fn = io_workqueue_exit_worker_fn,
};
.exit_worker_fn = io_workqueue_exit_worker_fn,
};
-int rate_submit_init(struct thread_data *td)
+int rate_submit_init(struct thread_data *td, struct sk_out *sk_out)
{
if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
return 0;
{
if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
return 0;
- return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth);
+ return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth, sk_out);
}
void rate_submit_exit(struct thread_data *td)
}
void rate_submit_exit(struct thread_data *td)
#ifndef FIO_RATE_SUBMIT
#define FIO_RATE_SUBMIT
#ifndef FIO_RATE_SUBMIT
#define FIO_RATE_SUBMIT
-int rate_submit_init(struct thread_data *);
+int rate_submit_init(struct thread_data *, struct sk_out *);
void rate_submit_exit(struct thread_data *);
#endif
void rate_submit_exit(struct thread_data *);
#endif
int sk;
struct fio_mutex *lock;
struct flist_head list;
int sk;
struct fio_mutex *lock;
struct flist_head list;
fio_mutex_up(sk_out->lock);
}
fio_mutex_up(sk_out->lock);
}
+void sk_out_assign(struct sk_out *sk_out)
+{
+ if (!sk_out)
+ return;
+
+ sk_lock(sk_out);
+ sk_out->refs++;
+ sk_unlock(sk_out);
+ pthread_setspecific(sk_out_key, sk_out);
+}
+
+static void __sk_out_drop(struct sk_out *sk_out)
+{
+ fio_mutex_remove(sk_out->lock);
+ fio_mutex_remove(sk_out->wait);
+ sfree(sk_out);
+}
+
+void sk_out_drop(void)
+{
+ struct sk_out *sk_out = pthread_getspecific(sk_out_key);
+
+ if (sk_out) {
+ int refs;
+
+ sk_lock(sk_out);
+ refs = --sk_out->refs;
+ sk_unlock(sk_out);
+
+ if (!refs)
+ __sk_out_drop(sk_out);
+
+ pthread_setspecific(sk_out_key, NULL);
+ }
+}
+
const char *fio_server_op(unsigned int op)
{
static char buf[32];
const char *fio_server_op(unsigned int op)
{
static char buf[32];
static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
struct fio_net_cmd *cmd)
{
static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
struct fio_net_cmd *cmd)
{
- struct backend_data data;
- data.key = sk_out_key;
- data.ptr = sk_out;
- //pthread_setspecific(sk_out_key, sk_out);
- ret = fio_backend(&data);
+ ret = fio_backend(sk_out);
free_threads_shm();
_exit(ret);
}
free_threads_shm();
_exit(ret);
}
-static int accept_loop(int listen_sk)
+static int accept_loop(struct sk_out *sk_out, int listen_sk)
{
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr);
struct pollfd pfd;
int ret = 0, sk, exitval = 0;
{
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr);
struct pollfd pfd;
int ret = 0, sk, exitval = 0;
FLIST_HEAD(conn_list);
dprint(FD_NET, "server enter accept loop\n");
fio_set_fd_nonblocking(listen_sk, "server");
FLIST_HEAD(conn_list);
dprint(FD_NET, "server enter accept loop\n");
fio_set_fd_nonblocking(listen_sk, "server");
- sk_out = smalloc(sizeof(*sk_out));
- INIT_FLIST_HEAD(&sk_out->list);
- sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
- sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
-
- pthread_setspecific(sk_out_key, sk_out);
-
while (!exit_backend) {
const char *from;
char buf[64];
while (!exit_backend) {
const char *from;
char buf[64];
handle_connection(sk_out);
}
handle_connection(sk_out);
}
-#if 0
- fio_mutex_remove(sk_out->lock);
- fio_mutex_remove(sk_out->wait);
- sfree(sk_out);
- pthread_setspecific(sk_out_key, NULL);
-#endif
-
static int fio_server(void)
{
static int fio_server(void)
{
int sk, ret;
dprint(FD_NET, "starting server\n");
int sk, ret;
dprint(FD_NET, "starting server\n");
if (pthread_key_create(&sk_out_key, NULL))
log_err("fio: can't create sk_out backend key\n");
if (pthread_key_create(&sk_out_key, NULL))
log_err("fio: can't create sk_out backend key\n");
+ sk_out = smalloc(sizeof(*sk_out));
+ INIT_FLIST_HEAD(&sk_out->list);
+ sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
+ sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
+
+ sk_out_assign(sk_out);
+
+ ret = accept_loop(sk_out, sk);
if (bind_sock)
free(bind_sock);
if (bind_sock)
free(bind_sock);
unsigned int eflags = 0, ret = 0;
FLIST_HEAD(local_list);
unsigned int eflags = 0, ret = 0;
FLIST_HEAD(local_list);
+ sk_out_assign(sw->sk_out);
+
if (wq->ops.nice) {
if (nice(wq->ops.nice) < 0) {
log_err("workqueue: nice %s\n", strerror(errno));
if (wq->ops.nice) {
if (nice(wq->ops.nice) < 0) {
log_err("workqueue: nice %s\n", strerror(errno));
pthread_mutex_lock(&sw->lock);
sw->flags |= (SW_F_EXITED | eflags);
pthread_mutex_unlock(&sw->lock);
pthread_mutex_lock(&sw->lock);
sw->flags |= (SW_F_EXITED | eflags);
pthread_mutex_unlock(&sw->lock);
pthread_mutex_destroy(&wq->stat_lock);
}
pthread_mutex_destroy(&wq->stat_lock);
}
-static int start_worker(struct workqueue *wq, unsigned int index)
+static int start_worker(struct workqueue *wq, unsigned int index,
+ struct sk_out *sk_out)
{
struct submit_worker *sw = &wq->workers[index];
int ret;
{
struct submit_worker *sw = &wq->workers[index];
int ret;
pthread_mutex_init(&sw->lock, NULL);
sw->wq = wq;
sw->index = index;
pthread_mutex_init(&sw->lock, NULL);
sw->wq = wq;
sw->index = index;
if (wq->ops.alloc_worker_fn) {
ret = wq->ops.alloc_worker_fn(sw);
if (wq->ops.alloc_worker_fn) {
ret = wq->ops.alloc_worker_fn(sw);
}
int workqueue_init(struct thread_data *td, struct workqueue *wq,
}
int workqueue_init(struct thread_data *td, struct workqueue *wq,
- struct workqueue_ops *ops, unsigned max_pending)
+ struct workqueue_ops *ops, unsigned int max_workers,
+ struct sk_out *sk_out)
{
unsigned int running;
int i, error;
{
unsigned int running;
int i, error;
- wq->max_workers = max_pending;
+ wq->max_workers = max_workers;
wq->td = td;
wq->ops = *ops;
wq->work_seq = 0;
wq->td = td;
wq->ops = *ops;
wq->work_seq = 0;
wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker));
for (i = 0; i < wq->max_workers; i++)
wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker));
for (i = 0; i < wq->max_workers; i++)
- if (start_worker(wq, i))
+ if (start_worker(wq, i, sk_out))
break;
wq->max_workers = i;
break;
wq->max_workers = i;
uint64_t seq;
struct workqueue *wq;
void *private;
uint64_t seq;
struct workqueue *wq;
void *private;
};
typedef int (workqueue_work_fn)(struct submit_worker *, struct workqueue_work *);
};
typedef int (workqueue_work_fn)(struct submit_worker *, struct workqueue_work *);
volatile int wake_idle;
};
volatile int wake_idle;
};
-int workqueue_init(struct thread_data *td, struct workqueue *wq, struct workqueue_ops *ops, unsigned int max_workers);
+int workqueue_init(struct thread_data *td, struct workqueue *wq, struct workqueue_ops *ops, unsigned int max_workers, struct sk_out *sk_out);
void workqueue_exit(struct workqueue *wq);
void workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work);
void workqueue_exit(struct workqueue *wq);
void workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work);