Merge branch 'master' of ssh://brick.kernel.dk/data/git/fio
[fio.git] / engines / libaio.c
1 /*
2  * libaio engine
3  *
4  * IO engine using the Linux native aio interface.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12
13 #include "../fio.h"
14
15 #ifdef FIO_HAVE_LIBAIO
16
17 struct libaio_data {
18         io_context_t aio_ctx;
19         struct io_event *aio_events;
20         struct iocb **iocbs;
21         struct io_u **io_us;
22         int iocbs_nr;
23 };
24
25 struct libaio_options {
26         struct thread_data *td;
27         unsigned int userspace_reap;
28 };
29
30 static struct fio_option options[] = {
31         {
32                 .name   = "userspace_reap",
33                 .type   = FIO_OPT_STR_SET,
34                 .off1   = offsetof(struct libaio_options, userspace_reap),
35                 .help   = "Use alternative user-space reap implementation",
36         },
37         {
38                 .name   = NULL,
39         },
40 };
41
42 static int fio_libaio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
43 {
44         struct fio_file *f = io_u->file;
45
46         if (io_u->ddir == DDIR_READ)
47                 io_prep_pread(&io_u->iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
48         else if (io_u->ddir == DDIR_WRITE)
49                 io_prep_pwrite(&io_u->iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
50         else if (ddir_sync(io_u->ddir))
51                 io_prep_fsync(&io_u->iocb, f->fd);
52
53         return 0;
54 }
55
56 static struct io_u *fio_libaio_event(struct thread_data *td, int event)
57 {
58         struct libaio_data *ld = td->io_ops->data;
59         struct io_event *ev;
60         struct io_u *io_u;
61
62         ev = ld->aio_events + event;
63         io_u = container_of(ev->obj, struct io_u, iocb);
64
65         if (ev->res != io_u->xfer_buflen) {
66                 if (ev->res > io_u->xfer_buflen)
67                         io_u->error = -ev->res;
68                 else
69                         io_u->resid = io_u->xfer_buflen - ev->res;
70         } else
71                 io_u->error = 0;
72
73         return io_u;
74 }
75
76 struct aio_ring {
77         unsigned id;             /** kernel internal index number */
78         unsigned nr;             /** number of io_events */
79         unsigned head;
80         unsigned tail;
81
82         unsigned magic;
83         unsigned compat_features;
84         unsigned incompat_features;
85         unsigned header_length; /** size of aio_ring */
86
87         struct io_event events[0];
88 };
89
90 #define AIO_RING_MAGIC  0xa10a10a1
91
92 static int user_io_getevents(io_context_t aio_ctx, unsigned int max,
93                              struct io_event *events)
94 {
95         long i = 0;
96         unsigned head;
97         struct aio_ring *ring = (struct aio_ring*) aio_ctx;
98
99         while (i < max) {
100                 head = ring->head;
101
102                 if (head == ring->tail) {
103                         /* There are no more completions */
104                         break;
105                 } else {
106                         /* There is another completion to reap */
107                         events[i] = ring->events[head];
108                         read_barrier();
109                         ring->head = (head + 1) % ring->nr;
110                         i++;
111                 }
112         }
113
114         return i;
115 }
116
117 static int fio_libaio_getevents(struct thread_data *td, unsigned int min,
118                                 unsigned int max, struct timespec *t)
119 {
120         struct libaio_data *ld = td->io_ops->data;
121         struct libaio_options *o = td->eo;
122         unsigned actual_min = td->o.iodepth_batch_complete == 0 ? 0 : min;
123         int r, events = 0;
124
125         do {
126                 if (o->userspace_reap == 1
127                     && actual_min == 0
128                     && ((struct aio_ring *)(ld->aio_ctx))->magic
129                                 == AIO_RING_MAGIC) {
130                         r = user_io_getevents(ld->aio_ctx, max,
131                                 ld->aio_events + events);
132                 } else {
133                         r = io_getevents(ld->aio_ctx, actual_min,
134                                 max, ld->aio_events + events, t);
135                 }
136                 if (r >= 0)
137                         events += r;
138                 else if (r == -EAGAIN)
139                         usleep(100);
140         } while (events < min);
141
142         return r < 0 ? r : events;
143 }
144
145 static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
146 {
147         struct libaio_data *ld = td->io_ops->data;
148
149         fio_ro_check(td, io_u);
150
151         if (ld->iocbs_nr == (int) td->o.iodepth)
152                 return FIO_Q_BUSY;
153
154         /*
155          * fsync is tricky, since it can fail and we need to do it
156          * serialized with other io. the reason is that linux doesn't
157          * support aio fsync yet. So return busy for the case where we
158          * have pending io, to let fio complete those first.
159          */
160         if (ddir_sync(io_u->ddir)) {
161                 if (ld->iocbs_nr)
162                         return FIO_Q_BUSY;
163
164                 do_io_u_sync(td, io_u);
165                 return FIO_Q_COMPLETED;
166         }
167
168         if (io_u->ddir == DDIR_TRIM) {
169                 if (ld->iocbs_nr)
170                         return FIO_Q_BUSY;
171
172                 do_io_u_trim(td, io_u);
173                 return FIO_Q_COMPLETED;
174         }
175
176         ld->iocbs[ld->iocbs_nr] = &io_u->iocb;
177         ld->io_us[ld->iocbs_nr] = io_u;
178         ld->iocbs_nr++;
179         return FIO_Q_QUEUED;
180 }
181
182 static void fio_libaio_queued(struct thread_data *td, struct io_u **io_us,
183                               unsigned int nr)
184 {
185         struct timeval now;
186         unsigned int i;
187
188         if (!fio_fill_issue_time(td))
189                 return;
190
191         fio_gettime(&now, NULL);
192
193         for (i = 0; i < nr; i++) {
194                 struct io_u *io_u = io_us[i];
195
196                 memcpy(&io_u->issue_time, &now, sizeof(now));
197                 io_u_queued(td, io_u);
198         }
199 }
200
201 static int fio_libaio_commit(struct thread_data *td)
202 {
203         struct libaio_data *ld = td->io_ops->data;
204         struct iocb **iocbs;
205         struct io_u **io_us;
206         int ret;
207
208         if (!ld->iocbs_nr)
209                 return 0;
210
211         io_us = ld->io_us;
212         iocbs = ld->iocbs;
213         do {
214                 ret = io_submit(ld->aio_ctx, ld->iocbs_nr, iocbs);
215                 if (ret > 0) {
216                         fio_libaio_queued(td, io_us, ret);
217                         io_u_mark_submit(td, ret);
218                         ld->iocbs_nr -= ret;
219                         io_us += ret;
220                         iocbs += ret;
221                         ret = 0;
222                 } else if (!ret || ret == -EAGAIN || ret == -EINTR) {
223                         if (!ret)
224                                 io_u_mark_submit(td, ret);
225                         continue;
226                 } else
227                         break;
228         } while (ld->iocbs_nr);
229
230         return ret;
231 }
232
233 static int fio_libaio_cancel(struct thread_data *td, struct io_u *io_u)
234 {
235         struct libaio_data *ld = td->io_ops->data;
236
237         return io_cancel(ld->aio_ctx, &io_u->iocb, ld->aio_events);
238 }
239
240 static void fio_libaio_cleanup(struct thread_data *td)
241 {
242         struct libaio_data *ld = td->io_ops->data;
243
244         if (ld) {
245                 io_destroy(ld->aio_ctx);
246                 free(ld->aio_events);
247                 free(ld->iocbs);
248                 free(ld->io_us);
249                 free(ld);
250         }
251 }
252
253 static int fio_libaio_init(struct thread_data *td)
254 {
255         struct libaio_data *ld = malloc(sizeof(*ld));
256         struct libaio_options *o = td->eo;
257         int err = 0;
258
259         memset(ld, 0, sizeof(*ld));
260
261         /*
262          * First try passing in 0 for queue depth, since we don't
263          * care about the user ring. If that fails, the kernel is too old
264          * and we need the right depth.
265          */
266         if (!o->userspace_reap)
267                 err = io_queue_init(INT_MAX, &ld->aio_ctx);
268         if (o->userspace_reap || err == -EINVAL)
269                 err = io_queue_init(td->o.iodepth, &ld->aio_ctx);
270         if (err) {
271                 td_verror(td, -err, "io_queue_init");
272                 log_err("fio: check /proc/sys/fs/aio-max-nr\n");
273                 free(ld);
274                 return 1;
275         }
276
277         ld->aio_events = malloc(td->o.iodepth * sizeof(struct io_event));
278         memset(ld->aio_events, 0, td->o.iodepth * sizeof(struct io_event));
279         ld->iocbs = malloc(td->o.iodepth * sizeof(struct iocb *));
280         memset(ld->iocbs, 0, sizeof(struct iocb *));
281         ld->io_us = malloc(td->o.iodepth * sizeof(struct io_u *));
282         memset(ld->io_us, 0, td->o.iodepth * sizeof(struct io_u *));
283         ld->iocbs_nr = 0;
284
285         td->io_ops->data = ld;
286         return 0;
287 }
288
289 static struct ioengine_ops ioengine = {
290         .name                   = "libaio",
291         .version                = FIO_IOOPS_VERSION,
292         .init                   = fio_libaio_init,
293         .prep                   = fio_libaio_prep,
294         .queue                  = fio_libaio_queue,
295         .commit                 = fio_libaio_commit,
296         .cancel                 = fio_libaio_cancel,
297         .getevents              = fio_libaio_getevents,
298         .event                  = fio_libaio_event,
299         .cleanup                = fio_libaio_cleanup,
300         .open_file              = generic_open_file,
301         .close_file             = generic_close_file,
302         .get_file_size          = generic_get_file_size,
303         .options                = options,
304         .option_struct_size     = sizeof(struct libaio_options),
305 };
306
307 #else /* FIO_HAVE_LIBAIO */
308
309 /*
310  * When we have a proper configure system in place, we simply wont build
311  * and install this io engine. For now install a crippled version that
312  * just complains and fails to load.
313  */
314 static int fio_libaio_init(struct thread_data fio_unused *td)
315 {
316         log_err("fio: libaio not available\n");
317         return 1;
318 }
319
320 static struct ioengine_ops ioengine = {
321         .name           = "libaio",
322         .version        = FIO_IOOPS_VERSION,
323         .init           = fio_libaio_init,
324 };
325
326 #endif
327
328 static void fio_init fio_libaio_register(void)
329 {
330         register_ioengine(&ioengine);
331 }
332
333 static void fio_exit fio_libaio_unregister(void)
334 {
335         unregister_ioengine(&ioengine);
336 }