workqueue: ensure that workqueue_enqueue() can't fail
[fio.git] / rate-submit.c
1 #include "fio.h"
2 #include "ioengine.h"
3 #include "lib/getrusage.h"
4
5 static int io_workqueue_fn(struct submit_worker *sw,
6                            struct workqueue_work *work)
7 {
8         struct io_u *io_u = container_of(work, struct io_u, work);
9         const enum fio_ddir ddir = io_u->ddir;
10         struct thread_data *td = sw->private;
11         int ret;
12
13         dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());
14
15         io_u_set(io_u, IO_U_F_NO_FILE_PUT);
16
17         td->cur_depth++;
18
19         do {
20                 ret = td_io_queue(td, io_u);
21                 if (ret != FIO_Q_BUSY)
22                         break;
23                 ret = io_u_queued_complete(td, 1);
24                 if (ret > 0)
25                         td->cur_depth -= ret;
26                 io_u_clear(io_u, IO_U_F_FLIGHT);
27         } while (1);
28
29         dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());
30
31         io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);
32
33         if (ret == FIO_Q_COMPLETED)
34                 td->cur_depth--;
35         else if (ret == FIO_Q_QUEUED) {
36                 unsigned int min_evts;
37
38                 if (td->o.iodepth == 1)
39                         min_evts = 1;
40                 else
41                         min_evts = 0;
42
43                 ret = io_u_queued_complete(td, min_evts);
44                 if (ret > 0)
45                         td->cur_depth -= ret;
46         } else if (ret == FIO_Q_BUSY) {
47                 ret = io_u_queued_complete(td, td->cur_depth);
48                 if (ret > 0)
49                         td->cur_depth -= ret;
50         }
51
52         return 0;
53 }
54
55 static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw)
56 {
57         struct thread_data *td = sw->private;
58
59         if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
60                 return true;
61
62         return false;
63 }
64
65 static void io_workqueue_pre_sleep_fn(struct submit_worker *sw)
66 {
67         struct thread_data *td = sw->private;
68         int ret;
69
70         ret = io_u_quiesce(td);
71         if (ret > 0)
72                 td->cur_depth -= ret;
73 }
74
75 static int io_workqueue_alloc_fn(struct submit_worker *sw)
76 {
77         struct thread_data *td;
78
79         td = calloc(1, sizeof(*td));
80         sw->private = td;
81         return 0;
82 }
83
84 static void io_workqueue_free_fn(struct submit_worker *sw)
85 {
86         free(sw->private);
87         sw->private = NULL;
88 }
89
90 static int io_workqueue_init_worker_fn(struct submit_worker *sw)
91 {
92         struct thread_data *parent = sw->wq->td;
93         struct thread_data *td = sw->private;
94         int fio_unused ret;
95
96         memcpy(&td->o, &parent->o, sizeof(td->o));
97         memcpy(&td->ts, &parent->ts, sizeof(td->ts));
98         td->o.uid = td->o.gid = -1U;
99         dup_files(td, parent);
100         td->eo = parent->eo;
101         fio_options_mem_dupe(td);
102
103         if (ioengine_load(td))
104                 goto err;
105
106         if (td->o.odirect)
107                 td->io_ops->flags |= FIO_RAWIO;
108
109         td->pid = gettid();
110
111         INIT_FLIST_HEAD(&td->io_log_list);
112         INIT_FLIST_HEAD(&td->io_hist_list);
113         INIT_FLIST_HEAD(&td->verify_list);
114         INIT_FLIST_HEAD(&td->trim_list);
115         INIT_FLIST_HEAD(&td->next_rand_list);
116         td->io_hist_tree = RB_ROOT;
117
118         td->o.iodepth = 1;
119         if (td_io_init(td))
120                 goto err_io_init;
121
122         fio_gettime(&td->epoch, NULL);
123         fio_getrusage(&td->ru_start);
124         clear_io_state(td, 1);
125
126         td_set_runstate(td, TD_RUNNING);
127         td->flags |= TD_F_CHILD;
128         td->parent = parent;
129         return 0;
130
131 err_io_init:
132         close_ioengine(td);
133 err:
134         return 1;
135
136 }
137
138 static void io_workqueue_exit_worker_fn(struct submit_worker *sw,
139                                         unsigned int *sum_cnt)
140 {
141         struct thread_data *td = sw->private;
142
143         (*sum_cnt)++;
144         sum_thread_stats(&sw->wq->td->ts, &td->ts, *sum_cnt == 1);
145
146         fio_options_free(td);
147         close_and_free_files(td);
148         if (td->io_ops)
149                 close_ioengine(td);
150         td_set_runstate(td, TD_EXITED);
151 }
152
/*
 * Move the value at @src into @dst: add *src to *dst and reset *src to 0.
 * Nothing is touched when *src is already 0.
 *
 * With CONFIG_SFAA the addition is done through __sync_fetch_and_add();
 * otherwise it is a plain "+=".
 */
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (!*src)
		return;

#ifdef CONFIG_SFAA
	__sync_fetch_and_add(dst, *src);
#else
	*dst += *src;
#endif
	*src = 0;
}
170
/*
 * Unlock @lock1, then @lock2.
 *
 * Compiles to an empty function when CONFIG_SFAA is defined.
 */
static void pthread_double_unlock(pthread_mutex_t *lock1,
				  pthread_mutex_t *lock2)
{
#ifdef CONFIG_SFAA
	/* nothing to release in this configuration */
#else
	pthread_mutex_unlock(lock1);
	pthread_mutex_unlock(lock2);
#endif
}
179
/*
 * Take both mutexes, always acquiring the one with the lower address first
 * so that two callers passing the same pair in opposite order agree on the
 * locking order.
 *
 * Compiles to an empty function when CONFIG_SFAA is defined.
 */
static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	pthread_mutex_t *first = lock2;
	pthread_mutex_t *second = lock1;

	if (lock1 < lock2) {
		first = lock1;
		second = lock2;
	}

	pthread_mutex_lock(first);
	pthread_mutex_lock(second);
#endif
}
192
193 static void sum_ddir(struct thread_data *dst, struct thread_data *src,
194                      enum fio_ddir ddir)
195 {
196         pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
197
198         sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
199         sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
200         sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
201         sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
202         sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);
203
204         pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
205 }
206
207 static void io_workqueue_update_acct_fn(struct submit_worker *sw)
208 {
209         struct thread_data *src = sw->private;
210         struct thread_data *dst = sw->wq->td;
211
212         if (td_read(src))
213                 sum_ddir(dst, src, DDIR_READ);
214         if (td_write(src))
215                 sum_ddir(dst, src, DDIR_WRITE);
216         if (td_trim(src))
217                 sum_ddir(dst, src, DDIR_TRIM);
218
219 }
220
221 struct workqueue_ops rated_wq_ops = {
222         .fn                     = io_workqueue_fn,
223         .pre_sleep_flush_fn     = io_workqueue_pre_sleep_flush_fn,
224         .pre_sleep_fn           = io_workqueue_pre_sleep_fn,
225         .update_acct_fn         = io_workqueue_update_acct_fn,
226         .alloc_worker_fn        = io_workqueue_alloc_fn,
227         .free_worker_fn         = io_workqueue_free_fn,
228         .init_worker_fn         = io_workqueue_init_worker_fn,
229         .exit_worker_fn         = io_workqueue_exit_worker_fn,
230 };