11791209b54da02502d26c06ed0d60be25d304ee
[fio.git] / workqueue.c
1 /*
2  * Rated submission helpers
3  *
4  * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
5  *
6  */
7 #include <unistd.h>
8
9 #include "fio.h"
10 #include "flist.h"
11 #include "workqueue.h"
12 #include "lib/getrusage.h"
13
/*
 * Per-thread submission worker. Each worker owns a private work list,
 * protected by ->lock, and a private thread_data (->td) cloned from the
 * parent job so it can submit I/O on its own.
 */
struct submit_worker {
	pthread_t thread;		/* worker thread handle */
	pthread_mutex_t lock;		/* protects work_list and flags */
	pthread_cond_t cond;		/* signaled when new work is queued */
	struct flist_head work_list;	/* pending workqueue_work items */
	unsigned int flags;		/* SW_F_* state bits */
	unsigned int index;		/* slot index in wq->workers[] */
	uint64_t seq;			/* wq->work_seq of last item queued here */
	struct workqueue *wq;		/* owning workqueue */
	struct thread_data td;		/* private clone of the parent td */
};

/*
 * Worker state flags; written under sw->lock.
 */
enum {
	SW_F_IDLE	= 1 << 0,	/* work list empty, ready for more */
	SW_F_RUNNING	= 1 << 1,	/* thread started and initialized */
	SW_F_EXIT	= 1 << 2,	/* thread asked to exit */
	SW_F_EXITED	= 1 << 3,	/* thread main loop has finished */
	SW_F_ACCOUNTED	= 1 << 4,	/* stats summed at shutdown */
	SW_F_ERROR	= 1 << 5,	/* worker init failed */
};
34
35 static struct submit_worker *__get_submit_worker(struct workqueue *wq,
36                                                  unsigned int start,
37                                                  unsigned int end,
38                                                  struct submit_worker **best)
39 {
40         struct submit_worker *sw = NULL;
41
42         while (start <= end) {
43                 sw = &wq->workers[start];
44                 if (sw->flags & SW_F_IDLE)
45                         return sw;
46                 if (!(*best) || sw->seq < (*best)->seq)
47                         *best = sw;
48                 start++;
49         }
50
51         return NULL;
52 }
53
54 static struct submit_worker *get_submit_worker(struct workqueue *wq)
55 {
56         unsigned int next = wq->next_free_worker;
57         struct submit_worker *sw, *best = NULL;
58
59         assert(next < wq->max_workers);
60
61         sw = __get_submit_worker(wq, next, wq->max_workers - 1, &best);
62         if (!sw && next)
63                 sw = __get_submit_worker(wq, 0, next - 1, &best);
64
65         /*
66          * No truly idle found, use best match
67          */
68         if (!sw)
69                 sw = best;
70
71         if (sw->index == wq->next_free_worker) {
72                 if (sw->index + 1 < wq->max_workers)
73                         wq->next_free_worker = sw->index + 1;
74                 else
75                         wq->next_free_worker = 0;
76         }
77
78         return sw;
79 }
80
81 static bool all_sw_idle(struct workqueue *wq)
82 {
83         int i;
84
85         for (i = 0; i < wq->max_workers; i++) {
86                 struct submit_worker *sw = &wq->workers[i];
87
88                 if (!(sw->flags & SW_F_IDLE))
89                         return false;
90         }
91
92         return true;
93 }
94
95 /*
96  * Must be serialized wrt workqueue_enqueue() by caller
97  */
98 void workqueue_flush(struct workqueue *wq)
99 {
100         wq->wake_idle = 1;
101
102         while (!all_sw_idle(wq)) {
103                 pthread_mutex_lock(&wq->flush_lock);
104                 pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
105                 pthread_mutex_unlock(&wq->flush_lock);
106         }
107
108         wq->wake_idle = 0;
109 }
110
111 /*
112  * Must be serialized by caller. Returns true for queued, false for busy.
113  */
114 bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work)
115 {
116         struct submit_worker *sw;
117
118         sw = get_submit_worker(wq);
119         if (sw) {
120                 pthread_mutex_lock(&sw->lock);
121                 flist_add_tail(&work->list, &sw->work_list);
122                 sw->seq = ++wq->work_seq;
123                 sw->flags &= ~SW_F_IDLE;
124                 pthread_mutex_unlock(&sw->lock);
125
126                 pthread_cond_signal(&sw->cond);
127                 return true;
128         }
129
130         return false;
131 }
132
133 static void handle_list(struct submit_worker *sw, struct flist_head *list)
134 {
135         struct workqueue *wq = sw->wq;
136         struct workqueue_work *work;
137
138         while (!flist_empty(list)) {
139                 work = flist_first_entry(list, struct workqueue_work, list);
140                 flist_del_init(&work->list);
141                 wq->fn(&sw->td, work);
142         }
143 }
144
/*
 * Build the worker's private thread_data by cloning the parent job:
 * options, stats, open files and the ioengine. Returns 0 on success,
 * 1 on error (engine load or io init failure).
 */
