workqueue: add a workqueue_work type
[fio.git] / workqueue.c
CommitLineData
a9da8ab2
JA
1/*
2 * Rated submission helpers
3 *
4 * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
5 *
6 */
7#include <unistd.h>
8
9#include "fio.h"
a9da8ab2
JA
10#include "flist.h"
11#include "workqueue.h"
12#include "lib/getrusage.h"
13
/*
 * Per-worker state. Each worker owns a private work list and a private
 * clone of the parent thread_data (copied in init_submit_worker()).
 */
struct submit_worker {
	pthread_t thread;		/* worker thread handle */
	pthread_mutex_t lock;		/* protects flags and work_list */
	pthread_cond_t cond;		/* signaled on new work or exit request */
	struct flist_head work_list;	/* queued workqueue_work items */
	unsigned int flags;		/* SW_F_* state bits */
	unsigned int index;		/* slot index in wq->workers[] */
	uint64_t seq;			/* global seq of last enqueue; used to pick LRU worker */
	struct workqueue *wq;		/* owning workqueue */
	struct thread_data td;		/* worker-private copy of the parent's thread_data */
};
25
/* Worker state flags, kept in submit_worker->flags under sw->lock */
enum {
	SW_F_IDLE	= 1 << 0,	/* worker has no pending work */
	SW_F_RUNNING	= 1 << 1,	/* worker thread started (init attempted) */
	SW_F_EXIT	= 1 << 2,	/* worker has been asked to exit */
	SW_F_EXITED	= 1 << 3,	/* worker thread has finished */
	SW_F_ACCOUNTED	= 1 << 4,	/* stats already summed into parent */
	SW_F_ERROR	= 1 << 5,	/* worker initialization failed */
};
34
35static struct submit_worker *__get_submit_worker(struct workqueue *wq,
36 unsigned int start,
37 unsigned int end,
38 struct submit_worker **best)
39{
40 struct submit_worker *sw = NULL;
41
42 while (start <= end) {
43 sw = &wq->workers[start];
44 if (sw->flags & SW_F_IDLE)
45 return sw;
46 if (!(*best) || sw->seq < (*best)->seq)
47 *best = sw;
48 start++;
49 }
50
51 return NULL;
52}
53
54static struct submit_worker *get_submit_worker(struct workqueue *wq)
55{
56 unsigned int next = wq->next_free_worker;
57 struct submit_worker *sw, *best = NULL;
58
59 assert(next < wq->max_workers);
60
61 sw = __get_submit_worker(wq, next, wq->max_workers - 1, &best);
62 if (!sw && next)
63 sw = __get_submit_worker(wq, 0, next - 1, &best);
64
65 /*
66 * No truly idle found, use best match
67 */
68 if (!sw)
69 sw = best;
70
71 if (sw->index == wq->next_free_worker) {
72 if (sw->index + 1 < wq->max_workers)
73 wq->next_free_worker = sw->index + 1;
74 else
75 wq->next_free_worker = 0;
76 }
77
78 return sw;
79}
80
1391052a 81static bool all_sw_idle(struct workqueue *wq)
a9da8ab2
JA
82{
83 int i;
84
85 for (i = 0; i < wq->max_workers; i++) {
86 struct submit_worker *sw = &wq->workers[i];
87
88 if (!(sw->flags & SW_F_IDLE))
1391052a 89 return false;
a9da8ab2
JA
90 }
91
1391052a 92 return true;
a9da8ab2
JA
93}
94
95/*
96 * Must be serialized wrt workqueue_enqueue() by caller
97 */
98void workqueue_flush(struct workqueue *wq)
99{
100 wq->wake_idle = 1;
101
102 while (!all_sw_idle(wq)) {
103 pthread_mutex_lock(&wq->flush_lock);
104 pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
105 pthread_mutex_unlock(&wq->flush_lock);
106 }
107
108 wq->wake_idle = 0;
109}
110
111/*
b07f6ad1 112 * Must be serialized by caller. Returns true for queued, false for busy.
a9da8ab2 113 */
88271841 114bool workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work)
a9da8ab2
JA
115{
116 struct submit_worker *sw;
117
118 sw = get_submit_worker(wq);
119 if (sw) {
a9da8ab2 120 pthread_mutex_lock(&sw->lock);
88271841 121 flist_add_tail(&work->list, &sw->work_list);
a9da8ab2
JA
122 sw->seq = ++wq->work_seq;
123 sw->flags &= ~SW_F_IDLE;
124 pthread_mutex_unlock(&sw->lock);
125
126 pthread_cond_signal(&sw->cond);
b07f6ad1 127 return true;
a9da8ab2
JA
128 }
129
b07f6ad1 130 return false;
a9da8ab2
JA
131}
132
133static void handle_list(struct submit_worker *sw, struct flist_head *list)
134{
135 struct workqueue *wq = sw->wq;
88271841 136 struct workqueue_work *work;
a9da8ab2
JA
137
138 while (!flist_empty(list)) {
88271841
JA
139 work = flist_first_entry(list, struct workqueue_work, list);
140 flist_del_init(&work->list);
141 wq->fn(&sw->td, work);
a9da8ab2
JA
142 }
143}
144
/*
 * Set up the worker's private thread_data as a clone of the parent:
 * copy options and stats, duplicate files, load and init the ioengine,
 * and mark the worker running as a child of the parent job.
 * Returns 0 on success, 1 on failure (ioengine cleaned up on the
 * td_io_init() failure path).
 */
