workqueue/rate-submit: update comments
[fio.git] / rate-submit.c
CommitLineData
51575029
JA
1/*
2 * Rated submission helpers
3 *
4 * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
5 *
6 */
40511301
JA
7#include "fio.h"
8#include "ioengine.h"
9#include "lib/getrusage.h"
10
155f2f02
JA
11static int io_workqueue_fn(struct submit_worker *sw,
12 struct workqueue_work *work)
40511301
JA
13{
14 struct io_u *io_u = container_of(work, struct io_u, work);
15 const enum fio_ddir ddir = io_u->ddir;
16 struct thread_data *td = sw->private;
17 int ret;
18
19 dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());
20
21 io_u_set(io_u, IO_U_F_NO_FILE_PUT);
22
23 td->cur_depth++;
24
25 do {
26 ret = td_io_queue(td, io_u);
27 if (ret != FIO_Q_BUSY)
28 break;
29 ret = io_u_queued_complete(td, 1);
30 if (ret > 0)
31 td->cur_depth -= ret;
32 io_u_clear(io_u, IO_U_F_FLIGHT);
33 } while (1);
34
35 dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());
36
37 io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);
38
39 if (ret == FIO_Q_COMPLETED)
40 td->cur_depth--;
41 else if (ret == FIO_Q_QUEUED) {
42 unsigned int min_evts;
43
44 if (td->o.iodepth == 1)
45 min_evts = 1;
46 else
47 min_evts = 0;
48
49 ret = io_u_queued_complete(td, min_evts);
50 if (ret > 0)
51 td->cur_depth -= ret;
52 } else if (ret == FIO_Q_BUSY) {
53 ret = io_u_queued_complete(td, td->cur_depth);
54 if (ret > 0)
55 td->cur_depth -= ret;
56 }
155f2f02
JA
57
58 return 0;
40511301
JA
59}
60
61static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw)
62{
63 struct thread_data *td = sw->private;
64
65 if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
66 return true;
67
68 return false;
69}
70
71static void io_workqueue_pre_sleep_fn(struct submit_worker *sw)
72{
73 struct thread_data *td = sw->private;
74 int ret;
75
76 ret = io_u_quiesce(td);
77 if (ret > 0)
78 td->cur_depth -= ret;
79}
80
81static int io_workqueue_alloc_fn(struct submit_worker *sw)
82{
83 struct thread_data *td;
84
85 td = calloc(1, sizeof(*td));
86 sw->private = td;
87 return 0;
88}
89
90static void io_workqueue_free_fn(struct submit_worker *sw)
91{
92 free(sw->private);
93 sw->private = NULL;
94}
95
96static int io_workqueue_init_worker_fn(struct submit_worker *sw)
97{
98 struct thread_data *parent = sw->wq->td;
99 struct thread_data *td = sw->private;
100 int fio_unused ret;
101
102 memcpy(&td->o, &parent->o, sizeof(td->o));
103 memcpy(&td->ts, &parent->ts, sizeof(td->ts));
104 td->o.uid = td->o.gid = -1U;
105 dup_files(td, parent);
106 td->eo = parent->eo;
107 fio_options_mem_dupe(td);
108
109 if (ioengine_load(td))
110 goto err;
111
112 if (td->o.odirect)
113 td->io_ops->flags |= FIO_RAWIO;
114
115 td->pid = gettid();
116
117 INIT_FLIST_HEAD(&td->io_log_list);
118 INIT_FLIST_HEAD(&td->io_hist_list);
119 INIT_FLIST_HEAD(&td->verify_list);
120 INIT_FLIST_HEAD(&td->trim_list);
121 INIT_FLIST_HEAD(&td->next_rand_list);
122 td->io_hist_tree = RB_ROOT;
123
124 td->o.iodepth = 1;
125 if (td_io_init(td))
126 goto err_io_init;
127
128 fio_gettime(&td->epoch, NULL);
129 fio_getrusage(&td->ru_start);
130 clear_io_state(td, 1);
131
132 td_set_runstate(td, TD_RUNNING);
133 td->flags |= TD_F_CHILD;
134 td->parent = parent;
135 return 0;
136
137err_io_init:
138 close_ioengine(td);
139err:
140 return 1;
141
142}
143
144static void io_workqueue_exit_worker_fn(struct submit_worker *sw,
145 unsigned int *sum_cnt)
146{
147 struct thread_data *td = sw->private;
148
149 (*sum_cnt)++;
150 sum_thread_stats(&sw->wq->td->ts, &td->ts, *sum_cnt == 1);
151
152 fio_options_free(td);
153 close_and_free_files(td);
154 if (td->io_ops)
155 close_ioengine(td);
156 td_set_runstate(td, TD_EXITED);
157}
158
159#ifdef CONFIG_SFAA
160static void sum_val(uint64_t *dst, uint64_t *src)
161{
162 if (*src) {
163 __sync_fetch_and_add(dst, *src);
164 *src = 0;
165 }
166}
167#else
168static void sum_val(uint64_t *dst, uint64_t *src)
169{
170 if (*src) {
171 *dst += *src;
172 *src = 0;
173 }
174}
175#endif
176
177static void pthread_double_unlock(pthread_mutex_t *lock1,
178 pthread_mutex_t *lock2)
179{
180#ifndef CONFIG_SFAA
181 pthread_mutex_unlock(lock1);
182 pthread_mutex_unlock(lock2);
183#endif
184}
185
186static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
187{
188#ifndef CONFIG_SFAA
189 if (lock1 < lock2) {
190 pthread_mutex_lock(lock1);
191 pthread_mutex_lock(lock2);
192 } else {
193 pthread_mutex_lock(lock2);
194 pthread_mutex_lock(lock1);
195 }
196#endif
197}
198
199static void sum_ddir(struct thread_data *dst, struct thread_data *src,
200 enum fio_ddir ddir)
201{
202 pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
203
204 sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
205 sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
206 sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
207 sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
208 sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);
209
210 pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
211}
212
213static void io_workqueue_update_acct_fn(struct submit_worker *sw)
214{
215 struct thread_data *src = sw->private;
216 struct thread_data *dst = sw->wq->td;
217
218 if (td_read(src))
219 sum_ddir(dst, src, DDIR_READ);
220 if (td_write(src))
221 sum_ddir(dst, src, DDIR_WRITE);
222 if (td_trim(src))
223 sum_ddir(dst, src, DDIR_TRIM);
224
225}
226
227struct workqueue_ops rated_wq_ops = {
228 .fn = io_workqueue_fn,
229 .pre_sleep_flush_fn = io_workqueue_pre_sleep_flush_fn,
230 .pre_sleep_fn = io_workqueue_pre_sleep_fn,
231 .update_acct_fn = io_workqueue_update_acct_fn,
232 .alloc_worker_fn = io_workqueue_alloc_fn,
233 .free_worker_fn = io_workqueue_free_fn,
234 .init_worker_fn = io_workqueue_init_worker_fn,
235 .exit_worker_fn = io_workqueue_exit_worker_fn,
236};