X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=workqueue.c;h=841dbb9ffffa6de6252990592e7d750e876442b5;hp=06fc26310dd57a829c4b05ce5bc00cd17856ab3f;hb=290c64f219cd22cf67a94b6c5ba74a300ddc2ed3;hpb=51575029ff4027f42ef6be374fd50b2cda5880b4 diff --git a/workqueue.c b/workqueue.c index 06fc2631..841dbb9f 100644 --- a/workqueue.c +++ b/workqueue.c @@ -9,14 +9,15 @@ #include "fio.h" #include "flist.h" #include "workqueue.h" +#include "smalloc.h" +#include "pshared.h" enum { SW_F_IDLE = 1 << 0, SW_F_RUNNING = 1 << 1, SW_F_EXIT = 1 << 2, - SW_F_EXITED = 1 << 3, - SW_F_ACCOUNTED = 1 << 4, - SW_F_ERROR = 1 << 5, + SW_F_ACCOUNTED = 1 << 3, + SW_F_ERROR = 1 << 4, }; static struct submit_worker *__get_submit_worker(struct workqueue *wq, @@ -109,9 +110,9 @@ void workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work) flist_add_tail(&work->list, &sw->work_list); sw->seq = ++wq->work_seq; sw->flags &= ~SW_F_IDLE; - pthread_mutex_unlock(&sw->lock); pthread_cond_signal(&sw->cond); + pthread_mutex_unlock(&sw->lock); } static void handle_list(struct submit_worker *sw, struct flist_head *list) @@ -130,9 +131,11 @@ static void *worker_thread(void *data) { struct submit_worker *sw = data; struct workqueue *wq = sw->wq; - unsigned int eflags = 0, ret = 0; + unsigned int ret = 0; FLIST_HEAD(local_list); + sk_out_assign(sw->sk_out); + if (wq->ops.nice) { if (nice(wq->ops.nice) < 0) { log_err("workqueue: nice %s\n", strerror(errno)); @@ -203,9 +206,7 @@ handle_work: wq->ops.update_acct_fn(sw); done: - pthread_mutex_lock(&sw->lock); - sw->flags |= (SW_F_EXITED | eflags); - pthread_mutex_unlock(&sw->lock); + sk_out_drop(); return NULL; } @@ -234,6 +235,9 @@ void workqueue_exit(struct workqueue *wq) struct submit_worker *sw; int i; + if (!wq->workers) + return; + for (i = 0; i < wq->max_workers; i++) { sw = &wq->workers[i]; @@ -257,22 +261,28 @@ void workqueue_exit(struct workqueue *wq) } } while (shutdown && shutdown != wq->max_workers); - free(wq->workers); + sfree(wq->workers); + wq->workers = NULL; pthread_mutex_destroy(&wq->flush_lock); pthread_cond_destroy(&wq->flush_cond); pthread_mutex_destroy(&wq->stat_lock); } -static int start_worker(struct workqueue *wq, unsigned int index) +static int start_worker(struct workqueue *wq, unsigned int index, + struct sk_out *sk_out) { struct submit_worker *sw = &wq->workers[index]; int ret; INIT_FLIST_HEAD(&sw->work_list); - pthread_cond_init(&sw->cond, NULL); - pthread_mutex_init(&sw->lock, NULL); + + ret = mutex_cond_init_pshared(&sw->lock, &sw->cond); + if (ret) + return ret; + sw->wq = wq; sw->index = index; + sw->sk_out = sk_out; if (wq->ops.alloc_worker_fn) { ret = wq->ops.alloc_worker_fn(sw); @@ -293,24 +303,32 @@ static int start_worker(struct workqueue *wq, unsigned int index) } int workqueue_init(struct thread_data *td, struct workqueue *wq, - struct workqueue_ops *ops, unsigned max_pending) + struct workqueue_ops *ops, unsigned int max_workers, + struct sk_out *sk_out) { unsigned int running; int i, error; + int ret; - wq->max_workers = max_pending; + wq->max_workers = max_workers; wq->td = td; wq->ops = *ops; wq->work_seq = 0; wq->next_free_worker = 0; - pthread_cond_init(&wq->flush_cond, NULL); - pthread_mutex_init(&wq->flush_lock, NULL); - pthread_mutex_init(&wq->stat_lock, NULL); - wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker)); + ret = mutex_cond_init_pshared(&wq->flush_lock, &wq->flush_cond); + if (ret) + goto err; + ret = mutex_init_pshared(&wq->stat_lock); + if (ret) + goto err; + + wq->workers = smalloc(wq->max_workers * sizeof(struct submit_worker)); + if (!wq->workers) + goto err; for (i = 0; i < wq->max_workers; i++) - if (start_worker(wq, i)) + if (start_worker(wq, i, sk_out)) break; wq->max_workers = i;