tp: move pthread_cond_signal() outside of lock
[fio.git] / lib / tp.c
1 /*
2  * Basic workqueue like code, that sets up a thread and allows async
3  * processing of some sort. Could be extended to allow for multiple
4  * worker threads. But right now fio associates one of this per IO
5  * thread, so should be enough to have just a single thread doing the
6  * work.
7  */
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <stdarg.h>
11 #include <unistd.h>
12 #include <errno.h>
13 #include <pthread.h>
14
15 #include "../smalloc.h"
16 #include "../log.h"
17 #include "tp.h"
18
19 static void tp_flush_work(struct flist_head *list)
20 {
21         struct tp_work *work;
22
23         while (!flist_empty(list)) {
24                 work = flist_entry(list->next, struct tp_work, list);
25                 flist_del(&work->list);
26                 work->fn(work);
27         }
28 }
29
30 static void *tp_thread(void *data)
31 {
32         struct tp_data *tdat = data;
33         struct flist_head work_list;
34
35         INIT_FLIST_HEAD(&work_list);
36
37         while (1) {
38                 pthread_mutex_lock(&tdat->lock);
39
40                 if (!tdat->thread_exit && flist_empty(&tdat->work))
41                         pthread_cond_wait(&tdat->cv, &tdat->lock);
42
43                 if (!flist_empty(&tdat->work))
44                         flist_splice_tail_init(&tdat->work, &work_list);
45
46                 pthread_mutex_unlock(&tdat->lock);
47
48                 if (flist_empty(&work_list)) {
49                         if (tdat->thread_exit)
50                                 break;
51                         continue;
52                 }
53
54                 tp_flush_work(&work_list);
55         }
56
57         return NULL;
58 }
59
60 void tp_queue_work(struct tp_data *tdat, struct tp_work *work)
61 {
62         work->done = 0;
63
64         pthread_mutex_lock(&tdat->lock);
65         flist_add_tail(&work->list, &tdat->work);
66         pthread_mutex_unlock(&tdat->lock);
67
68         pthread_cond_signal(&tdat->cv);
69 }
70
71 void tp_init(struct tp_data **tdatp)
72 {
73         struct tp_data *tdat;
74         int ret;
75
76         if (*tdatp)
77                 return;
78
79         *tdatp = tdat = smalloc(sizeof(*tdat));
80         pthread_mutex_init(&tdat->lock, NULL);
81         INIT_FLIST_HEAD(&tdat->work);
82         pthread_cond_init(&tdat->cv, NULL);
83         pthread_cond_init(&tdat->sleep_cv, NULL);
84
85         ret = pthread_create(&tdat->thread, NULL, tp_thread, tdat);
86         if (ret)
87                 log_err("fio: failed to create tp thread\n");
88 }
89
90 void tp_exit(struct tp_data **tdatp)
91 {
92         struct tp_data *tdat = *tdatp;
93         void *ret;
94
95         if (!tdat)
96                 return;
97
98         pthread_mutex_lock(&tdat->lock);
99         tdat->thread_exit = 1;
100         pthread_mutex_unlock(&tdat->lock);
101
102         pthread_cond_signal(&tdat->cv);
103
104         pthread_join(tdat->thread, &ret);
105
106         sfree(tdat);
107         *tdatp = NULL;
108 }