static int init_submit_worker(struct submit_worker *sw)
{
	struct thread_data *parent = sw->wq->td;
	struct thread_data *td = &sw->td;
	int fio_unused ret;

	/* copy parent options and current stat snapshot wholesale */
	memcpy(&td->o, &parent->o, sizeof(td->o));
	memcpy(&td->ts, &parent->ts, sizeof(td->ts));
	td->o.uid = td->o.gid = -1U;	/* don't switch uid/gid in the worker */
	dup_files(td, parent);
	td->eo = parent->eo;
	fio_options_mem_dupe(td);

	if (ioengine_load(td))
		goto err;

	if (td->o.odirect)
		td->io_ops->flags |= FIO_RAWIO;

	td->pid = gettid();

	INIT_FLIST_HEAD(&td->io_log_list);
	INIT_FLIST_HEAD(&td->io_hist_list);
	INIT_FLIST_HEAD(&td->verify_list);
	INIT_FLIST_HEAD(&td->trim_list);
	INIT_FLIST_HEAD(&td->next_rand_list);
	td->io_hist_tree = RB_ROOT;

	/* worker depth is forced to 1 regardless of the parent setting */
	td->o.iodepth = 1;
	if (td_io_init(td))
		goto err_io_init;

	fio_gettime(&td->epoch, NULL);
	fio_getrusage(&td->ru_start);
	clear_io_state(td, 1);

	td_set_runstate(td, TD_RUNNING);
	td->flags |= TD_F_CHILD;
	td->parent = parent;
	return 0;

err_io_init:
	close_ioengine(td);
err:
	return 1;
}
191
#ifdef CONFIG_SFAA
/*
 * Fold *src into *dst with an atomic add and zero the source. The
 * non-zero test skips the (more expensive) atomic for idle counters.
 */
static void sum_val(uint64_t *dst, uint64_t *src)
{
	uint64_t val = *src;

	if (val) {
		__sync_fetch_and_add(dst, val);
		*src = 0;
	}
}
#else
/*
 * Non-atomic fallback; callers serialize via pthread_double_lock().
 */
static void sum_val(uint64_t *dst, uint64_t *src)
{
	uint64_t val = *src;

	if (val) {
		*dst += val;
		*src = 0;
	}
}
#endif
209
/*
 * Drop both stat locks. Compiles to a no-op when CONFIG_SFAA is set,
 * since the atomic sum_val() variant needs no locking.
 */
static void pthread_double_unlock(pthread_mutex_t *lock1,
                                  pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
        pthread_mutex_unlock(lock1);
        pthread_mutex_unlock(lock2);
#endif
}
218
/*
 * Take two stat locks in a canonical (address) order so that two
 * threads summing in opposite directions cannot ABBA-deadlock. A no-op
 * when CONFIG_SFAA is set (atomic summing needs no locks).
 */
static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	pthread_mutex_t *first = lock1;
	pthread_mutex_t *second = lock2;

	if (first > second) {
		first = lock2;
		second = lock1;
	}

	pthread_mutex_lock(first);
	pthread_mutex_lock(second);
#endif
}
231
/*
 * Move one data direction's counters from a worker (src) into the
 * parent (dst), zeroing the source. Both stat locks are held across
 * the transfer (no-ops under CONFIG_SFAA, where sum_val is atomic).
 */
