/*
 * Rated submission helpers
 *
 * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
 *
 */
#include "fio.h"
#include "ioengines.h"
#include "lib/getrusage.h"
#include "rate-submit.h"

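/*
 * With serialize_overlap set and offload submission in use, block until
 * no other job has an overlapping io_u in flight. Invoked by the worker
 * before the io_u is queued.
 */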
static void check_overlap(struct io_u *io_u)
{
        int i;
        struct thread_data *td;
        bool overlap = false;

        do {
                /*
                 * Allow only one thread to check for overlap at a
                 * time to prevent two threads from thinking the coast
                 * is clear and then submitting IOs that overlap with
                 * each other.
                 *
                 * If an overlap is found, release the lock and loop
                 * again so the conflicting I/O can finish. If no
                 * overlap is found, the lock stays held: it is dropped
                 * in td_io_queue() once this io_u is marked in flight,
                 * so other threads can see it in their own checks.
                 */
                pthread_mutex_lock(&overlap_check);
                for_each_td(td, i) {
                        if (td->runstate <= TD_SETTING_UP ||
                            td->runstate >= TD_FINISHING ||
                            !td->o.serialize_overlap ||
                            td->o.io_submit_mode != IO_MODE_OFFLOAD)
                                continue;

                        overlap = in_flight_overlap(&td->io_u_all, io_u);
                        if (overlap) {
                                pthread_mutex_unlock(&overlap_check);
                                break;
                        }
                }
        } while (overlap);
}

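/*
 * Workqueue handler for a single io_u: queue it with the io engine,
 * retrying while the engine reports FIO_Q_BUSY, then reap completions
 * as needed to keep cur_depth honest.
 */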
static int io_workqueue_fn(struct submit_worker *sw,
                           struct workqueue_work *work)
{
        struct io_u *io_u = container_of(work, struct io_u, work);
        const enum fio_ddir ddir = io_u->ddir;
        struct thread_data *td = sw->priv;
        int ret;

        if (td->o.serialize_overlap)
                check_overlap(io_u);

        dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());

        io_u_set(td, io_u, IO_U_F_NO_FILE_PUT);

        td->cur_depth++;

        do {
                ret = td_io_queue(td, io_u);
                if (ret != FIO_Q_BUSY)
                        break;
                ret = io_u_queued_complete(td, 1);
                if (ret > 0)
                        td->cur_depth -= ret;
                io_u_clear(td, io_u, IO_U_F_FLIGHT);
        } while (1);

        dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());

        io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);

        if (ret == FIO_Q_COMPLETED)
                td->cur_depth--;
        else if (ret == FIO_Q_QUEUED) {
                unsigned int min_evts;

                if (td->o.iodepth == 1)
                        min_evts = 1;
                else
                        min_evts = 0;

                ret = io_u_queued_complete(td, min_evts);
                if (ret > 0)
                        td->cur_depth -= ret;
        } else if (ret == FIO_Q_BUSY) {
                ret = io_u_queued_complete(td, td->cur_depth);
                if (ret > 0)
                        td->cur_depth -= ret;
        }

        return 0;
}

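/*
 * Tell the workqueue whether this worker still has I/O pending that
 * must be flushed before it may sleep.
 */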
static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw)
{
        struct thread_data *td = sw->priv;

        if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
                return true;

        return false;
}

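/* Drain this worker's outstanding I/O before it goes to sleep. */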
static void io_workqueue_pre_sleep_fn(struct submit_worker *sw)
{
        struct thread_data *td = sw->priv;
        int ret;

        ret = io_u_quiesce(td);
        if (ret > 0)
                td->cur_depth -= ret;
}

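/*
 * Allocate the private thread_data that shadows the parent job while
 * this worker submits I/O on its behalf.
 */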
static int io_workqueue_alloc_fn(struct submit_worker *sw)
{
        struct thread_data *td;

        td = calloc(1, sizeof(*td));
        if (!td)
                return 1;

        sw->priv = td;
        return 0;
}

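/* Release the worker-private thread_data allocated above. */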
static void io_workqueue_free_fn(struct submit_worker *sw)
{
        free(sw->priv);
        sw->priv = NULL;
}

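/*
 * Set a worker up as a lightweight clone of the parent job: copy its
 * options and stats, duplicate its files, and load a private io engine
 * instance. Each worker runs at an iodepth of 1; the parent's iodepth
 * is realized by the number of workers.
 */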
static int io_workqueue_init_worker_fn(struct submit_worker *sw)
{
        struct thread_data *parent = sw->wq->td;
        struct thread_data *td = sw->priv;

        memcpy(&td->o, &parent->o, sizeof(td->o));
        memcpy(&td->ts, &parent->ts, sizeof(td->ts));
        td->o.uid = td->o.gid = -1U;
        dup_files(td, parent);
        td->eo = parent->eo;
        fio_options_mem_dupe(td);

        if (ioengine_load(td))
                goto err;

        td->pid = gettid();

        INIT_FLIST_HEAD(&td->io_log_list);
        INIT_FLIST_HEAD(&td->io_hist_list);
        INIT_FLIST_HEAD(&td->verify_list);
        INIT_FLIST_HEAD(&td->trim_list);
        td->io_hist_tree = RB_ROOT;

        td->o.iodepth = 1;
        if (td_io_init(td))
                goto err_io_init;

        set_epoch_time(td, td->o.log_unix_epoch);
        fio_getrusage(&td->ru_start);
        clear_io_state(td, 1);

        td_set_runstate(td, TD_RUNNING);
        td->flags |= TD_F_CHILD | TD_F_NEED_LOCK;
        td->parent = parent;
        return 0;

err_io_init:
        close_ioengine(td);
err:
        return 1;
}

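/*
 * Fold this worker's stats into the parent job and tear down its files
 * and io engine instance.
 */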
static void io_workqueue_exit_worker_fn(struct submit_worker *sw,
                                        unsigned int *sum_cnt)
{
        struct thread_data *td = sw->priv;

        (*sum_cnt)++;
        sum_thread_stats(&sw->wq->td->ts, &td->ts, *sum_cnt == 1);

        fio_options_free(td);
        close_and_free_files(td);
        if (td->io_ops)
                close_ioengine(td);
        td_set_runstate(td, TD_EXITED);
}

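/*
 * Move a counter from src to dst, zeroing src. With CONFIG_SFAA this is
 * an atomic fetch-and-add; otherwise callers must serialize access via
 * pthread_double_lock() below.
 */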
#ifdef CONFIG_SFAA
static void sum_val(uint64_t *dst, uint64_t *src)
{
        if (*src) {
                __sync_fetch_and_add(dst, *src);
                *src = 0;
        }
}
#else
static void sum_val(uint64_t *dst, uint64_t *src)
{
        if (*src) {
                *dst += *src;
                *src = 0;
        }
}
#endif

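/*
 * Lock pair helpers, compiled out when atomic adds are available since
 * sum_val() then needs no external serialization.
 */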
static void pthread_double_unlock(pthread_mutex_t *lock1,
                                  pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
        pthread_mutex_unlock(lock1);
        pthread_mutex_unlock(lock2);
#endif
}

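/*
 * Always acquire the two locks in address order, so that two threads
 * summing stats in opposite directions cannot deadlock.
 */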
static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
        if (lock1 < lock2) {
                pthread_mutex_lock(lock1);
                pthread_mutex_lock(lock2);
        } else {
                pthread_mutex_lock(lock2);
                pthread_mutex_lock(lock1);
        }
#endif
}

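/*
 * Merge a worker's per-direction I/O counters into the parent job,
 * holding the stat locks of both sides.
 */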
static void sum_ddir(struct thread_data *dst, struct thread_data *src,
                     enum fio_ddir ddir)
{
        pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);

        sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
        sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
        sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
        sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
        sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);

        pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
}

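/*
 * Called by the workqueue to push this worker's counters up to the
 * parent job, so rate and ETA calculations see current numbers.
 */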
static void io_workqueue_update_acct_fn(struct submit_worker *sw)
{
        struct thread_data *src = sw->priv;
        struct thread_data *dst = sw->wq->td;

        if (td_read(src))
                sum_ddir(dst, src, DDIR_READ);
        if (td_write(src))
                sum_ddir(dst, src, DDIR_WRITE);
        if (td_trim(src))
                sum_ddir(dst, src, DDIR_TRIM);
}

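/* Workqueue callbacks used for io_submit_mode=offload. */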
static struct workqueue_ops rated_wq_ops = {
        .fn                     = io_workqueue_fn,
        .pre_sleep_flush_fn     = io_workqueue_pre_sleep_flush_fn,
        .pre_sleep_fn           = io_workqueue_pre_sleep_fn,
        .update_acct_fn         = io_workqueue_update_acct_fn,
        .alloc_worker_fn        = io_workqueue_alloc_fn,
        .free_worker_fn         = io_workqueue_free_fn,
        .init_worker_fn         = io_workqueue_init_worker_fn,
        .exit_worker_fn         = io_workqueue_exit_worker_fn,
};

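/*
 * With io_submit_mode=offload, set up a workqueue with one worker per
 * unit of queue depth; the job thread hands io_us off to the workers
 * instead of submitting them itself.
 */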
int rate_submit_init(struct thread_data *td, struct sk_out *sk_out)
{
        if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
                return 0;

        return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth, sk_out);
}

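/* Tear down the offload workqueue, if one was created. */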
void rate_submit_exit(struct thread_data *td)
{
        if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
                return;

        workqueue_exit(&td->io_wq);
}