Kill now unused lockfile_batch variable
[fio.git] / engines / libaio.c
1 /*
2  * libaio engine
3  *
4  * IO engine using the Linux native aio interface.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <libaio.h>
13
14 #include "../fio.h"
15
16 struct libaio_data {
17         io_context_t aio_ctx;
18         struct io_event *aio_events;
19         struct iocb **iocbs;
20         struct io_u **io_us;
21         int iocbs_nr;
22 };
23
24 struct libaio_options {
25         struct thread_data *td;
26         unsigned int userspace_reap;
27 };
28
29 static struct fio_option options[] = {
30         {
31                 .name   = "userspace_reap",
32                 .type   = FIO_OPT_STR_SET,
33                 .off1   = offsetof(struct libaio_options, userspace_reap),
34                 .help   = "Use alternative user-space reap implementation",
35         },
36         {
37                 .name   = NULL,
38         },
39 };
40
41 static int fio_libaio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
42 {
43         struct fio_file *f = io_u->file;
44
45         if (io_u->ddir == DDIR_READ)
46                 io_prep_pread(&io_u->iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
47         else if (io_u->ddir == DDIR_WRITE)
48                 io_prep_pwrite(&io_u->iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
49         else if (ddir_sync(io_u->ddir))
50                 io_prep_fsync(&io_u->iocb, f->fd);
51
52         return 0;
53 }
54
55 static struct io_u *fio_libaio_event(struct thread_data *td, int event)
56 {
57         struct libaio_data *ld = td->io_ops->data;
58         struct io_event *ev;
59         struct io_u *io_u;
60
61         ev = ld->aio_events + event;
62         io_u = container_of(ev->obj, struct io_u, iocb);
63
64         if (ev->res != io_u->xfer_buflen) {
65                 if (ev->res > io_u->xfer_buflen)
66                         io_u->error = -ev->res;
67                 else
68                         io_u->resid = io_u->xfer_buflen - ev->res;
69         } else
70                 io_u->error = 0;
71
72         return io_u;
73 }
74
75 struct aio_ring {
76         unsigned id;             /** kernel internal index number */
77         unsigned nr;             /** number of io_events */
78         unsigned head;
79         unsigned tail;
80
81         unsigned magic;
82         unsigned compat_features;
83         unsigned incompat_features;
84         unsigned header_length; /** size of aio_ring */
85
86         struct io_event events[0];
87 };
88
89 #define AIO_RING_MAGIC  0xa10a10a1
90
91 static int user_io_getevents(io_context_t aio_ctx, unsigned int max,
92                              struct io_event *events)
93 {
94         long i = 0;
95         unsigned head;
96         struct aio_ring *ring = (struct aio_ring*) aio_ctx;
97
98         while (i < max) {
99                 head = ring->head;
100
101                 if (head == ring->tail) {
102                         /* There are no more completions */
103                         break;
104                 } else {
105                         /* There is another completion to reap */
106                         events[i] = ring->events[head];
107                         read_barrier();
108                         ring->head = (head + 1) % ring->nr;
109                         i++;
110                 }
111         }
112
113         return i;
114 }
115
116 static int fio_libaio_getevents(struct thread_data *td, unsigned int min,
117                                 unsigned int max, struct timespec *t)
118 {
119         struct libaio_data *ld = td->io_ops->data;
120         struct libaio_options *o = td->eo;
121         unsigned actual_min = td->o.iodepth_batch_complete == 0 ? 0 : min;
122         int r, events = 0;
123
124         do {
125                 if (o->userspace_reap == 1
126                     && actual_min == 0
127                     && ((struct aio_ring *)(ld->aio_ctx))->magic
128                                 == AIO_RING_MAGIC) {
129                         r = user_io_getevents(ld->aio_ctx, max,
130                                 ld->aio_events + events);
131                 } else {
132                         r = io_getevents(ld->aio_ctx, actual_min,
133                                 max, ld->aio_events + events, t);
134                 }
135                 if (r >= 0)
136                         events += r;
137                 else if (r == -EAGAIN)
138                         usleep(100);
139         } while (events < min);
140
141         return r < 0 ? r : events;
142 }
143
144 static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
145 {
146         struct libaio_data *ld = td->io_ops->data;
147
148         fio_ro_check(td, io_u);
149
150         if (ld->iocbs_nr == (int) td->o.iodepth)
151                 return FIO_Q_BUSY;
152
153         /*
154          * fsync is tricky, since it can fail and we need to do it
155          * serialized with other io. the reason is that linux doesn't
156          * support aio fsync yet. So return busy for the case where we
157          * have pending io, to let fio complete those first.
158          */
159         if (ddir_sync(io_u->ddir)) {
160                 if (ld->iocbs_nr)
161                         return FIO_Q_BUSY;
162
163                 do_io_u_sync(td, io_u);
164                 return FIO_Q_COMPLETED;
165         }
166
167         if (io_u->ddir == DDIR_TRIM) {
168                 if (ld->iocbs_nr)
169                         return FIO_Q_BUSY;
170
171                 do_io_u_trim(td, io_u);
172                 return FIO_Q_COMPLETED;
173         }
174
175         ld->iocbs[ld->iocbs_nr] = &io_u->iocb;
176         ld->io_us[ld->iocbs_nr] = io_u;
177         ld->iocbs_nr++;
178         return FIO_Q_QUEUED;
179 }
180
181 static void fio_libaio_queued(struct thread_data *td, struct io_u **io_us,
182                               unsigned int nr)
183 {
184         struct timeval now;
185         unsigned int i;
186
187         if (!fio_fill_issue_time(td))
188                 return;
189
190         fio_gettime(&now, NULL);
191
192         for (i = 0; i < nr; i++) {
193                 struct io_u *io_u = io_us[i];
194
195                 memcpy(&io_u->issue_time, &now, sizeof(now));
196                 io_u_queued(td, io_u);
197         }
198 }
199
200 static int fio_libaio_commit(struct thread_data *td)
201 {
202         struct libaio_data *ld = td->io_ops->data;
203         struct iocb **iocbs;
204         struct io_u **io_us;
205         int ret;
206
207         if (!ld->iocbs_nr)
208                 return 0;
209
210         io_us = ld->io_us;
211         iocbs = ld->iocbs;
212         do {
213                 ret = io_submit(ld->aio_ctx, ld->iocbs_nr, iocbs);
214                 if (ret > 0) {
215                         fio_libaio_queued(td, io_us, ret);
216                         io_u_mark_submit(td, ret);
217                         ld->iocbs_nr -= ret;
218                         io_us += ret;
219                         iocbs += ret;
220                         ret = 0;
221                 } else if (!ret || ret == -EAGAIN || ret == -EINTR) {
222                         if (!ret)
223                                 io_u_mark_submit(td, ret);
224                         continue;
225                 } else
226                         break;
227         } while (ld->iocbs_nr);
228
229         return ret;
230 }
231
232 static int fio_libaio_cancel(struct thread_data *td, struct io_u *io_u)
233 {
234         struct libaio_data *ld = td->io_ops->data;
235
236         return io_cancel(ld->aio_ctx, &io_u->iocb, ld->aio_events);
237 }
238
239 static void fio_libaio_cleanup(struct thread_data *td)
240 {
241         struct libaio_data *ld = td->io_ops->data;
242
243         if (ld) {
244                 io_destroy(ld->aio_ctx);
245                 free(ld->aio_events);
246                 free(ld->iocbs);
247                 free(ld->io_us);
248                 free(ld);
249         }
250 }
251
252 static int fio_libaio_init(struct thread_data *td)
253 {
254         struct libaio_data *ld = malloc(sizeof(*ld));
255         struct libaio_options *o = td->eo;
256         int err = 0;
257
258         memset(ld, 0, sizeof(*ld));
259
260         /*
261          * First try passing in 0 for queue depth, since we don't
262          * care about the user ring. If that fails, the kernel is too old
263          * and we need the right depth.
264          */
265         if (!o->userspace_reap)
266                 err = io_queue_init(INT_MAX, &ld->aio_ctx);
267         if (o->userspace_reap || err == -EINVAL)
268                 err = io_queue_init(td->o.iodepth, &ld->aio_ctx);
269         if (err) {
270                 td_verror(td, -err, "io_queue_init");
271                 log_err("fio: check /proc/sys/fs/aio-max-nr\n");
272                 free(ld);
273                 return 1;
274         }
275
276         ld->aio_events = malloc(td->o.iodepth * sizeof(struct io_event));
277         memset(ld->aio_events, 0, td->o.iodepth * sizeof(struct io_event));
278         ld->iocbs = malloc(td->o.iodepth * sizeof(struct iocb *));
279         memset(ld->iocbs, 0, sizeof(struct iocb *));
280         ld->io_us = malloc(td->o.iodepth * sizeof(struct io_u *));
281         memset(ld->io_us, 0, td->o.iodepth * sizeof(struct io_u *));
282         ld->iocbs_nr = 0;
283
284         td->io_ops->data = ld;
285         return 0;
286 }
287
288 static struct ioengine_ops ioengine = {
289         .name                   = "libaio",
290         .version                = FIO_IOOPS_VERSION,
291         .init                   = fio_libaio_init,
292         .prep                   = fio_libaio_prep,
293         .queue                  = fio_libaio_queue,
294         .commit                 = fio_libaio_commit,
295         .cancel                 = fio_libaio_cancel,
296         .getevents              = fio_libaio_getevents,
297         .event                  = fio_libaio_event,
298         .cleanup                = fio_libaio_cleanup,
299         .open_file              = generic_open_file,
300         .close_file             = generic_close_file,
301         .get_file_size          = generic_get_file_size,
302         .options                = options,
303         .option_struct_size     = sizeof(struct libaio_options),
304 };
305
306 static void fio_init fio_libaio_register(void)
307 {
308         register_ioengine(&ioengine);
309 }
310
311 static void fio_exit fio_libaio_unregister(void)
312 {
313         unregister_ioengine(&ioengine);
314 }