static int init_submit_worker(struct submit_worker *sw)
{
	struct thread_data *parent = sw->wq->td;
	struct thread_data *td = &sw->td;
	int fio_unused ret;

	/* inherit the parent's options and current stats */
	memcpy(&td->o, &parent->o, sizeof(td->o));
	memcpy(&td->ts, &parent->ts, sizeof(td->ts));
	td->o.uid = td->o.gid = -1U;	/* don't switch uid/gid in the worker */
	dup_files(td, parent);
	td->eo = parent->eo;		/* share engine options with the parent */
	fio_options_mem_dupe(td);

	if (ioengine_load(td))
		goto err;

	if (td->o.odirect)
		td->io_ops->flags |= FIO_RAWIO;

	td->pid = gettid();

	INIT_FLIST_HEAD(&td->io_log_list);
	INIT_FLIST_HEAD(&td->io_hist_list);
	INIT_FLIST_HEAD(&td->verify_list);
	INIT_FLIST_HEAD(&td->trim_list);
	INIT_FLIST_HEAD(&td->next_rand_list);
	td->io_hist_tree = RB_ROOT;

	/* workers submit one unit at a time */
	td->o.iodepth = 1;
	if (td_io_init(td))
		goto err_io_init;

	fio_gettime(&td->epoch, NULL);
	fio_getrusage(&td->ru_start);
	clear_io_state(td, 1);

	td_set_runstate(td, TD_RUNNING);
	td->flags |= TD_F_CHILD;
	td->parent = parent;
	return 0;

err_io_init:
	close_ioengine(td);
err:
	return 1;
}
191
#ifdef CONFIG_SFAA
/*
 * Atomically fold *src into *dst and zero the source. The zero check
 * avoids the atomic op when there is nothing to transfer.
 */
static void sum_val(uint64_t *dst, uint64_t *src)
{
	uint64_t val = *src;

	if (val) {
		__sync_fetch_and_add(dst, val);
		*src = 0;
	}
}
#else
/*
 * Non-atomic variant: the caller serializes access via the stat locks
 * taken in pthread_double_lock().
 */
static void sum_val(uint64_t *dst, uint64_t *src)
{
	uint64_t val = *src;

	if (val) {
		*dst += val;
		*src = 0;
	}
}
#endif
a9da8ab2 209
a6a3469e
JA
/*
 * Drop the two stat locks taken by pthread_double_lock(). With
 * CONFIG_SFAA the stats are summed with atomic adds instead, so the
 * locks are never taken and this compiles to a no-op.
 */
static void pthread_double_unlock(pthread_mutex_t *lock1,
				  pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	pthread_mutex_unlock(lock1);
	pthread_mutex_unlock(lock2);
#endif
}
218
/*
 * Take two mutexes in a consistent (address) order so that two threads
 * summing stats against each other cannot ABBA-deadlock. With
 * CONFIG_SFAA atomic adds are used instead, so this is a no-op.
 */
static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	pthread_mutex_t *first, *second;

	if (lock1 < lock2) {
		first = lock1;
		second = lock2;
	} else {
		first = lock2;
		second = lock1;
	}

	pthread_mutex_lock(first);
	pthread_mutex_lock(second);
#endif
}
231
/*
 * Fold the per-worker I/O counters for @ddir from @src into @dst,
 * zeroing the source counters. Without CONFIG_SFAA, both stat locks
 * are held (acquired in address order) around the plain additions;
 * with CONFIG_SFAA, sum_val() uses atomic adds and the lock calls
 * compile to no-ops.
 */
