workqueue: make workqueue_exit() safe for multiple exit calls
[fio.git] / rate-submit.c
/*
 * Rated submission helpers
 *
 * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
 *
 */
#include "fio.h"
#include "ioengine.h"
#include "lib/getrusage.h"

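/*
 * Submission function run by each offload worker: issue one io_u on
 * behalf of the parent job, retrying on FIO_Q_BUSY and reaping
 * completions as needed to keep the worker's queue depth in check.
 */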
static int io_workqueue_fn(struct submit_worker *sw,
			   struct workqueue_work *work)
{
	struct io_u *io_u = container_of(work, struct io_u, work);
	const enum fio_ddir ddir = io_u->ddir;
	struct thread_data *td = sw->private;
	int ret;

	dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());

	io_u_set(io_u, IO_U_F_NO_FILE_PUT);

	td->cur_depth++;

	do {
		ret = td_io_queue(td, io_u);
		if (ret != FIO_Q_BUSY)
			break;
		ret = io_u_queued_complete(td, 1);
		if (ret > 0)
			td->cur_depth -= ret;
		else if (ret < 0)
			break;
		io_u_clear(io_u, IO_U_F_FLIGHT);
	} while (1);

	dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());

	io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);

	if (ret == FIO_Q_COMPLETED)
		td->cur_depth--;
	else if (ret == FIO_Q_QUEUED) {
		unsigned int min_evts;

		if (td->o.iodepth == 1)
			min_evts = 1;
		else
			min_evts = 0;

		ret = io_u_queued_complete(td, min_evts);
		if (ret > 0)
			td->cur_depth -= ret;
	} else if (ret == FIO_Q_BUSY) {
		ret = io_u_queued_complete(td, td->cur_depth);
		if (ret > 0)
			td->cur_depth -= ret;
	}

	return 0;
}

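/*
 * Tell the workqueue whether this worker still has I/O outstanding that
 * must be flushed before it is allowed to sleep.
 */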
static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw)
{
	struct thread_data *td = sw->private;

	if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
		return true;

	return false;
}

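/*
 * Quiesce the worker before it goes to sleep: wait for outstanding I/O
 * to complete and drop the completed units from the queued depth.
 */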
static void io_workqueue_pre_sleep_fn(struct submit_worker *sw)
{
	struct thread_data *td = sw->private;
	int ret;

	ret = io_u_quiesce(td);
	if (ret > 0)
		td->cur_depth -= ret;
}

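/*
 * Allocate the per-worker thread_data that acts as a private clone of
 * the parent job for this submit worker.
 */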
static int io_workqueue_alloc_fn(struct submit_worker *sw)
{
	struct thread_data *td;

	td = calloc(1, sizeof(*td));
	if (!td)
		return 1;

	sw->private = td;
	return 0;
}

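/* Release the worker's private thread_data once it has exited. */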
static void io_workqueue_free_fn(struct submit_worker *sw)
{
	free(sw->private);
	sw->private = NULL;
}

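/*
 * Set up a worker thread: clone the parent job's options, files and I/O
 * engine into the worker's private thread_data, then initialize it as a
 * depth-1 child job ready to submit I/O on the parent's behalf.
 */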
static int io_workqueue_init_worker_fn(struct submit_worker *sw)
{
	struct thread_data *parent = sw->wq->td;
	struct thread_data *td = sw->private;
	int fio_unused ret;

	memcpy(&td->o, &parent->o, sizeof(td->o));
	memcpy(&td->ts, &parent->ts, sizeof(td->ts));
	td->o.uid = td->o.gid = -1U;
	dup_files(td, parent);
	td->eo = parent->eo;
	fio_options_mem_dupe(td);

	if (ioengine_load(td))
		goto err;

	if (td->o.odirect)
		td->io_ops->flags |= FIO_RAWIO;

	td->pid = gettid();

	INIT_FLIST_HEAD(&td->io_log_list);
	INIT_FLIST_HEAD(&td->io_hist_list);
	INIT_FLIST_HEAD(&td->verify_list);
	INIT_FLIST_HEAD(&td->trim_list);
	INIT_FLIST_HEAD(&td->next_rand_list);
	td->io_hist_tree = RB_ROOT;

	td->o.iodepth = 1;
	if (td_io_init(td))
		goto err_io_init;

	fio_gettime(&td->epoch, NULL);
	fio_getrusage(&td->ru_start);
	clear_io_state(td, 1);

	td_set_runstate(td, TD_RUNNING);
	td->flags |= TD_F_CHILD;
	td->parent = parent;
	return 0;

err_io_init:
	close_ioengine(td);
err:
	return 1;
}

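/*
 * Tear down a worker thread: fold its stats into the parent's, then
 * release its options, files and I/O engine.
 */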
static void io_workqueue_exit_worker_fn(struct submit_worker *sw,
					unsigned int *sum_cnt)
{
	struct thread_data *td = sw->private;

	(*sum_cnt)++;
	sum_thread_stats(&sw->wq->td->ts, &td->ts, *sum_cnt == 1);

	fio_options_free(td);
	close_and_free_files(td);
	if (td->io_ops)
		close_ioengine(td);
	td_set_runstate(td, TD_EXITED);
}

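/*
 * Move a worker-side counter into the parent: atomically when the
 * compiler provides __sync_fetch_and_add(), otherwise as a plain add
 * protected by the stat locks taken in sum_ddir().
 */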
#ifdef CONFIG_SFAA
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (*src) {
		__sync_fetch_and_add(dst, *src);
		*src = 0;
	}
}
#else
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (*src) {
		*dst += *src;
		*src = 0;
	}
}
#endif

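/*
 * Helpers for taking both stat locks. They are no-ops when atomic adds
 * are available, since sum_val() then needs no locking.
 */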
static void pthread_double_unlock(pthread_mutex_t *lock1,
				  pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	pthread_mutex_unlock(lock1);
	pthread_mutex_unlock(lock2);
#endif
}

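/*
 * Always acquire the two locks in address order so that concurrent
 * summing in opposite directions cannot deadlock.
 */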
static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	if (lock1 < lock2) {
		pthread_mutex_lock(lock1);
		pthread_mutex_lock(lock2);
	} else {
		pthread_mutex_lock(lock2);
		pthread_mutex_lock(lock1);
	}
#endif
}

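/*
 * Fold the worker's byte and block counters for one data direction into
 * the parent job, under both workqueue stat locks.
 */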
static void sum_ddir(struct thread_data *dst, struct thread_data *src,
		     enum fio_ddir ddir)
{
	pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);

	sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
	sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
	sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
	sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
	sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);

	pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
}

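/*
 * Roll the worker's per-direction accounting into the parent job.
 */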
static void io_workqueue_update_acct_fn(struct submit_worker *sw)
{
	struct thread_data *src = sw->private;
	struct thread_data *dst = sw->wq->td;

	if (td_read(src))
		sum_ddir(dst, src, DDIR_READ);
	if (td_write(src))
		sum_ddir(dst, src, DDIR_WRITE);
	if (td_trim(src))
		sum_ddir(dst, src, DDIR_TRIM);
}

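/* Workqueue callbacks used when io_submit_mode=offload. */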
static struct workqueue_ops rated_wq_ops = {
	.fn			= io_workqueue_fn,
	.pre_sleep_flush_fn	= io_workqueue_pre_sleep_flush_fn,
	.pre_sleep_fn		= io_workqueue_pre_sleep_fn,
	.update_acct_fn		= io_workqueue_update_acct_fn,
	.alloc_worker_fn	= io_workqueue_alloc_fn,
	.free_worker_fn		= io_workqueue_free_fn,
	.init_worker_fn		= io_workqueue_init_worker_fn,
	.exit_worker_fn		= io_workqueue_exit_worker_fn,
};

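/*
 * Create the offload workqueue for this job, with up to iodepth submit
 * workers. A no-op unless io_submit_mode=offload is in use.
 */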
int rate_submit_init(struct thread_data *td)
{
	if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
		return 0;

	return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth);
}

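/*
 * Tear the offload workqueue back down. Per the commit above,
 * workqueue_exit() is safe to call more than once.
 */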
void rate_submit_exit(struct thread_data *td)
{
	if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
		return;

	workqueue_exit(&td->io_wq);
}