54761b0902b612662b71521ad902b3be61a0ddb4
[fio.git] / workqueue.c
1 /*
2  * Rated submission helpers
3  *
4  * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
5  *
6  */
7 #include <unistd.h>
8
9 #include "fio.h"
10 #include "flist.h"
11 #include "workqueue.h"
12 #include "lib/getrusage.h"
13
/*
 * Per-worker state: one submission thread plus its queue of pending
 * work and the private thread_data it submits with.
 */
struct submit_worker {
	pthread_t thread;		/* the worker thread itself */
	pthread_mutex_t lock;		/* protects work_list and flags */
	pthread_cond_t cond;		/* worker sleeps on this when idle */
	struct flist_head work_list;	/* pending workqueue_work entries */
	unsigned int flags;		/* SW_F_* state bits */
	unsigned int index;		/* slot index in wq->workers[] */
	uint64_t seq;			/* wq->work_seq of last work queued here */
	struct workqueue *wq;		/* owning workqueue */
	struct thread_data td;		/* worker-private clone of parent td */
};
25
/*
 * submit_worker->flags bits, updated under the worker's ->lock.
 */
enum {
	SW_F_IDLE	= 1 << 0,	/* no pending work, sleeping on ->cond */
	SW_F_RUNNING	= 1 << 1,	/* thread started and initialized */
	SW_F_EXIT	= 1 << 2,	/* worker asked to exit */
	SW_F_EXITED	= 1 << 3,	/* thread function has returned */
	SW_F_ACCOUNTED	= 1 << 4,	/* stats folded in by workqueue_exit() */
	SW_F_ERROR	= 1 << 5,	/* worker failed to initialize */
};
34
35 static struct submit_worker *__get_submit_worker(struct workqueue *wq,
36                                                  unsigned int start,
37                                                  unsigned int end,
38                                                  struct submit_worker **best)
39 {
40         struct submit_worker *sw = NULL;
41
42         while (start <= end) {
43                 sw = &wq->workers[start];
44                 if (sw->flags & SW_F_IDLE)
45                         return sw;
46                 if (!(*best) || sw->seq < (*best)->seq)
47                         *best = sw;
48                 start++;
49         }
50
51         return NULL;
52 }
53
54 static struct submit_worker *get_submit_worker(struct workqueue *wq)
55 {
56         unsigned int next = wq->next_free_worker;
57         struct submit_worker *sw, *best = NULL;
58
59         assert(next < wq->max_workers);
60
61         sw = __get_submit_worker(wq, next, wq->max_workers - 1, &best);
62         if (!sw && next)
63                 sw = __get_submit_worker(wq, 0, next - 1, &best);
64
65         /*
66          * No truly idle found, use best match
67          */
68         if (!sw)
69                 sw = best;
70
71         if (sw->index == wq->next_free_worker) {
72                 if (sw->index + 1 < wq->max_workers)
73                         wq->next_free_worker = sw->index + 1;
74                 else
75                         wq->next_free_worker = 0;
76         }
77
78         return sw;
79 }
80
81 static bool all_sw_idle(struct workqueue *wq)
82 {
83         int i;
84
85         for (i = 0; i < wq->max_workers; i++) {
86                 struct submit_worker *sw = &wq->workers[i];
87
88                 if (!(sw->flags & SW_F_IDLE))
89                         return false;
90         }
91
92         return true;
93 }
94
/*
 * Wait until all workers have gone idle. Must be serialized wrt
 * workqueue_enqueue() by the caller. wake_idle tells workers to signal
 * flush_cond when they transition to idle.
 *
 * NOTE(review): the all_sw_idle() check runs without flush_lock held,
 * and workers signal flush_cond while holding only their own sw->lock,
 * so a signal could in theory fire between the check and the wait —
 * confirm whether the wake_idle handshake is meant to cover that window.
 */