static void sum_ddir(struct thread_data *dst, struct thread_data *src,
		     enum fio_ddir ddir)
{
	pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);

	sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
	sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
	sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
	sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
	sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);

	pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
}
245
/*
 * Push the worker's accumulated I/O counters up into the parent
 * thread_data, one data direction at a time, for the directions this
 * worker actually performs.
 */
static void update_accounting(struct submit_worker *sw)
{
	struct thread_data *src = &sw->td;
	struct thread_data *dst = sw->wq->td;

	if (td_read(src))
		sum_ddir(dst, src, DDIR_READ);
	if (td_write(src))
		sum_ddir(dst, src, DDIR_WRITE);
	if (td_trim(src))
		sum_ddir(dst, src, DDIR_TRIM);
}
258
/*
 * Main loop of a submit worker thread.
 *
 * Startup: initialize the private thread_data, publish SW_F_RUNNING
 * (and SW_F_ERROR on failure) under sw->lock, then signal flush_cond
 * so workqueue_init() can observe the state change.
 *
 * Steady state: under sw->lock, splice pending work onto a local list
 * and process it unlocked via handle_list(). When the list is empty,
 * quiesce any in-flight I/O (dropping the lock around io_u_quiesce(),
 * then re-checking state because it may have changed), mark the worker
 * idle, roll stats up to the parent, and sleep on sw->cond.
 */
static void *worker_thread(void *data)
{
	struct submit_worker *sw = data;
	struct workqueue *wq = sw->wq;
	struct thread_data *td = &sw->td;
	unsigned int eflags = 0, ret;	/* eflags: extra exit flags, currently always 0 */
	FLIST_HEAD(local_list);

	ret = init_submit_worker(sw);
	pthread_mutex_lock(&sw->lock);
	sw->flags |= SW_F_RUNNING;
	if (ret)
		sw->flags |= SW_F_ERROR;
	pthread_mutex_unlock(&sw->lock);

	/* wake workqueue_init(), which waits for all workers to report in */
	pthread_mutex_lock(&wq->flush_lock);
	pthread_cond_signal(&wq->flush_cond);
	pthread_mutex_unlock(&wq->flush_lock);

	if (sw->flags & SW_F_ERROR)
		goto done;

	while (1) {
		pthread_mutex_lock(&sw->lock);

		if (flist_empty(&sw->work_list)) {
			if (sw->flags & SW_F_EXIT) {
				pthread_mutex_unlock(&sw->lock);
				break;
			}

			/* drain any I/O still pending before going idle */
			if (td->io_u_queued || td->cur_depth ||
			    td->io_u_in_flight) {
				int ret;

				pthread_mutex_unlock(&sw->lock);
				ret = io_u_quiesce(td);
				if (ret > 0)
					td->cur_depth -= ret;
				pthread_mutex_lock(&sw->lock);
			}

			/*
			 * We dropped and reacquired the lock, check
			 * state again.
			 */
			if (!flist_empty(&sw->work_list))
				goto handle_work;

			if (sw->flags & SW_F_EXIT) {
				pthread_mutex_unlock(&sw->lock);
				break;
			} else if (!(sw->flags & SW_F_IDLE)) {
				sw->flags |= SW_F_IDLE;
				/* make this worker the next enqueue target */
				wq->next_free_worker = sw->index;
				if (wq->wake_idle)
					pthread_cond_signal(&wq->flush_cond);
			}
			update_accounting(sw);
			pthread_cond_wait(&sw->cond, &sw->lock);
		} else {
handle_work:
			/* take the whole list; process it without the lock */
			flist_splice_init(&sw->work_list, &local_list);
		}
		pthread_mutex_unlock(&sw->lock);
		handle_list(sw, &local_list);
	}

	update_accounting(sw);

done:
	pthread_mutex_lock(&sw->lock);
	sw->flags |= (SW_F_EXITED | eflags);
	pthread_mutex_unlock(&sw->lock);
	return NULL;
}
335
/*
 * Release a worker's resources: duplicated options, open files, the
 * ioengine (if one was loaded), and finally the worker's own
 * synchronization primitives. Caller must ensure the worker thread is
 * no longer running (it is called after pthread_join(), or after
 * pthread_create() failed).
 */
static void free_worker(struct submit_worker *sw)
{
	struct thread_data *td = &sw->td;

	fio_options_free(td);
	close_and_free_files(td);
	if (td->io_ops)
		close_ioengine(td);
	td_set_runstate(td, TD_EXITED);

	pthread_cond_destroy(&sw->cond);
	pthread_mutex_destroy(&sw->lock);
}
349
/*
 * Join a worker thread, fold its thread stats into the parent, and
 * free its resources. @sum_cnt counts workers summed so far.
 * NOTE(review): the third argument to sum_thread_stats() appears to
 * mean "this is the first sum" — confirm against its definition.
 */
