engines/libaio: increase RLIMIT_MEMLOCK for user buffers
[fio.git] / engines / libaio.c
1 /*
2  * libaio engine
3  *
4  * IO engine using the Linux native aio interface.
5  *
6  */
7 #include <stdlib.h>
8 #include <unistd.h>
9 #include <errno.h>
10 #include <libaio.h>
11 #include <sys/time.h>
12 #include <sys/resource.h>
13
14 #include "../fio.h"
15 #include "../lib/pow2.h"
16 #include "../optgroup.h"
17 #include "../lib/memalign.h"
18
19 #ifndef IOCB_FLAG_HIPRI
20 #define IOCB_FLAG_HIPRI (1 << 2)
21 #endif
22
23 #ifndef IOCTX_FLAG_USERIOCB
24 #define IOCTX_FLAG_USERIOCB     (1 << 0)
25 #endif
26 #ifndef IOCTX_FLAG_IOPOLL
27 #define IOCTX_FLAG_IOPOLL       (1 << 1)
28 #endif
29 #ifndef IOCTX_FLAG_FIXEDBUFS
30 #define IOCTX_FLAG_FIXEDBUFS    (1 << 2)
31 #endif
32
33 static int fio_libaio_commit(struct thread_data *td);
34
35 struct libaio_data {
36         io_context_t aio_ctx;
37         struct io_event *aio_events;
38         struct iocb **iocbs;
39         struct io_u **io_us;
40
41         struct iocb *user_iocbs;
42         struct io_u **io_u_index;
43
44         /*
45          * Basic ring buffer. 'head' is incremented in _queue(), and
46          * 'tail' is incremented in _commit(). We keep 'queued' so
47          * that we know if the ring is full or empty, when
48          * 'head' == 'tail'. 'entries' is the ring size, and
49          * 'is_pow2' is just an optimization to use AND instead of
50          * modulus to get the remainder on ring increment.
51          */
52         int is_pow2;
53         unsigned int entries;
54         unsigned int queued;
55         unsigned int head;
56         unsigned int tail;
57 };
58
59 struct libaio_options {
60         void *pad;
61         unsigned int userspace_reap;
62         unsigned int hipri;
63         unsigned int useriocb;
64         unsigned int fixedbufs;
65 };
66
67 static struct fio_option options[] = {
68         {
69                 .name   = "userspace_reap",
70                 .lname  = "Libaio userspace reaping",
71                 .type   = FIO_OPT_STR_SET,
72                 .off1   = offsetof(struct libaio_options, userspace_reap),
73                 .help   = "Use alternative user-space reap implementation",
74                 .category = FIO_OPT_C_ENGINE,
75                 .group  = FIO_OPT_G_LIBAIO,
76         },
77         {
78                 .name   = "hipri",
79                 .lname  = "High Priority",
80                 .type   = FIO_OPT_STR_SET,
81                 .off1   = offsetof(struct libaio_options, hipri),
82                 .help   = "Use polled IO completions",
83                 .category = FIO_OPT_C_ENGINE,
84                 .group  = FIO_OPT_G_LIBAIO,
85         },
86         {
87                 .name   = "useriocb",
88                 .lname  = "User IOCBs",
89                 .type   = FIO_OPT_STR_SET,
90                 .off1   = offsetof(struct libaio_options, useriocb),
91                 .help   = "Use user mapped IOCBs",
92                 .category = FIO_OPT_C_ENGINE,
93                 .group  = FIO_OPT_G_LIBAIO,
94         },
95         {
96                 .name   = "fixedbufs",
97                 .lname  = "Fixed (pre-mapped) IO buffers",
98                 .type   = FIO_OPT_STR_SET,
99                 .off1   = offsetof(struct libaio_options, fixedbufs),
100                 .help   = "Pre map IO buffers",
101                 .category = FIO_OPT_C_ENGINE,
102                 .group  = FIO_OPT_G_LIBAIO,
103         },
104         {
105                 .name   = NULL,
106         },
107 };
108
109 static inline void ring_inc(struct libaio_data *ld, unsigned int *val,
110                             unsigned int add)
111 {
112         if (ld->is_pow2)
113                 *val = (*val + add) & (ld->entries - 1);
114         else
115                 *val = (*val + add) % ld->entries;
116 }
117
118 static int fio_libaio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
119 {
120         struct libaio_data *ld = td->io_ops_data;
121         struct fio_file *f = io_u->file;
122         struct libaio_options *o = td->eo;
123         struct iocb *iocb;
124
125         if (o->useriocb)
126                 iocb = &ld->user_iocbs[io_u->index];
127         else
128                 iocb = &io_u->iocb;
129
130         if (io_u->ddir == DDIR_READ) {
131                 if (o->fixedbufs) {
132                         iocb->aio_fildes = f->fd;
133                         iocb->aio_lio_opcode = IO_CMD_PREAD;
134                         iocb->u.c.offset = io_u->offset;
135                 } else {
136                         io_prep_pread(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
137                         if (o->hipri)
138                                 iocb->u.c.flags |= IOCB_FLAG_HIPRI;
139                 }
140         } else if (io_u->ddir == DDIR_WRITE) {
141                 if (o->fixedbufs) {
142                         iocb->aio_fildes = f->fd;
143                         iocb->aio_lio_opcode = IO_CMD_PWRITE;
144                         iocb->u.c.offset = io_u->offset;
145                 } else {
146                         io_prep_pwrite(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
147                         if (o->hipri)
148                                 iocb->u.c.flags |= IOCB_FLAG_HIPRI;
149                 }
150         } else if (ddir_sync(io_u->ddir))
151                 io_prep_fsync(iocb, f->fd);
152
153         return 0;
154 }
155
156 static struct io_u *fio_libaio_event(struct thread_data *td, int event)
157 {
158         struct libaio_data *ld = td->io_ops_data;
159         struct libaio_options *o = td->eo;
160         struct io_event *ev;
161         struct io_u *io_u;
162
163         ev = ld->aio_events + event;
164         if (o->useriocb) {
165                 int index = (int) (uintptr_t) ev->obj;
166                 io_u = ld->io_u_index[index];
167         } else
168                 io_u = container_of(ev->obj, struct io_u, iocb);
169
170         if (ev->res != io_u->xfer_buflen) {
171                 if (ev->res > io_u->xfer_buflen)
172                         io_u->error = -ev->res;
173                 else
174                         io_u->resid = io_u->xfer_buflen - ev->res;
175         } else
176                 io_u->error = 0;
177
178         return io_u;
179 }
180
181 struct aio_ring {
182         unsigned id;             /** kernel internal index number */
183         unsigned nr;             /** number of io_events */
184         unsigned head;
185         unsigned tail;
186
187         unsigned magic;
188         unsigned compat_features;
189         unsigned incompat_features;
190         unsigned header_length; /** size of aio_ring */
191
192         struct io_event events[0];
193 };
194
195 #define AIO_RING_MAGIC  0xa10a10a1
196
197 static int user_io_getevents(io_context_t aio_ctx, unsigned int max,
198                              struct io_event *events)
199 {
200         long i = 0;
201         unsigned head;
202         struct aio_ring *ring = (struct aio_ring*) aio_ctx;
203
204         while (i < max) {
205                 head = ring->head;
206
207                 if (head == ring->tail) {
208                         /* There are no more completions */
209                         break;
210                 } else {
211                         /* There is another completion to reap */
212                         events[i] = ring->events[head];
213                         read_barrier();
214                         ring->head = (head + 1) % ring->nr;
215                         i++;
216                 }
217         }
218
219         return i;
220 }
221
222 static int fio_libaio_getevents(struct thread_data *td, unsigned int min,
223                                 unsigned int max, const struct timespec *t)
224 {
225         struct libaio_data *ld = td->io_ops_data;
226         struct libaio_options *o = td->eo;
227         unsigned actual_min = td->o.iodepth_batch_complete_min == 0 ? 0 : min;
228         struct timespec __lt, *lt = NULL;
229         int r, events = 0;
230
231         if (t) {
232                 __lt = *t;
233                 lt = &__lt;
234         }
235
236         do {
237                 if (o->userspace_reap == 1
238                     && actual_min == 0
239                     && ((struct aio_ring *)(ld->aio_ctx))->magic
240                                 == AIO_RING_MAGIC) {
241                         r = user_io_getevents(ld->aio_ctx, max,
242                                 ld->aio_events + events);
243                 } else {
244                         r = io_getevents(ld->aio_ctx, actual_min,
245                                 max, ld->aio_events + events, lt);
246                 }
247                 if (r > 0)
248                         events += r;
249                 else if ((min && r == 0) || r == -EAGAIN) {
250                         fio_libaio_commit(td);
251                         if (actual_min)
252                                 usleep(10);
253                 } else if (r != -EINTR)
254                         break;
255         } while (events < min);
256
257         return r < 0 ? r : events;
258 }
259
260 static enum fio_q_status fio_libaio_queue(struct thread_data *td,
261                                           struct io_u *io_u)
262 {
263         struct libaio_data *ld = td->io_ops_data;
264         struct libaio_options *o = td->eo;
265
266         fio_ro_check(td, io_u);
267
268         if (ld->queued == td->o.iodepth)
269                 return FIO_Q_BUSY;
270
271         /*
272          * fsync is tricky, since it can fail and we need to do it
273          * serialized with other io. the reason is that linux doesn't
274          * support aio fsync yet. So return busy for the case where we
275          * have pending io, to let fio complete those first.
276          */
277         if (ddir_sync(io_u->ddir)) {
278                 if (ld->queued)
279                         return FIO_Q_BUSY;
280
281                 do_io_u_sync(td, io_u);
282                 return FIO_Q_COMPLETED;
283         }
284
285         if (io_u->ddir == DDIR_TRIM) {
286                 if (ld->queued)
287                         return FIO_Q_BUSY;
288
289                 do_io_u_trim(td, io_u);
290                 io_u_mark_submit(td, 1);
291                 io_u_mark_complete(td, 1);
292                 return FIO_Q_COMPLETED;
293         }
294
295         if (o->useriocb)
296                 ld->iocbs[ld->head] = (struct iocb *) (uintptr_t) io_u->index;
297         else
298                 ld->iocbs[ld->head] = &io_u->iocb;
299
300         ld->io_us[ld->head] = io_u;
301         ring_inc(ld, &ld->head, 1);
302         ld->queued++;
303         return FIO_Q_QUEUED;
304 }
305
306 static void fio_libaio_queued(struct thread_data *td, struct io_u **io_us,
307                               unsigned int nr)
308 {
309         struct timespec now;
310         unsigned int i;
311
312         if (!fio_fill_issue_time(td))
313                 return;
314
315         fio_gettime(&now, NULL);
316
317         for (i = 0; i < nr; i++) {
318                 struct io_u *io_u = io_us[i];
319
320                 memcpy(&io_u->issue_time, &now, sizeof(now));
321                 io_u_queued(td, io_u);
322         }
323 }
324
325 static int fio_libaio_commit(struct thread_data *td)
326 {
327         struct libaio_data *ld = td->io_ops_data;
328         struct iocb **iocbs;
329         struct io_u **io_us;
330         struct timespec ts;
331         int ret, wait_start = 0;
332
333         if (!ld->queued)
334                 return 0;
335
336         do {
337                 long nr = ld->queued;
338
339                 nr = min((unsigned int) nr, ld->entries - ld->tail);
340                 io_us = ld->io_us + ld->tail;
341                 iocbs = ld->iocbs + ld->tail;
342
343                 ret = io_submit(ld->aio_ctx, nr, iocbs);
344                 if (ret > 0) {
345                         fio_libaio_queued(td, io_us, ret);
346                         io_u_mark_submit(td, ret);
347
348                         ld->queued -= ret;
349                         ring_inc(ld, &ld->tail, ret);
350                         ret = 0;
351                         wait_start = 0;
352                 } else if (ret == -EINTR || !ret) {
353                         if (!ret)
354                                 io_u_mark_submit(td, ret);
355                         wait_start = 0;
356                         continue;
357                 } else if (ret == -EAGAIN) {
358                         /*
359                          * If we get EAGAIN, we should break out without
360                          * error and let the upper layer reap some
361                          * events for us. If we have no queued IO, we
362                          * must loop here. If we loop for more than 30s,
363                          * just error out, something must be buggy in the
364                          * IO path.
365                          */
366                         if (ld->queued) {
367                                 ret = 0;
368                                 break;
369                         }
370                         if (!wait_start) {
371                                 fio_gettime(&ts, NULL);
372                                 wait_start = 1;
373                         } else if (mtime_since_now(&ts) > 30000) {
374                                 log_err("fio: aio appears to be stalled, giving up\n");
375                                 break;
376                         }
377                         usleep(1);
378                         continue;
379                 } else if (ret == -ENOMEM) {
380                         /*
381                          * If we get -ENOMEM, reap events if we can. If
382                          * we cannot, treat it as a fatal event since there's
383                          * nothing we can do about it.
384                          */
385                         if (ld->queued)
386                                 ret = 0;
387                         break;
388                 } else
389                         break;
390         } while (ld->queued);
391
392         return ret;
393 }
394
395 static int fio_libaio_cancel(struct thread_data *td, struct io_u *io_u)
396 {
397         struct libaio_data *ld = td->io_ops_data;
398
399         return io_cancel(ld->aio_ctx, &io_u->iocb, ld->aio_events);
400 }
401
402 static void fio_libaio_cleanup(struct thread_data *td)
403 {
404         struct libaio_data *ld = td->io_ops_data;
405
406         if (ld) {
407                 /*
408                  * Work-around to avoid huge RCU stalls at exit time. If we
409                  * don't do this here, then it'll be torn down by exit_aio().
410                  * But for that case we can parallellize the freeing, thus
411                  * speeding it up a lot.
412                  */
413                 if (!(td->flags & TD_F_CHILD))
414                         io_destroy(ld->aio_ctx);
415                 free(ld->aio_events);
416                 free(ld->iocbs);
417                 free(ld->io_us);
418                 if (ld->user_iocbs) {
419                         size_t size = td->o.iodepth * sizeof(struct iocb);
420                         fio_memfree(ld->user_iocbs, size, false);
421                 }
422                 free(ld);
423         }
424 }
425
426 static int fio_libaio_old_queue_init(struct libaio_data *ld, unsigned int depth,
427                                      bool hipri, bool useriocb, bool fixedbufs)
428 {
429         if (hipri) {
430                 log_err("fio: polled aio not available on your platform\n");
431                 return 1;
432         }
433         if (useriocb) {
434                 log_err("fio: user mapped iocbs not available on your platform\n");
435                 return 1;
436         }
437         if (fixedbufs) {
438                 log_err("fio: fixed buffers not available on your platform\n");
439                 return 1;
440         }
441
442         return io_queue_init(depth, &ld->aio_ctx);
443 }
444
445 static int fio_libaio_queue_init(struct libaio_data *ld, unsigned int depth,
446                                  bool hipri, bool useriocb, bool fixedbufs)
447 {
448 #ifdef __NR_sys_io_setup2
449         int ret, flags = 0;
450
451         if (hipri)
452                 flags |= IOCTX_FLAG_IOPOLL;
453         if (useriocb)
454                 flags |= IOCTX_FLAG_USERIOCB;
455         if (fixedbufs) {
456                 struct rlimit rlim = {
457                         .rlim_cur = RLIM_INFINITY,
458                         .rlim_max = RLIM_INFINITY,
459                 };
460
461                 setrlimit(RLIMIT_MEMLOCK, &rlim);
462                 flags |= IOCTX_FLAG_FIXEDBUFS;
463         }
464
465         ret = syscall(__NR_sys_io_setup2, depth, flags, ld->user_iocbs,
466                         NULL, NULL, &ld->aio_ctx);
467         if (!ret)
468                 return 0;
469         /* fall through to old syscall */
470 #endif
471         return fio_libaio_old_queue_init(ld, depth, hipri, useriocb, fixedbufs);
472 }
473
474 static int fio_libaio_post_init(struct thread_data *td)
475 {
476         struct libaio_data *ld = td->io_ops_data;
477         struct libaio_options *o = td->eo;
478         struct io_u *io_u;
479         struct iocb *iocb;
480         int err = 0;
481
482         if (o->fixedbufs) {
483                 int i;
484
485                 for (i = 0; i < td->o.iodepth; i++) {
486                         io_u = ld->io_u_index[i];
487                         iocb = &ld->user_iocbs[i];
488                         iocb->u.c.buf = io_u->buf;
489                         iocb->u.c.nbytes = td_max_bs(td);
490
491                         iocb->u.c.flags = 0;
492                         if (o->hipri)
493                                 iocb->u.c.flags |= IOCB_FLAG_HIPRI;
494                 }
495         }
496
497         err = fio_libaio_queue_init(ld, td->o.iodepth, o->hipri, o->useriocb,
498                                         o->fixedbufs);
499         if (err) {
500                 td_verror(td, -err, "io_queue_init");
501                 return 1;
502         }
503
504         return 0;
505 }
506
507 static int fio_libaio_init(struct thread_data *td)
508 {
509         struct libaio_options *o = td->eo;
510         struct libaio_data *ld;
511
512         ld = calloc(1, sizeof(*ld));
513
514         if (o->useriocb) {
515                 size_t size;
516
517                 ld->io_u_index = calloc(td->o.iodepth, sizeof(struct io_u *));
518                 size = td->o.iodepth * sizeof(struct iocb);
519                 ld->user_iocbs = fio_memalign(page_size, size, false);
520                 memset(ld->user_iocbs, 0, size);
521         }
522
523         ld->entries = td->o.iodepth;
524         ld->is_pow2 = is_power_of_2(ld->entries);
525         ld->aio_events = calloc(ld->entries, sizeof(struct io_event));
526         ld->iocbs = calloc(ld->entries, sizeof(struct iocb *));
527         ld->io_us = calloc(ld->entries, sizeof(struct io_u *));
528
529         td->io_ops_data = ld;
530         return 0;
531 }
532
533 static int fio_libaio_io_u_init(struct thread_data *td, struct io_u *io_u)
534 {
535         struct libaio_options *o = td->eo;
536
537         if (o->useriocb) {
538                 struct libaio_data *ld = td->io_ops_data;
539
540                 ld->io_u_index[io_u->index] = io_u;
541         }
542
543         return 0;
544 }
545
546 static struct ioengine_ops ioengine = {
547         .name                   = "libaio",
548         .version                = FIO_IOOPS_VERSION,
549         .init                   = fio_libaio_init,
550         .post_init              = fio_libaio_post_init,
551         .io_u_init              = fio_libaio_io_u_init,
552         .prep                   = fio_libaio_prep,
553         .queue                  = fio_libaio_queue,
554         .commit                 = fio_libaio_commit,
555         .cancel                 = fio_libaio_cancel,
556         .getevents              = fio_libaio_getevents,
557         .event                  = fio_libaio_event,
558         .cleanup                = fio_libaio_cleanup,
559         .open_file              = generic_open_file,
560         .close_file             = generic_close_file,
561         .get_file_size          = generic_get_file_size,
562         .options                = options,
563         .option_struct_size     = sizeof(struct libaio_options),
564 };
565
566 static void fio_init fio_libaio_register(void)
567 {
568         register_ioengine(&ioengine);
569 }
570
571 static void fio_exit fio_libaio_unregister(void)
572 {
573         unregister_ioengine(&ioengine);
574 }