void workqueue_flush(struct workqueue *wq)
{
	wq->wake_idle = 1;

	while (!all_sw_idle(wq)) {
		pthread_mutex_lock(&wq->flush_lock);
		pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
		pthread_mutex_unlock(&wq->flush_lock);
	}

	wq->wake_idle = 0;
}
110
111 /*
112  * Must be serialized by caller. Returns true for queued, false for busy.
113  */
114 bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work)
115 {
116         struct submit_worker *sw;
117
118         sw = get_submit_worker(wq);
119         if (sw) {
120                 pthread_mutex_lock(&sw->lock);
121                 flist_add_tail(&work->list, &sw->work_list);
122                 sw->seq = ++wq->work_seq;
123                 sw->flags &= ~SW_F_IDLE;
124                 pthread_mutex_unlock(&sw->lock);
125
126                 pthread_cond_signal(&sw->cond);
127                 return true;
128         }
129
130         return false;
131 }
132
133 static void handle_list(struct submit_worker *sw, struct flist_head *list)
134 {
135         struct workqueue *wq = sw->wq;
136         struct workqueue_work *work;
137
138         while (!flist_empty(list)) {
139                 work = flist_first_entry(list, struct workqueue_work, list);
140                 flist_del_init(&work->list);
141                 wq->ops.fn(&sw->td, work);
142         }
143 }
144
/*
 * Per-worker setup: clone the relevant pieces of the parent thread_data
 * into the worker's private td, load and initialize the IO engine, and
 * mark the worker as a running child of the parent job.
 *
 * Returns 0 on success, 1 on failure (IO engine load or init failed).
 */
static int init_submit_worker(struct submit_worker *sw)
{
	struct thread_data *parent = sw->wq->td;
	struct thread_data *td = &sw->td;
	int fio_unused ret;

	/* Inherit options and thread stats from the parent job */
	memcpy(&td->o, &parent->o, sizeof(td->o));
	memcpy(&td->ts, &parent->ts, sizeof(td->ts));
	td->o.uid = td->o.gid = -1U;
	dup_files(td, parent);
	td->eo = parent->eo;
	fio_options_mem_dupe(td);

	if (ioengine_load(td))
		goto err;

	if (td->o.odirect)
		td->io_ops->flags |= FIO_RAWIO;

	td->pid = gettid();

	INIT_FLIST_HEAD(&td->io_log_list);
	INIT_FLIST_HEAD(&td->io_hist_list);
	INIT_FLIST_HEAD(&td->verify_list);
	INIT_FLIST_HEAD(&td->trim_list);
	INIT_FLIST_HEAD(&td->next_rand_list);
	td->io_hist_tree = RB_ROOT;

	/* each worker submits one unit at a time */
	td->o.iodepth = 1;
	if (td_io_init(td))
		goto err_io_init;

	fio_gettime(&td->epoch, NULL);
	fio_getrusage(&td->ru_start);
	clear_io_state(td, 1);

	td_set_runstate(td, TD_RUNNING);
	td->flags |= TD_F_CHILD;
	td->parent = parent;
	return 0;

err_io_init:
	close_ioengine(td);
err:
	return 1;
}
191
#ifdef CONFIG_SFAA
/*
 * Fold *src into *dst and zero the source. With fetch-and-add support
 * the accumulation is atomic, so callers need no external locking.
 */
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (*src) {
		__sync_fetch_and_add(dst, *src);
		*src = 0;
	}
}
#else
/*
 * Non-atomic fallback; callers serialize via pthread_double_lock().
 */
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (*src) {
		*dst += *src;
		*src = 0;
	}
}
#endif
209
/*
 * Release both stat locks taken by pthread_double_lock(). Compiles to a
 * no-op when CONFIG_SFAA is set, since sum_val() is then atomic and the
 * locks are never taken.
 */
static void pthread_double_unlock(pthread_mutex_t *lock1,
				  pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	pthread_mutex_unlock(lock1);
	pthread_mutex_unlock(lock2);
#endif
}
218
/*
 * Acquire two stat locks in a globally consistent order (lower address
 * first) so that two threads summing stats against each other cannot
 * ABBA-deadlock. A no-op when CONFIG_SFAA provides atomic accumulation.
 *
 * NOTE(review): relational comparison of pointers into distinct objects
 * is formally unspecified in C, though it works on flat address spaces.
 */
static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	if (lock1 < lock2) {
		pthread_mutex_lock(lock1);
		pthread_mutex_lock(lock2);
	} else {
		pthread_mutex_lock(lock2);
		pthread_mutex_lock(lock1);
	}
#endif
}
231
/*
 * Fold the per-direction IO counters from src into dst, zeroing the
 * source counters. Both sides' io_wq stat locks are held across the
 * transfer (unless atomics make the locking unnecessary).
 */
