engines/aioring: update for newer mmap based API
[fio.git] / engines / aioring.c
1 /*
2  * aioring engine
3  *
4  * IO engine using the new native Linux libaio ring interface. See:
5  *
6  * http://git.kernel.dk/cgit/linux-block/log/?h=aio-poll
7  *
8  */
9 #include <stdlib.h>
10 #include <unistd.h>
11 #include <errno.h>
12 #include <libaio.h>
13 #include <sys/time.h>
14 #include <sys/resource.h>
15
16 #include "../fio.h"
17 #include "../lib/pow2.h"
18 #include "../optgroup.h"
19 #include "../lib/memalign.h"
20 #include "../lib/fls.h"
21
22 #ifdef ARCH_HAVE_AIORING
23
24 /*
25  * io_uring_setup(2) flags
26  */
27 #ifndef IOCTX_FLAG_SCQRING
28 #define IOCTX_FLAG_SCQRING      (1 << 0)
29 #endif
30 #ifndef IOCTX_FLAG_IOPOLL
31 #define IOCTX_FLAG_IOPOLL       (1 << 1)
32 #endif
33 #ifndef IOCTX_FLAG_FIXEDBUFS
34 #define IOCTX_FLAG_FIXEDBUFS    (1 << 2)
35 #endif
36 #ifndef IOCTX_FLAG_SQTHREAD
37 #define IOCTX_FLAG_SQTHREAD     (1 << 3)
38 #endif
39 #ifndef IOCTX_FLAG_SQWQ
40 #define IOCTX_FLAG_SQWQ         (1 << 4)
41 #endif
42 #ifndef IOCTX_FLAG_SQPOLL
43 #define IOCTX_FLAG_SQPOLL       (1 << 5)
44 #endif
45
46 #define IORING_OFF_SQ_RING      0ULL
47 #define IORING_OFF_CQ_RING      0x8000000ULL
48 #define IORING_OFF_IOCB         0x10000000ULL
49
50 /*
51  * io_uring_enter(2) flags
52  */
53 #ifndef IORING_ENTER_GETEVENTS
54 #define IORING_ENTER_GETEVENTS  (1 << 0)
55 #endif
56
57 typedef uint64_t u64;
58 typedef uint32_t u32;
59 typedef uint16_t u16;
60
61 #define IORING_SQ_NEED_WAKEUP   (1 << 0)
62
63 #define IOEV_RES2_CACHEHIT      (1 << 0)
64
65 struct aio_uring_offsets {
66         u32 head;
67         u32 tail;
68         u32 ring_mask;
69         u32 ring_entries;
70         u32 flags;
71         u32 elems;
72 };
73
74 struct aio_uring_params {
75         u32 sq_entries;
76         u32 cq_entries;
77         u32 flags;
78         u16 sq_thread_cpu;
79         u16 resv[9];
80         struct aio_uring_offsets sq_off;
81         struct aio_uring_offsets cq_off;
82 };
83
84 struct aio_sq_ring {
85         u32 *head;
86         u32 *tail;
87         u32 *ring_mask;
88         u32 *ring_entries;
89         u32 *flags;
90         u32 *array;
91 };
92
93 struct aio_cq_ring {
94         u32 *head;
95         u32 *tail;
96         u32 *ring_mask;
97         u32 *ring_entries;
98         struct io_event *events;
99 };
100
101 struct aioring_mmap {
102         void *ptr;
103         size_t len;
104 };
105
106 struct aioring_data {
107         int ring_fd;
108
109         struct io_u **io_us;
110         struct io_u **io_u_index;
111
112         struct aio_sq_ring sq_ring;
113         struct iocb *iocbs;
114         struct iovec *iovecs;
115         unsigned sq_ring_mask;
116
117         struct aio_cq_ring cq_ring;
118         struct io_event *events;
119         unsigned cq_ring_mask;
120
121         int queued;
122         int cq_ring_off;
123         unsigned iodepth;
124
125         uint64_t cachehit;
126         uint64_t cachemiss;
127
128         struct aioring_mmap mmap[3];
129 };
130
131 struct aioring_options {
132         void *pad;
133         unsigned int hipri;
134         unsigned int fixedbufs;
135         unsigned int sqthread;
136         unsigned int sqthread_set;
137         unsigned int sqthread_poll;
138         unsigned int sqwq;
139 };
140
141 static int fio_aioring_sqthread_cb(void *data,
142                                    unsigned long long *val)
143 {
144         struct aioring_options *o = data;
145
146         o->sqthread = *val;
147         o->sqthread_set = 1;
148         return 0;
149 }
150
151 static struct fio_option options[] = {
152         {
153                 .name   = "hipri",
154                 .lname  = "High Priority",
155                 .type   = FIO_OPT_STR_SET,
156                 .off1   = offsetof(struct aioring_options, hipri),
157                 .help   = "Use polled IO completions",
158                 .category = FIO_OPT_C_ENGINE,
159                 .group  = FIO_OPT_G_LIBAIO,
160         },
161         {
162                 .name   = "fixedbufs",
163                 .lname  = "Fixed (pre-mapped) IO buffers",
164                 .type   = FIO_OPT_STR_SET,
165                 .off1   = offsetof(struct aioring_options, fixedbufs),
166                 .help   = "Pre map IO buffers",
167                 .category = FIO_OPT_C_ENGINE,
168                 .group  = FIO_OPT_G_LIBAIO,
169         },
170         {
171                 .name   = "sqthread",
172                 .lname  = "Use kernel SQ thread on this CPU",
173                 .type   = FIO_OPT_INT,
174                 .cb     = fio_aioring_sqthread_cb,
175                 .help   = "Offload submission to kernel thread",
176                 .category = FIO_OPT_C_ENGINE,
177                 .group  = FIO_OPT_G_LIBAIO,
178         },
179         {
180                 .name   = "sqthread_poll",
181                 .lname  = "Kernel SQ thread should poll",
182                 .type   = FIO_OPT_STR_SET,
183                 .off1   = offsetof(struct aioring_options, sqthread_poll),
184                 .help   = "Used with sqthread, enables kernel side polling",
185                 .category = FIO_OPT_C_ENGINE,
186                 .group  = FIO_OPT_G_LIBAIO,
187         },
188         {
189                 .name   = "sqwq",
190                 .lname  = "Offload submission to kernel workqueue",
191                 .type   = FIO_OPT_STR_SET,
192                 .off1   = offsetof(struct aioring_options, sqwq),
193                 .help   = "Offload submission to kernel workqueue",
194                 .category = FIO_OPT_C_ENGINE,
195                 .group  = FIO_OPT_G_LIBAIO,
196         },
197         {
198                 .name   = NULL,
199         },
200 };
201
202 static int io_uring_enter(struct aioring_data *ld, unsigned int to_submit,
203                          unsigned int min_complete, unsigned int flags)
204 {
205         return syscall(__NR_sys_io_uring_enter, ld->ring_fd, to_submit,
206                         min_complete, flags);
207 }
208
209 static int fio_aioring_prep(struct thread_data *td, struct io_u *io_u)
210 {
211         struct aioring_data *ld = td->io_ops_data;
212         struct fio_file *f = io_u->file;
213         struct iocb *iocb;
214
215         iocb = &ld->iocbs[io_u->index];
216
217         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
218                 if (io_u->ddir == DDIR_READ)
219                         iocb->aio_lio_opcode = IO_CMD_PREAD;
220                 else
221                         iocb->aio_lio_opcode = IO_CMD_PWRITE;
222                 iocb->aio_reqprio = 0;
223                 iocb->aio_fildes = f->fd;
224                 iocb->u.c.buf = io_u->xfer_buf;
225                 iocb->u.c.nbytes = io_u->xfer_buflen;
226                 iocb->u.c.offset = io_u->offset;
227                 iocb->u.c.flags = 0;
228         } else if (ddir_sync(io_u->ddir))
229                 io_prep_fsync(iocb, f->fd);
230
231         iocb->data = io_u;
232         return 0;
233 }
234
235 static struct io_u *fio_aioring_event(struct thread_data *td, int event)
236 {
237         struct aioring_data *ld = td->io_ops_data;
238         struct io_event *ev;
239         struct io_u *io_u;
240         unsigned index;
241
242         index = (event + ld->cq_ring_off) & ld->cq_ring_mask;
243
244         ev = &ld->cq_ring.events[index];
245         io_u = ev->data;
246
247         if (ev->res != io_u->xfer_buflen) {
248                 if (ev->res > io_u->xfer_buflen)
249                         io_u->error = -ev->res;
250                 else
251                         io_u->resid = io_u->xfer_buflen - ev->res;
252         } else
253                 io_u->error = 0;
254
255         if (io_u->ddir == DDIR_READ) {
256                 if (ev->res2 & IOEV_RES2_CACHEHIT)
257                         ld->cachehit++;
258                 else
259                         ld->cachemiss++;
260         }
261
262         return io_u;
263 }
264
265 static int fio_aioring_cqring_reap(struct thread_data *td, unsigned int events,
266                                    unsigned int max)
267 {
268         struct aioring_data *ld = td->io_ops_data;
269         struct aio_cq_ring *ring = &ld->cq_ring;
270         u32 head, reaped = 0;
271
272         head = *ring->head;
273         do {
274                 read_barrier();
275                 if (head == *ring->tail)
276                         break;
277                 reaped++;
278                 head++;
279         } while (reaped + events < max);
280
281         *ring->head = head;
282         write_barrier();
283         return reaped;
284 }
285
286 static int fio_aioring_getevents(struct thread_data *td, unsigned int min,
287                                  unsigned int max, const struct timespec *t)
288 {
289         struct aioring_data *ld = td->io_ops_data;
290         unsigned actual_min = td->o.iodepth_batch_complete_min == 0 ? 0 : min;
291         struct aioring_options *o = td->eo;
292         struct aio_cq_ring *ring = &ld->cq_ring;
293         unsigned events = 0;
294         int r;
295
296         ld->cq_ring_off = *ring->head;
297         do {
298                 r = fio_aioring_cqring_reap(td, events, max);
299                 if (r) {
300                         events += r;
301                         continue;
302                 }
303
304                 if (!o->sqthread_poll) {
305                         r = io_uring_enter(ld, 0, actual_min,
306                                                 IORING_ENTER_GETEVENTS);
307                         if (r < 0) {
308                                 if (errno == EAGAIN)
309                                         continue;
310                                 td_verror(td, errno, "io_uring_enter");
311                                 break;
312                         }
313                 }
314         } while (events < min);
315
316         return r < 0 ? r : events;
317 }
318
319 static enum fio_q_status fio_aioring_queue(struct thread_data *td,
320                                            struct io_u *io_u)
321 {
322         struct aioring_data *ld = td->io_ops_data;
323         struct aio_sq_ring *ring = &ld->sq_ring;
324         unsigned tail, next_tail;
325
326         fio_ro_check(td, io_u);
327
328         if (ld->queued == ld->iodepth)
329                 return FIO_Q_BUSY;
330
331         if (io_u->ddir == DDIR_TRIM) {
332                 if (ld->queued)
333                         return FIO_Q_BUSY;
334
335                 do_io_u_trim(td, io_u);
336                 io_u_mark_submit(td, 1);
337                 io_u_mark_complete(td, 1);
338                 return FIO_Q_COMPLETED;
339         }
340
341         tail = *ring->tail;
342         next_tail = tail + 1;
343         read_barrier();
344         if (next_tail == *ring->head)
345                 return FIO_Q_BUSY;
346
347         ring->array[tail & ld->sq_ring_mask] = io_u->index;
348         *ring->tail = next_tail;
349         write_barrier();
350
351         ld->queued++;
352         return FIO_Q_QUEUED;
353 }
354
355 static void fio_aioring_queued(struct thread_data *td, int start, int nr)
356 {
357         struct aioring_data *ld = td->io_ops_data;
358         struct timespec now;
359
360         if (!fio_fill_issue_time(td))
361                 return;
362
363         fio_gettime(&now, NULL);
364
365         while (nr--) {
366                 struct aio_sq_ring *ring = &ld->sq_ring;
367                 int index = ring->array[start & ld->sq_ring_mask];
368                 struct io_u *io_u = ld->io_u_index[index];
369
370                 memcpy(&io_u->issue_time, &now, sizeof(now));
371                 io_u_queued(td, io_u);
372
373                 start++;
374         }
375 }
376
377 static int fio_aioring_commit(struct thread_data *td)
378 {
379         struct aioring_data *ld = td->io_ops_data;
380         struct aioring_options *o = td->eo;
381         int ret;
382
383         if (!ld->queued)
384                 return 0;
385
386         /* Nothing to do */
387         if (o->sqthread_poll) {
388                 struct aio_sq_ring *ring = &ld->sq_ring;
389
390                 if (*ring->flags & IORING_SQ_NEED_WAKEUP)
391                         io_uring_enter(ld, ld->queued, 0, 0);
392                 ld->queued = 0;
393                 return 0;
394         }
395
396         do {
397                 unsigned start = *ld->sq_ring.head;
398                 long nr = ld->queued;
399
400                 ret = io_uring_enter(ld, nr, 0, IORING_ENTER_GETEVENTS);
401                 if (ret > 0) {
402                         fio_aioring_queued(td, start, ret);
403                         io_u_mark_submit(td, ret);
404
405                         ld->queued -= ret;
406                         ret = 0;
407                 } else if (!ret) {
408                         io_u_mark_submit(td, ret);
409                         continue;
410                 } else {
411                         if (errno == EAGAIN) {
412                                 ret = fio_aioring_cqring_reap(td, 0, ld->queued);
413                                 if (ret)
414                                         continue;
415                                 /* Shouldn't happen */
416                                 usleep(1);
417                                 continue;
418                         }
419                         td_verror(td, errno, "io_uring_enter submit");
420                         break;
421                 }
422         } while (ld->queued);
423
424         return ret;
425 }
426
427 static void fio_aioring_unmap(struct aioring_data *ld)
428 {
429         int i;
430
431         for (i = 0; i < ARRAY_SIZE(ld->mmap); i++)
432                 munmap(ld->mmap[i].ptr, ld->mmap[i].len);
433         close(ld->ring_fd);
434 }
435
436 static void fio_aioring_cleanup(struct thread_data *td)
437 {
438         struct aioring_data *ld = td->io_ops_data;
439
440         if (ld) {
441                 td->ts.cachehit += ld->cachehit;
442                 td->ts.cachemiss += ld->cachemiss;
443
444                 /*
445                  * Work-around to avoid huge RCU stalls at exit time. If we
446                  * don't do this here, then it'll be torn down by exit_aio().
447                  * But for that case we can parallellize the freeing, thus
448                  * speeding it up a lot.
449                  */
450                 if (!(td->flags & TD_F_CHILD))
451                         fio_aioring_unmap(ld);
452
453                 free(ld->io_u_index);
454                 free(ld->io_us);
455                 free(ld->iovecs);
456                 free(ld);
457         }
458 }
459
460 static int fio_aioring_mmap(struct aioring_data *ld, struct aio_uring_params *p)
461 {
462         struct aio_sq_ring *sring = &ld->sq_ring;
463         struct aio_cq_ring *cring = &ld->cq_ring;
464         void *ptr;
465
466         ld->mmap[0].len = p->sq_off.elems + p->sq_entries * sizeof(u32);
467         ptr = mmap(0, ld->mmap[0].len, PROT_READ | PROT_WRITE,
468                         MAP_SHARED | MAP_POPULATE, ld->ring_fd,
469                         IORING_OFF_SQ_RING);
470         ld->mmap[0].ptr = ptr;
471         sring->head = ptr + p->sq_off.head;
472         sring->tail = ptr + p->sq_off.tail;
473         sring->ring_mask = ptr + p->sq_off.ring_mask;
474         sring->ring_entries = ptr + p->sq_off.ring_entries;
475         sring->flags = ptr + p->sq_off.flags;
476         sring->array = ptr + p->sq_off.elems;
477         ld->sq_ring_mask = *sring->ring_mask;
478
479         ld->mmap[1].len = p->sq_entries * sizeof(struct iocb);
480         ld->iocbs = mmap(0, ld->mmap[1].len, PROT_READ | PROT_WRITE,
481                                 MAP_SHARED | MAP_POPULATE, ld->ring_fd,
482                                 IORING_OFF_IOCB);
483         ld->mmap[1].ptr = ld->iocbs;
484
485         ld->mmap[2].len = p->cq_off.elems +
486                                 p->cq_entries * sizeof(struct io_event);
487         ptr = mmap(0, ld->mmap[2].len, PROT_READ | PROT_WRITE,
488                         MAP_SHARED | MAP_POPULATE, ld->ring_fd,
489                         IORING_OFF_CQ_RING);
490         ld->mmap[2].ptr = ptr;
491         cring->head = ptr + p->cq_off.head;
492         cring->tail = ptr + p->cq_off.tail;
493         cring->ring_mask = ptr + p->cq_off.ring_mask;
494         cring->ring_entries = ptr + p->cq_off.ring_entries;
495         cring->events = ptr + p->cq_off.elems;
496         ld->cq_ring_mask = *cring->ring_mask;
497         return 0;
498 }
499
500 static int fio_aioring_queue_init(struct thread_data *td)
501 {
502         struct aioring_data *ld = td->io_ops_data;
503         struct aioring_options *o = td->eo;
504         int depth = td->o.iodepth;
505         struct aio_uring_params p;
506         int ret;
507
508         memset(&p, 0, sizeof(p));
509         p.flags = IOCTX_FLAG_SCQRING;
510
511         if (o->hipri)
512                 p.flags |= IOCTX_FLAG_IOPOLL;
513         if (o->sqthread_set) {
514                 p.sq_thread_cpu = o->sqthread;
515                 p.flags |= IOCTX_FLAG_SQTHREAD;
516                 if (o->sqthread_poll)
517                         p.flags |= IOCTX_FLAG_SQPOLL;
518         }
519         if (o->sqwq)
520                 p.flags |= IOCTX_FLAG_SQWQ;
521
522         if (o->fixedbufs) {
523                 struct rlimit rlim = {
524                         .rlim_cur = RLIM_INFINITY,
525                         .rlim_max = RLIM_INFINITY,
526                 };
527
528                 setrlimit(RLIMIT_MEMLOCK, &rlim);
529                 p.flags |= IOCTX_FLAG_FIXEDBUFS;
530         }
531
532         ret = syscall(__NR_sys_io_uring_setup, depth, ld->iovecs, &p);
533         if (ret < 0)
534                 return ret;
535
536         ld->ring_fd = ret;
537         return fio_aioring_mmap(ld, &p);
538 }
539
540 static int fio_aioring_post_init(struct thread_data *td)
541 {
542         struct aioring_data *ld = td->io_ops_data;
543         struct aioring_options *o = td->eo;
544         struct io_u *io_u;
545         int err;
546
547         if (o->fixedbufs) {
548                 int i;
549
550                 for (i = 0; i < td->o.iodepth; i++) {
551                         struct iovec *iov = &ld->iovecs[i];
552
553                         io_u = ld->io_u_index[i];
554                         iov->iov_base = io_u->buf;
555                         iov->iov_len = td_max_bs(td);
556                 }
557         }
558
559         err = fio_aioring_queue_init(td);
560         if (err) {
561                 td_verror(td, errno, "io_queue_init");
562                 return 1;
563         }
564
565         return 0;
566 }
567
568 static unsigned roundup_pow2(unsigned depth)
569 {
570         return 1UL << __fls(depth - 1);
571 }
572
573 static int fio_aioring_init(struct thread_data *td)
574 {
575         struct aioring_data *ld;
576
577         ld = calloc(1, sizeof(*ld));
578
579         /* ring depth must be a power-of-2 */
580         ld->iodepth = td->o.iodepth;
581         td->o.iodepth = roundup_pow2(td->o.iodepth);
582
583         /* io_u index */
584         ld->io_u_index = calloc(td->o.iodepth, sizeof(struct io_u *));
585         ld->io_us = calloc(td->o.iodepth, sizeof(struct io_u *));
586
587         ld->iovecs = calloc(td->o.iodepth, sizeof(struct iovec));
588
589         td->io_ops_data = ld;
590         return 0;
591 }
592
593 static int fio_aioring_io_u_init(struct thread_data *td, struct io_u *io_u)
594 {
595         struct aioring_data *ld = td->io_ops_data;
596
597         ld->io_u_index[io_u->index] = io_u;
598         return 0;
599 }
600
601 static struct ioengine_ops ioengine = {
602         .name                   = "aio-ring",
603         .version                = FIO_IOOPS_VERSION,
604         .init                   = fio_aioring_init,
605         .post_init              = fio_aioring_post_init,
606         .io_u_init              = fio_aioring_io_u_init,
607         .prep                   = fio_aioring_prep,
608         .queue                  = fio_aioring_queue,
609         .commit                 = fio_aioring_commit,
610         .getevents              = fio_aioring_getevents,
611         .event                  = fio_aioring_event,
612         .cleanup                = fio_aioring_cleanup,
613         .open_file              = generic_open_file,
614         .close_file             = generic_close_file,
615         .get_file_size          = generic_get_file_size,
616         .options                = options,
617         .option_struct_size     = sizeof(struct aioring_options),
618 };
619
620 static void fio_init fio_aioring_register(void)
621 {
622         register_ioengine(&ioengine);
623 }
624
625 static void fio_exit fio_aioring_unregister(void)
626 {
627         unregister_ioengine(&ioengine);
628 }
629 #endif