a68a3dd3a7b26bf1e4a03c03e3583c28ebfc6faa
[fio.git] / rate-submit.c
1 #include "fio.h"
2 #include "ioengine.h"
3 #include "lib/getrusage.h"
4
5 static void io_workqueue_fn(struct submit_worker *sw,
6                             struct workqueue_work *work)
7 {
8         struct io_u *io_u = container_of(work, struct io_u, work);
9         const enum fio_ddir ddir = io_u->ddir;
10         struct thread_data *td = sw->private;
11         int ret;
12
13         dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());
14
15         io_u_set(io_u, IO_U_F_NO_FILE_PUT);
16
17         td->cur_depth++;
18
19         do {
20                 ret = td_io_queue(td, io_u);
21                 if (ret != FIO_Q_BUSY)
22                         break;
23                 ret = io_u_queued_complete(td, 1);
24                 if (ret > 0)
25                         td->cur_depth -= ret;
26                 io_u_clear(io_u, IO_U_F_FLIGHT);
27         } while (1);
28
29         dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());
30
31         io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);
32
33         if (ret == FIO_Q_COMPLETED)
34                 td->cur_depth--;
35         else if (ret == FIO_Q_QUEUED) {
36                 unsigned int min_evts;
37
38                 if (td->o.iodepth == 1)
39                         min_evts = 1;
40                 else
41                         min_evts = 0;
42
43                 ret = io_u_queued_complete(td, min_evts);
44                 if (ret > 0)
45                         td->cur_depth -= ret;
46         } else if (ret == FIO_Q_BUSY) {
47                 ret = io_u_queued_complete(td, td->cur_depth);
48                 if (ret > 0)
49                         td->cur_depth -= ret;
50         }
51 }
52
53 static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw)
54 {
55         struct thread_data *td = sw->private;
56
57         if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
58                 return true;
59
60         return false;
61 }
62
63 static void io_workqueue_pre_sleep_fn(struct submit_worker *sw)
64 {
65         struct thread_data *td = sw->private;
66         int ret;
67
68         ret = io_u_quiesce(td);
69         if (ret > 0)
70                 td->cur_depth -= ret;
71 }
72
73 static int io_workqueue_alloc_fn(struct submit_worker *sw)
74 {
75         struct thread_data *td;
76
77         td = calloc(1, sizeof(*td));
78         sw->private = td;
79         return 0;
80 }
81
82 static void io_workqueue_free_fn(struct submit_worker *sw)
83 {
84         free(sw->private);
85         sw->private = NULL;
86 }
87
88 static int io_workqueue_init_worker_fn(struct submit_worker *sw)
89 {
90         struct thread_data *parent = sw->wq->td;
91         struct thread_data *td = sw->private;
92         int fio_unused ret;
93
94         memcpy(&td->o, &parent->o, sizeof(td->o));
95         memcpy(&td->ts, &parent->ts, sizeof(td->ts));
96         td->o.uid = td->o.gid = -1U;
97         dup_files(td, parent);
98         td->eo = parent->eo;
99         fio_options_mem_dupe(td);
100
101         if (ioengine_load(td))
102                 goto err;
103
104         if (td->o.odirect)
105                 td->io_ops->flags |= FIO_RAWIO;
106
107         td->pid = gettid();
108
109         INIT_FLIST_HEAD(&td->io_log_list);
110         INIT_FLIST_HEAD(&td->io_hist_list);
111         INIT_FLIST_HEAD(&td->verify_list);
112         INIT_FLIST_HEAD(&td->trim_list);
113         INIT_FLIST_HEAD(&td->next_rand_list);
114         td->io_hist_tree = RB_ROOT;
115
116         td->o.iodepth = 1;
117         if (td_io_init(td))
118                 goto err_io_init;
119
120         fio_gettime(&td->epoch, NULL);
121         fio_getrusage(&td->ru_start);
122         clear_io_state(td, 1);
123
124         td_set_runstate(td, TD_RUNNING);
125         td->flags |= TD_F_CHILD;
126         td->parent = parent;
127         return 0;
128
129 err_io_init:
130         close_ioengine(td);
131 err:
132         return 1;
133
134 }
135
136 static void io_workqueue_exit_worker_fn(struct submit_worker *sw,
137                                         unsigned int *sum_cnt)
138 {
139         struct thread_data *td = sw->private;
140
141         (*sum_cnt)++;
142         sum_thread_stats(&sw->wq->td->ts, &td->ts, *sum_cnt == 1);
143
144         fio_options_free(td);
145         close_and_free_files(td);
146         if (td->io_ops)
147                 close_ioengine(td);
148         td_set_runstate(td, TD_EXITED);
149 }
150
/*
 * Move a pending counter value from *src into *dst: a non-zero *src is
 * added to *dst and *src is then reset to 0. A zero *src leaves both
 * untouched.
 *
 * With CONFIG_SFAA the addition uses __sync_fetch_and_add(); otherwise it
 * is a plain add.
 */
static void sum_val(uint64_t *dst, uint64_t *src)
{
	const uint64_t pending = *src;

	if (!pending)
		return;

#ifdef CONFIG_SFAA
	__sync_fetch_and_add(dst, pending);
#else
	*dst += pending;
#endif
	*src = 0;
}

168
/*
 * Counterpart of pthread_double_lock(): releases lock1, then lock2.
 * Compiles to a no-op when CONFIG_SFAA is defined.
 */
static void pthread_double_unlock(pthread_mutex_t *lock1,
				  pthread_mutex_t *lock2)
{
#ifdef CONFIG_SFAA
	(void) lock1;
	(void) lock2;
#else
	pthread_mutex_unlock(lock1);
	pthread_mutex_unlock(lock2);
#endif
}

177
/*
 * Take both mutexes, always acquiring the one at the lower address first
 * so the acquisition order does not depend on the argument order.
 * Compiles to a no-op when CONFIG_SFAA is defined.
 */
static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
{
#ifdef CONFIG_SFAA
	(void) lock1;
	(void) lock2;
#else
	pthread_mutex_t *first = (lock1 < lock2) ? lock1 : lock2;
	pthread_mutex_t *second = (lock1 < lock2) ? lock2 : lock1;

	pthread_mutex_lock(first);
	pthread_mutex_lock(second);
#endif
}
190
191 static void sum_ddir(struct thread_data *dst, struct thread_data *src,
192                      enum fio_ddir ddir)
193 {
194         pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
195
196         sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
197         sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
198         sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
199         sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
200         sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);
201
202         pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
203 }
204
205 static void io_workqueue_update_acct_fn(struct submit_worker *sw)
206 {
207         struct thread_data *src = sw->private;
208         struct thread_data *dst = sw->wq->td;
209
210         if (td_read(src))
211                 sum_ddir(dst, src, DDIR_READ);
212         if (td_write(src))
213                 sum_ddir(dst, src, DDIR_WRITE);
214         if (td_trim(src))
215                 sum_ddir(dst, src, DDIR_TRIM);
216
217 }
218
219 struct workqueue_ops rated_wq_ops = {
220         .fn                     = io_workqueue_fn,
221         .pre_sleep_flush_fn     = io_workqueue_pre_sleep_flush_fn,
222         .pre_sleep_fn           = io_workqueue_pre_sleep_fn,
223         .update_acct_fn         = io_workqueue_update_acct_fn,
224         .alloc_worker_fn        = io_workqueue_alloc_fn,
225         .free_worker_fn         = io_workqueue_free_fn,
226         .init_worker_fn         = io_workqueue_init_worker_fn,
227         .exit_worker_fn         = io_workqueue_exit_worker_fn,
228 };