static void sum_ddir(struct thread_data *dst, struct thread_data *src,
		     enum fio_ddir ddir)
{
	pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);

	sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
	sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
	sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
	sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
	sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);

	pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
}
245
246 static void update_accounting(struct submit_worker *sw)
247 {
248         struct thread_data *src = &sw->td;
249         struct thread_data *dst = sw->wq->td;
250
251         if (td_read(src))
252                 sum_ddir(dst, src, DDIR_READ);
253         if (td_write(src))
254                 sum_ddir(dst, src, DDIR_WRITE);
255         if (td_trim(src))
256                 sum_ddir(dst, src, DDIR_TRIM);
257 }
258
/*
 * Main loop for a submit worker thread. Initializes the private
 * thread_data, reports startup success/failure to workqueue_init() via
 * flush_cond, then repeatedly splices queued work off work_list and
 * runs it, sleeping on sw->cond when idle. Exits once SW_F_EXIT is set
 * and the work list has drained.
 */
static void *worker_thread(void *data)
{
	struct submit_worker *sw = data;
	struct workqueue *wq = sw->wq;
	unsigned int eflags = 0, ret;
	FLIST_HEAD(local_list);

	ret = init_submit_worker(sw);
	pthread_mutex_lock(&sw->lock);
	sw->flags |= SW_F_RUNNING;
	if (ret)
		sw->flags |= SW_F_ERROR;
	pthread_mutex_unlock(&sw->lock);

	/* wake workqueue_init(), which waits for RUNNING (or ERROR) */
	pthread_mutex_lock(&wq->flush_lock);
	pthread_cond_signal(&wq->flush_cond);
	pthread_mutex_unlock(&wq->flush_lock);

	if (sw->flags & SW_F_ERROR)
		goto done;

	while (1) {
		pthread_mutex_lock(&sw->lock);

		if (flist_empty(&sw->work_list)) {
			if (sw->flags & SW_F_EXIT) {
				pthread_mutex_unlock(&sw->lock);
				break;
			}

			/* optional pre-sleep hook, run without the lock */
			if (workqueue_pre_sleep_check(wq)) {
				pthread_mutex_unlock(&sw->lock);
				workqueue_pre_sleep(wq);
				pthread_mutex_lock(&sw->lock);
			}

			/*
			 * We dropped and reacquired the lock, check
			 * state again.
			 */
			if (!flist_empty(&sw->work_list))
				goto handle_work;

			if (sw->flags & SW_F_EXIT) {
				pthread_mutex_unlock(&sw->lock);
				break;
			} else if (!(sw->flags & SW_F_IDLE)) {
				/* going idle: make us the allocation hint,
				 * and notify a flusher if one is waiting */
				sw->flags |= SW_F_IDLE;
				wq->next_free_worker = sw->index;
				if (wq->wake_idle)
					pthread_cond_signal(&wq->flush_cond);
			}
			update_accounting(sw);
			pthread_cond_wait(&sw->cond, &sw->lock);
		} else {
handle_work:
			/* grab everything queued so far; process unlocked */
			flist_splice_init(&sw->work_list, &local_list);
		}
		pthread_mutex_unlock(&sw->lock);
		handle_list(sw, &local_list);
	}

	update_accounting(sw);

done:
	/* NOTE(review): eflags is never modified after init — dead code? */
	pthread_mutex_lock(&sw->lock);
	sw->flags |= (SW_F_EXITED | eflags);
	pthread_mutex_unlock(&sw->lock);
	return NULL;
}
329
/*
 * Tear down a worker's private thread_data and its synchronization
 * primitives. Called after the worker thread has exited, or when it
 * could never be started.
 */
static void free_worker(struct submit_worker *sw)
{
	struct thread_data *td = &sw->td;

	fio_options_free(td);
	close_and_free_files(td);
	if (td->io_ops)
		close_ioengine(td);
	td_set_runstate(td, TD_EXITED);

	pthread_cond_destroy(&sw->cond);
	pthread_mutex_destroy(&sw->lock);
}
343
/*
 * Join a worker thread, fold its thread stats into the parent, and free
 * its resources. The third sum_thread_stats() argument is true only for
 * the first worker summed — presumably first-copy vs accumulate; confirm
 * against sum_thread_stats().
 */
