/*
 * Rated submission helpers
 *
 * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
 *
 */
#include "fio.h"
#include "ioengines.h"
#include "lib/getrusage.h"
#include "rate-submit.h"

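/*
 * Overview: when a job runs with io_submit_mode=offload, the job thread
 * stops issuing I/O directly and instead hands each io_u to a pool of
 * worker threads through the workqueue below. Each worker clones enough
 * of the parent's thread_data to drive the I/O engine on its own. A
 * minimal job that exercises this path might look like the following
 * (illustrative values, not taken from this source file):
 *
 *	[offload-job]
 *	ioengine=libaio
 *	iodepth=16
 *	rate_iops=1000
 *	io_submit_mode=offload
 *	serialize_overlap=1
 */
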
static void check_overlap(struct io_u *io_u)
{
	int i;
	struct thread_data *td;
	bool overlap = false;

	do {
		/*
		 * Allow only one thread to check for overlap at a
		 * time to prevent two threads from thinking the coast
		 * is clear and then submitting IOs that overlap with
		 * each other.
		 *
		 * If an overlap is found, drop the lock and retry the
		 * whole scan so the conflicting I/O can make progress.
		 * If no overlap is found, return with overlap_check
		 * still held; it is released later in the submission
		 * path, once this io_u is marked in flight.
		 */
		pthread_mutex_lock(&overlap_check);
		for_each_td(td, i) {
			if (td->runstate <= TD_SETTING_UP ||
			    td->runstate >= TD_FINISHING ||
			    !td->o.serialize_overlap ||
			    td->o.io_submit_mode != IO_MODE_OFFLOAD)
				continue;

			overlap = in_flight_overlap(&td->io_u_all, io_u);
			if (overlap) {
				pthread_mutex_unlock(&overlap_check);
				break;
			}
		}
	} while (overlap);
}

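/*
 * Worker callback: submit one io_u on behalf of the parent job. Retries
 * while the engine returns FIO_Q_BUSY, reaping completed I/O in between,
 * and reaps at least one event afterwards when running at iodepth=1 so
 * that depth accounting stays exact.
 */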
static int io_workqueue_fn(struct submit_worker *sw,
			   struct workqueue_work *work)
{
	struct io_u *io_u = container_of(work, struct io_u, work);
	const enum fio_ddir ddir = io_u->ddir;
	struct thread_data *td = sw->priv;
	int ret;

	if (td->o.serialize_overlap)
		check_overlap(io_u);

	dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());

	io_u_set(td, io_u, IO_U_F_NO_FILE_PUT);

	td->cur_depth++;

	do {
		ret = td_io_queue(td, io_u);
		if (ret != FIO_Q_BUSY)
			break;
		ret = io_u_queued_complete(td, 1);
		if (ret > 0)
			td->cur_depth -= ret;
		io_u_clear(td, io_u, IO_U_F_FLIGHT);
	} while (1);

	dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());

	io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);

	if (ret == FIO_Q_COMPLETED)
		td->cur_depth--;
	else if (ret == FIO_Q_QUEUED) {
		unsigned int min_evts;

		if (td->o.iodepth == 1)
			min_evts = 1;
		else
			min_evts = 0;

		ret = io_u_queued_complete(td, min_evts);
		if (ret > 0)
			td->cur_depth -= ret;
	}

	return 0;
}

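/*
 * Called by the workqueue before a worker goes idle: returns true while
 * the worker still has queued or in-flight I/O that must be flushed
 * before it is safe to sleep.
 */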
static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw)
{
	struct thread_data *td = sw->priv;

	if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
		return true;

	return false;
}

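/*
 * Quiesce outstanding I/O so the worker can sleep without leaving
 * completions unreaped; any reaped events are subtracted from the
 * current depth.
 */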
static void io_workqueue_pre_sleep_fn(struct submit_worker *sw)
{
	struct thread_data *td = sw->priv;
	int ret;

	ret = io_u_quiesce(td);
	if (ret > 0)
		td->cur_depth -= ret;
}

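/*
 * Per-worker private data is a full (shadow) thread_data, allocated
 * here and torn down in the matching free callback.
 */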
static int io_workqueue_alloc_fn(struct submit_worker *sw)
{
	struct thread_data *td;

	td = calloc(1, sizeof(*td));
	if (!td)
		return 1;

	sw->priv = td;
	return 0;
}

static void io_workqueue_free_fn(struct submit_worker *sw)
{
	free(sw->priv);
	sw->priv = NULL;
}

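/*
 * Initialize a worker's shadow thread_data from the parent job: copy
 * the options and stats, duplicate the file list, load a private
 * instance of the I/O engine, and force iodepth=1 since each worker
 * submits a single io_u at a time.
 */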
static int io_workqueue_init_worker_fn(struct submit_worker *sw)
{
	struct thread_data *parent = sw->wq->td;
	struct thread_data *td = sw->priv;

	memcpy(&td->o, &parent->o, sizeof(td->o));
	memcpy(&td->ts, &parent->ts, sizeof(td->ts));
	td->o.uid = td->o.gid = -1U;
	dup_files(td, parent);
	td->eo = parent->eo;
	fio_options_mem_dupe(td);

	if (ioengine_load(td))
		goto err;

	td->pid = gettid();

	INIT_FLIST_HEAD(&td->io_log_list);
	INIT_FLIST_HEAD(&td->io_hist_list);
	INIT_FLIST_HEAD(&td->verify_list);
	INIT_FLIST_HEAD(&td->trim_list);
	td->io_hist_tree = RB_ROOT;

	td->o.iodepth = 1;
	if (td_io_init(td))
		goto err_io_init;

	set_epoch_time(td, td->o.log_unix_epoch);
	fio_getrusage(&td->ru_start);
	clear_io_state(td, 1);

	td_set_runstate(td, TD_RUNNING);
	td->flags |= TD_F_CHILD | TD_F_NEED_LOCK;
	td->parent = parent;
	return 0;

err_io_init:
	close_ioengine(td);
err:
	return 1;
}

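/*
 * Worker teardown: fold this worker's stats into the parent's, counting
 * summed workers so sum_thread_stats() knows whether this is the first
 * contribution, then release options, files and the engine.
 */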
static void io_workqueue_exit_worker_fn(struct submit_worker *sw,
					unsigned int *sum_cnt)
{
	struct thread_data *td = sw->priv;

	(*sum_cnt)++;
	sum_thread_stats(&sw->wq->td->ts, &td->ts, *sum_cnt == 1);

	fio_options_free(td);
	close_and_free_files(td);
	if (td->io_ops)
		close_ioengine(td);
	td_set_runstate(td, TD_EXITED);
}

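/*
 * Accumulate a single counter from a worker into the parent and zero
 * the source. With CONFIG_SFAA the compiler provides
 * __sync_fetch_and_add(), so the add is atomic and no lock is needed;
 * otherwise callers serialize through the stat locks taken in
 * pthread_double_lock() below.
 */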
#ifdef CONFIG_SFAA
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (*src) {
		__sync_fetch_and_add(dst, *src);
		*src = 0;
	}
}
#else
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (*src) {
		*dst += *src;
		*src = 0;
	}
}
#endif

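/*
 * Lock/unlock a pair of stat locks. Both helpers compile away entirely
 * when CONFIG_SFAA is set, since atomic adds make the locks unnecessary.
 */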
static void pthread_double_unlock(pthread_mutex_t *lock1,
				  pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	pthread_mutex_unlock(lock1);
	pthread_mutex_unlock(lock2);
#endif
}

static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	/*
	 * Always acquire the two locks in a fixed (address) order, so
	 * two threads locking the same pair from opposite ends cannot
	 * deadlock.
	 */
	if (lock1 < lock2) {
		pthread_mutex_lock(lock1);
		pthread_mutex_lock(lock2);
	} else {
		pthread_mutex_lock(lock2);
		pthread_mutex_lock(lock1);
	}
#endif
}

static void sum_ddir(struct thread_data *dst, struct thread_data *src,
		     enum fio_ddir ddir)
{
	pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);

	sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
	sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
	sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
	sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
	sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);

	pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
}

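/*
 * Periodic accounting hook: fold the worker's per-direction byte and
 * block counters back into the parent job so rate limiting and status
 * output see the combined totals.
 */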
static void io_workqueue_update_acct_fn(struct submit_worker *sw)
{
	struct thread_data *src = sw->priv;
	struct thread_data *dst = sw->wq->td;

	if (td_read(src))
		sum_ddir(dst, src, DDIR_READ);
	if (td_write(src))
		sum_ddir(dst, src, DDIR_WRITE);
	if (td_trim(src))
		sum_ddir(dst, src, DDIR_TRIM);
}

static struct workqueue_ops rated_wq_ops = {
	.fn			= io_workqueue_fn,
	.pre_sleep_flush_fn	= io_workqueue_pre_sleep_flush_fn,
	.pre_sleep_fn		= io_workqueue_pre_sleep_fn,
	.update_acct_fn		= io_workqueue_update_acct_fn,
	.alloc_worker_fn	= io_workqueue_alloc_fn,
	.free_worker_fn		= io_workqueue_free_fn,
	.init_worker_fn		= io_workqueue_init_worker_fn,
	.exit_worker_fn		= io_workqueue_exit_worker_fn,
};

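/*
 * Public entry points. These are no-ops unless the job runs with
 * io_submit_mode=offload; otherwise a workqueue sized to the job's
 * iodepth is created and later torn down. A sketch of the expected call
 * pattern from the job thread (simplified; the real call sites live in
 * fio's backend):
 *
 *	if (rate_submit_init(td, sk_out))
 *		goto err;
 *	... run the job, queueing io_u's via the workqueue ...
 *	rate_submit_exit(td);
 */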
int rate_submit_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
		return 0;

	return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth, sk_out);
}

void rate_submit_exit(struct thread_data *td)
{
	if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
		return;

	workqueue_exit(&td->io_wq);
}