static void sum_ddir(struct thread_data *dst, struct thread_data *src,
                     enum fio_ddir ddir)
{
        pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);

        sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
        sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
        sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
        sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
        sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);

        pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
}
245
246 static void update_accounting(struct submit_worker *sw)
247 {
248         struct thread_data *src = &sw->td;
249         struct thread_data *dst = sw->wq->td;
250
251         if (td_read(src))
252                 sum_ddir(dst, src, DDIR_READ);
253         if (td_write(src))
254                 sum_ddir(dst, src, DDIR_WRITE);
255         if (td_trim(src))
256                 sum_ddir(dst, src, DDIR_TRIM);
257 }
258
/*
 * Main loop of a submit worker: initialize the cloned thread_data,
 * announce running/error state to workqueue_init(), then repeatedly
 * splice queued work off the private list (under sw->lock) and process
 * it unlocked. When the list is empty the worker quiesces in-flight
 * I/O, marks itself idle, pushes its stats to the parent and sleeps on
 * sw->cond until more work or an exit request arrives.
 */
static void *worker_thread(void *data)
{
	struct submit_worker *sw = data;
	struct workqueue *wq = sw->wq;
	struct thread_data *td = &sw->td;
	unsigned int eflags = 0, ret;
	FLIST_HEAD(local_list);

	ret = init_submit_worker(sw);
	pthread_mutex_lock(&sw->lock);
	sw->flags |= SW_F_RUNNING;
	if (ret)
		sw->flags |= SW_F_ERROR;
	pthread_mutex_unlock(&sw->lock);

	/* wake workqueue_init(), which waits for all workers to start */
	pthread_mutex_lock(&wq->flush_lock);
	pthread_cond_signal(&wq->flush_cond);
	pthread_mutex_unlock(&wq->flush_lock);

	if (sw->flags & SW_F_ERROR)
		goto done;

	while (1) {
		pthread_mutex_lock(&sw->lock);

		if (flist_empty(&sw->work_list)) {
			if (sw->flags & SW_F_EXIT) {
				pthread_mutex_unlock(&sw->lock);
				break;
			}

			/* drain I/O still in flight before going idle */
			if (td->io_u_queued || td->cur_depth ||
			    td->io_u_in_flight) {
				int ret;

				pthread_mutex_unlock(&sw->lock);
				ret = io_u_quiesce(td);
				if (ret > 0)
					td->cur_depth -= ret;
				pthread_mutex_lock(&sw->lock);
			}

			/*
			 * We dropped and reacquired the lock, check
			 * state again.
			 */
			if (!flist_empty(&sw->work_list))
				goto handle_work;

			if (sw->flags & SW_F_EXIT) {
				pthread_mutex_unlock(&sw->lock);
				break;
			} else if (!(sw->flags & SW_F_IDLE)) {
				sw->flags |= SW_F_IDLE;
				/* make this worker the next enqueue target */
				wq->next_free_worker = sw->index;
				/* a flusher may be waiting for all-idle */
				if (wq->wake_idle)
					pthread_cond_signal(&wq->flush_cond);
			}
			update_accounting(sw);
			pthread_cond_wait(&sw->cond, &sw->lock);
		} else {
handle_work:
			flist_splice_init(&sw->work_list, &local_list);
		}
		pthread_mutex_unlock(&sw->lock);
		handle_list(sw, &local_list);
	}

	update_accounting(sw);

done:
	pthread_mutex_lock(&sw->lock);
	/* eflags is never set above today; reserved for future error bits */
	sw->flags |= (SW_F_EXITED | eflags);
	pthread_mutex_unlock(&sw->lock);
	return NULL;
}
335
/*
 * Release everything a worker owns: duplicated options, open files,
 * the ioengine, and finally its synchronization primitives. The worker
 * thread must no longer be running (joined, or never started).
 */
static void free_worker(struct submit_worker *sw)
{
        struct thread_data *td = &sw->td;

        fio_options_free(td);
        close_and_free_files(td);
        if (td->io_ops)
                close_ioengine(td);
        td_set_runstate(td, TD_EXITED);

        pthread_cond_destroy(&sw->cond);
        pthread_mutex_destroy(&sw->lock);
}
349
/*
 * Join one worker thread, fold its thread stats into the parent
 * (the first summed worker initializes the aggregate), and free it.
 */