static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt)
{
	struct thread_data *parent = sw->wq->td;

	pthread_join(sw->thread, NULL);
	(*sum_cnt)++;
	sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt == 1);
	free_worker(sw);
}
353
/*
 * Shut the workqueue down: flag every worker to exit and wake it, then
 * join, account and free each worker exactly once before releasing the
 * shared locks and the worker array.
 */
void workqueue_exit(struct workqueue *wq)
{
	unsigned int shutdown, sum_cnt = 0;
	struct submit_worker *sw;
	int i;

	for (i = 0; i < wq->max_workers; i++) {
		sw = &wq->workers[i];

		pthread_mutex_lock(&sw->lock);
		sw->flags |= SW_F_EXIT;
		pthread_cond_signal(&sw->cond);
		pthread_mutex_unlock(&sw->lock);
	}

	do {
		shutdown = 0;
		for (i = 0; i < wq->max_workers; i++) {
			sw = &wq->workers[i];
			/* SW_F_ACCOUNTED is only ever set from this loop,
			 * so the unlocked read here cannot double-shutdown */
			if (sw->flags & SW_F_ACCOUNTED)
				continue;
			pthread_mutex_lock(&sw->lock);
			sw->flags |= SW_F_ACCOUNTED;
			pthread_mutex_unlock(&sw->lock);
			shutdown_worker(sw, &sum_cnt);
			shutdown++;
		}
	} while (shutdown && shutdown != wq->max_workers);

	free(wq->workers);
	pthread_mutex_destroy(&wq->flush_lock);
	pthread_cond_destroy(&wq->flush_cond);
	pthread_mutex_destroy(&wq->stat_lock);
}
388
389 static int start_worker(struct workqueue *wq, unsigned int index)
390 {
391         struct submit_worker *sw = &wq->workers[index];
392         int ret;
393
394         INIT_FLIST_HEAD(&sw->work_list);
395         pthread_cond_init(&sw->cond, NULL);
396         pthread_mutex_init(&sw->lock, NULL);
397         sw->wq = wq;
398         sw->index = index;
399
400         ret = pthread_create(&sw->thread, NULL, worker_thread, sw);
401         if (!ret) {
402                 pthread_mutex_lock(&sw->lock);
403                 sw->flags = SW_F_IDLE;
404                 pthread_mutex_unlock(&sw->lock);
405                 return 0;
406         }
407
408         free_worker(sw);
409         return 1;
410 }
411
412 int workqueue_init(struct thread_data *td, struct workqueue *wq,
413                    struct workqueue_ops *ops, unsigned max_pending)
414 {
415         unsigned int running;
416         int i, error;
417
418         wq->max_workers = max_pending;
419         wq->td = td;
420         wq->ops = *ops;
421         wq->work_seq = 0;
422         wq->next_free_worker = 0;
423         pthread_cond_init(&wq->flush_cond, NULL);
424         pthread_mutex_init(&wq->flush_lock, NULL);
425         pthread_mutex_init(&wq->stat_lock, NULL);
426
427         wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker));
428
429         for (i = 0; i < wq->max_workers; i++)
430                 if (start_worker(wq, i))
431                         break;
432
433         wq->max_workers = i;
434         if (!wq->max_workers)
435                 goto err;
436
437         /*
438          * Wait for them all to be started and initialized
439          */
440         error = 0;
441         do {
442                 struct submit_worker *sw;
443
444                 running = 0;
445                 pthread_mutex_lock(&wq->flush_lock);
446                 for (i = 0; i < wq->max_workers; i++) {
447                         sw = &wq->workers[i];
448                         pthread_mutex_lock(&sw->lock);
449                         if (sw->flags & SW_F_RUNNING)
450                                 running++;
451                         if (sw->flags & SW_F_ERROR)
452                                 error++;
453                         pthread_mutex_unlock(&sw->lock);
454                 }
455
456                 if (error || running == wq->max_workers) {
457                         pthread_mutex_unlock(&wq->flush_lock);
458                         break;
459                 }
460
461                 pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
462                 pthread_mutex_unlock(&wq->flush_lock);
463         } while (1);
464
465         if (!error)
466                 return 0;
467
468 err:
469         log_err("Can't create rate workqueue\n");
470         td_verror(td, ESRCH, "workqueue_init");
471         workqueue_exit(wq);
472         return 1;
473 }