static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt)
{
	struct thread_data *parent = sw->wq->td;

	pthread_join(sw->thread, NULL);
	(*sum_cnt)++;
	sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt == 1);
	free_worker(sw);
}
359
360void workqueue_exit(struct workqueue *wq)
361{
362 unsigned int shutdown, sum_cnt = 0;
363 struct submit_worker *sw;
364 int i;
365
366 for (i = 0; i < wq->max_workers; i++) {
367 sw = &wq->workers[i];
368
369 pthread_mutex_lock(&sw->lock);
370 sw->flags |= SW_F_EXIT;
371 pthread_cond_signal(&sw->cond);
372 pthread_mutex_unlock(&sw->lock);
373 }
374
375 do {
376 shutdown = 0;
377 for (i = 0; i < wq->max_workers; i++) {
378 sw = &wq->workers[i];
379 if (sw->flags & SW_F_ACCOUNTED)
380 continue;
b7d0bbf1 381 pthread_mutex_lock(&sw->lock);
a9da8ab2 382 sw->flags |= SW_F_ACCOUNTED;
b7d0bbf1 383 pthread_mutex_unlock(&sw->lock);
a9da8ab2
JA
384 shutdown_worker(sw, &sum_cnt);
385 shutdown++;
386 }
387 } while (shutdown && shutdown != wq->max_workers);
388
389 free(wq->workers);
390 pthread_mutex_destroy(&wq->flush_lock);
391 pthread_cond_destroy(&wq->flush_cond);
2a274336 392 pthread_mutex_destroy(&wq->stat_lock);
a9da8ab2
JA
393}
394
395static int start_worker(struct workqueue *wq, unsigned int index)
396{
397 struct submit_worker *sw = &wq->workers[index];
398 int ret;
399
400 INIT_FLIST_HEAD(&sw->work_list);
401 pthread_cond_init(&sw->cond, NULL);
402 pthread_mutex_init(&sw->lock, NULL);
403 sw->wq = wq;
404 sw->index = index;
405
406 ret = pthread_create(&sw->thread, NULL, worker_thread, sw);
407 if (!ret) {
408 pthread_mutex_lock(&sw->lock);
409 sw->flags = SW_F_IDLE;
410 pthread_mutex_unlock(&sw->lock);
411 return 0;
412 }
413
414 free_worker(sw);
415 return 1;
416}
417
418int workqueue_init(struct thread_data *td, struct workqueue *wq,
419 workqueue_fn *fn, unsigned max_pending)
420{
421 unsigned int running;
422 int i, error;
423
424 wq->max_workers = max_pending;
425 wq->td = td;
426 wq->fn = fn;
427 wq->work_seq = 0;
428 wq->next_free_worker = 0;
429 pthread_cond_init(&wq->flush_cond, NULL);
430 pthread_mutex_init(&wq->flush_lock, NULL);
2a274336 431 pthread_mutex_init(&wq->stat_lock, NULL);
a9da8ab2
JA
432
433 wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker));
434
435 for (i = 0; i < wq->max_workers; i++)
436 if (start_worker(wq, i))
437 break;
438
439 wq->max_workers = i;
bddc8d16
JA
440 if (!wq->max_workers)
441 goto err;
a9da8ab2
JA
442
443 /*
444 * Wait for them all to be started and initialized
445 */
446 error = 0;
447 do {
448 struct submit_worker *sw;
449
450 running = 0;
451 pthread_mutex_lock(&wq->flush_lock);
452 for (i = 0; i < wq->max_workers; i++) {
453 sw = &wq->workers[i];
454 pthread_mutex_lock(&sw->lock);
455 if (sw->flags & SW_F_RUNNING)
456 running++;
457 if (sw->flags & SW_F_ERROR)
458 error++;
459 pthread_mutex_unlock(&sw->lock);
460 }
461
462 if (error || running == wq->max_workers) {
463 pthread_mutex_unlock(&wq->flush_lock);
464 break;
465 }
466
467 pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
468 pthread_mutex_unlock(&wq->flush_lock);
469 } while (1);
470
bddc8d16
JA
471 if (!error)
472 return 0;
a9da8ab2 473
bddc8d16
JA
474err:
475 log_err("Can't create rate workqueue\n");
476 td_verror(td, ESRCH, "workqueue_init");
477 workqueue_exit(wq);
478 return 1;
a9da8ab2 479}