static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt)
{
        struct thread_data *parent = sw->wq->td;

        pthread_join(sw->thread, NULL);
        (*sum_cnt)++;
        sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt == 1);
        free_worker(sw);
}
359
/*
 * Tear the workqueue down: flag every worker for exit and wake it,
 * then join/account each worker exactly once before freeing the
 * workqueue's own resources.
 */
void workqueue_exit(struct workqueue *wq)
{
        unsigned int shutdown, sum_cnt = 0;
        struct submit_worker *sw;
        int i;

        /* phase 1: ask all workers to exit and kick their cond waits */
        for (i = 0; i < wq->max_workers; i++) {
                sw = &wq->workers[i];

                pthread_mutex_lock(&sw->lock);
                sw->flags |= SW_F_EXIT;
                pthread_cond_signal(&sw->cond);
                pthread_mutex_unlock(&sw->lock);
        }

        /* phase 2: join each not-yet-accounted worker, until none remain */
        do {
                shutdown = 0;
                for (i = 0; i < wq->max_workers; i++) {
                        sw = &wq->workers[i];
                        if (sw->flags & SW_F_ACCOUNTED)
                                continue;
                        pthread_mutex_lock(&sw->lock);
                        sw->flags |= SW_F_ACCOUNTED;
                        pthread_mutex_unlock(&sw->lock);
                        shutdown_worker(sw, &sum_cnt);
                        shutdown++;
                }
        } while (shutdown && shutdown != wq->max_workers);

        free(wq->workers);
        pthread_mutex_destroy(&wq->flush_lock);
        pthread_cond_destroy(&wq->flush_cond);
        pthread_mutex_destroy(&wq->stat_lock);
}
394
395 static int start_worker(struct workqueue *wq, unsigned int index)
396 {
397         struct submit_worker *sw = &wq->workers[index];
398         int ret;
399
400         INIT_FLIST_HEAD(&sw->work_list);
401         pthread_cond_init(&sw->cond, NULL);
402         pthread_mutex_init(&sw->lock, NULL);
403         sw->wq = wq;
404         sw->index = index;
405
406         ret = pthread_create(&sw->thread, NULL, worker_thread, sw);
407         if (!ret) {
408                 pthread_mutex_lock(&sw->lock);
409                 sw->flags = SW_F_IDLE;
410                 pthread_mutex_unlock(&sw->lock);
411                 return 0;
412         }
413
414         free_worker(sw);
415         return 1;
416 }
417
418 int workqueue_init(struct thread_data *td, struct workqueue *wq,
419                    workqueue_fn *fn, unsigned max_pending)
420 {
421         unsigned int running;
422         int i, error;
423
424         wq->max_workers = max_pending;
425         wq->td = td;
426         wq->fn = fn;
427         wq->work_seq = 0;
428         wq->next_free_worker = 0;
429         pthread_cond_init(&wq->flush_cond, NULL);
430         pthread_mutex_init(&wq->flush_lock, NULL);
431         pthread_mutex_init(&wq->stat_lock, NULL);
432
433         wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker));
434
435         for (i = 0; i < wq->max_workers; i++)
436                 if (start_worker(wq, i))
437                         break;
438
439         wq->max_workers = i;
440         if (!wq->max_workers)
441                 goto err;
442
443         /*
444          * Wait for them all to be started and initialized
445          */
446         error = 0;
447         do {
448                 struct submit_worker *sw;
449
450                 running = 0;
451                 pthread_mutex_lock(&wq->flush_lock);
452                 for (i = 0; i < wq->max_workers; i++) {
453                         sw = &wq->workers[i];
454                         pthread_mutex_lock(&sw->lock);
455                         if (sw->flags & SW_F_RUNNING)
456                                 running++;
457                         if (sw->flags & SW_F_ERROR)
458                                 error++;
459                         pthread_mutex_unlock(&sw->lock);
460                 }
461
462                 if (error || running == wq->max_workers) {
463                         pthread_mutex_unlock(&wq->flush_lock);
464                         break;
465                 }
466
467                 pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
468                 pthread_mutex_unlock(&wq->flush_lock);
469         } while (1);
470
471         if (!error)
472                 return 0;
473
474 err:
475         log_err("Can't create rate workqueue\n");
476         td_verror(td, ESRCH, "workqueue_init");
477         workqueue_exit(wq);
478         return 1;
479 }