aioring: update to newer API
[fio.git] / engines / aioring.c
1 /*
2  * aioring engine
3  *
4  * IO engine using the new native Linux libaio ring interface. See:
5  *
6  * http://git.kernel.dk/cgit/linux-block/log/?h=aio-poll
7  *
8  */
9 #include <stdlib.h>
10 #include <unistd.h>
11 #include <errno.h>
12 #include <libaio.h>
13 #include <sys/time.h>
14 #include <sys/resource.h>
15
16 #include "../fio.h"
17 #include "../lib/pow2.h"
18 #include "../optgroup.h"
19 #include "../lib/memalign.h"
20 #include "../lib/fls.h"
21
22 #ifdef ARCH_HAVE_AIORING
23
24 /*
25  * io_uring_setup(2) flags
26  */
27 #ifndef IOCTX_FLAG_SCQRING
28 #define IOCTX_FLAG_SCQRING      (1 << 0)
29 #endif
30 #ifndef IOCTX_FLAG_IOPOLL
31 #define IOCTX_FLAG_IOPOLL       (1 << 1)
32 #endif
33 #ifndef IOCTX_FLAG_FIXEDBUFS
34 #define IOCTX_FLAG_FIXEDBUFS    (1 << 2)
35 #endif
36 #ifndef IOCTX_FLAG_SQTHREAD
37 #define IOCTX_FLAG_SQTHREAD     (1 << 3)
38 #endif
39 #ifndef IOCTX_FLAG_SQWQ
40 #define IOCTX_FLAG_SQWQ         (1 << 4)
41 #endif
42 #ifndef IOCTX_FLAG_SQPOLL
43 #define IOCTX_FLAG_SQPOLL       (1 << 5)
44 #endif
45
46 #define IORING_OFF_SQ_RING      0ULL
47 #define IORING_OFF_CQ_RING      0x8000000ULL
48 #define IORING_OFF_IOCB         0x10000000ULL
49
50 /*
51  * io_uring_enter(2) flags
52  */
53 #ifndef IORING_ENTER_GETEVENTS
54 #define IORING_ENTER_GETEVENTS  (1 << 0)
55 #endif
56
57 typedef uint64_t u64;
58 typedef uint32_t u32;
59 typedef uint16_t u16;
60
61 #define IORING_SQ_NEED_WAKEUP   (1 << 0)
62
63 #define IOEV_RES2_CACHEHIT      (1 << 0)
64
65 struct aio_sqring_offsets {
66         u32 head;
67         u32 tail;
68         u32 ring_mask;
69         u32 ring_entries;
70         u32 flags;
71         u32 array;
72 };
73
74 struct aio_cqring_offsets {
75         u32 head;
76         u32 tail;
77         u32 ring_mask;
78         u32 ring_entries;
79         u32 overflow;
80         u32 events;
81 };
82
83 struct aio_uring_params {
84         u32 sq_entries;
85         u32 cq_entries;
86         u32 flags;
87         u16 sq_thread_cpu;
88         u16 resv[9];
89         struct aio_sqring_offsets sq_off;
90         struct aio_cqring_offsets cq_off;
91 };
92
93 struct aio_sq_ring {
94         u32 *head;
95         u32 *tail;
96         u32 *ring_mask;
97         u32 *ring_entries;
98         u32 *flags;
99         u32 *array;
100 };
101
102 struct aio_cq_ring {
103         u32 *head;
104         u32 *tail;
105         u32 *ring_mask;
106         u32 *ring_entries;
107         struct io_event *events;
108 };
109
110 struct aioring_mmap {
111         void *ptr;
112         size_t len;
113 };
114
115 struct aioring_data {
116         int ring_fd;
117
118         struct io_u **io_us;
119         struct io_u **io_u_index;
120
121         struct aio_sq_ring sq_ring;
122         struct iocb *iocbs;
123         struct iovec *iovecs;
124         unsigned sq_ring_mask;
125
126         struct aio_cq_ring cq_ring;
127         struct io_event *events;
128         unsigned cq_ring_mask;
129
130         int queued;
131         int cq_ring_off;
132         unsigned iodepth;
133
134         uint64_t cachehit;
135         uint64_t cachemiss;
136
137         struct aioring_mmap mmap[3];
138 };
139
140 struct aioring_options {
141         void *pad;
142         unsigned int hipri;
143         unsigned int fixedbufs;
144         unsigned int sqthread;
145         unsigned int sqthread_set;
146         unsigned int sqthread_poll;
147         unsigned int sqwq;
148 };
149
150 static int fio_aioring_sqthread_cb(void *data,
151                                    unsigned long long *val)
152 {
153         struct aioring_options *o = data;
154
155         o->sqthread = *val;
156         o->sqthread_set = 1;
157         return 0;
158 }
159
160 static struct fio_option options[] = {
161         {
162                 .name   = "hipri",
163                 .lname  = "High Priority",
164                 .type   = FIO_OPT_STR_SET,
165                 .off1   = offsetof(struct aioring_options, hipri),
166                 .help   = "Use polled IO completions",
167                 .category = FIO_OPT_C_ENGINE,
168                 .group  = FIO_OPT_G_LIBAIO,
169         },
170         {
171                 .name   = "fixedbufs",
172                 .lname  = "Fixed (pre-mapped) IO buffers",
173                 .type   = FIO_OPT_STR_SET,
174                 .off1   = offsetof(struct aioring_options, fixedbufs),
175                 .help   = "Pre map IO buffers",
176                 .category = FIO_OPT_C_ENGINE,
177                 .group  = FIO_OPT_G_LIBAIO,
178         },
179         {
180                 .name   = "sqthread",
181                 .lname  = "Use kernel SQ thread on this CPU",
182                 .type   = FIO_OPT_INT,
183                 .cb     = fio_aioring_sqthread_cb,
184                 .help   = "Offload submission to kernel thread",
185                 .category = FIO_OPT_C_ENGINE,
186                 .group  = FIO_OPT_G_LIBAIO,
187         },
188         {
189                 .name   = "sqthread_poll",
190                 .lname  = "Kernel SQ thread should poll",
191                 .type   = FIO_OPT_STR_SET,
192                 .off1   = offsetof(struct aioring_options, sqthread_poll),
193                 .help   = "Used with sqthread, enables kernel side polling",
194                 .category = FIO_OPT_C_ENGINE,
195                 .group  = FIO_OPT_G_LIBAIO,
196         },
197         {
198                 .name   = "sqwq",
199                 .lname  = "Offload submission to kernel workqueue",
200                 .type   = FIO_OPT_STR_SET,
201                 .off1   = offsetof(struct aioring_options, sqwq),
202                 .help   = "Offload submission to kernel workqueue",
203                 .category = FIO_OPT_C_ENGINE,
204                 .group  = FIO_OPT_G_LIBAIO,
205         },
206         {
207                 .name   = NULL,
208         },
209 };
210
211 static int io_uring_enter(struct aioring_data *ld, unsigned int to_submit,
212                          unsigned int min_complete, unsigned int flags)
213 {
214         return syscall(__NR_sys_io_uring_enter, ld->ring_fd, to_submit,
215                         min_complete, flags);
216 }
217
218 static int fio_aioring_prep(struct thread_data *td, struct io_u *io_u)
219 {
220         struct aioring_data *ld = td->io_ops_data;
221         struct fio_file *f = io_u->file;
222         struct iocb *iocb;
223
224         iocb = &ld->iocbs[io_u->index];
225
226         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
227                 if (io_u->ddir == DDIR_READ)
228                         iocb->aio_lio_opcode = IO_CMD_PREAD;
229                 else
230                         iocb->aio_lio_opcode = IO_CMD_PWRITE;
231                 iocb->aio_reqprio = 0;
232                 iocb->aio_fildes = f->fd;
233                 iocb->u.c.buf = io_u->xfer_buf;
234                 iocb->u.c.nbytes = io_u->xfer_buflen;
235                 iocb->u.c.offset = io_u->offset;
236                 iocb->u.c.flags = 0;
237         } else if (ddir_sync(io_u->ddir))
238                 io_prep_fsync(iocb, f->fd);
239
240         iocb->data = io_u;
241         return 0;
242 }
243
244 static struct io_u *fio_aioring_event(struct thread_data *td, int event)
245 {
246         struct aioring_data *ld = td->io_ops_data;
247         struct io_event *ev;
248         struct io_u *io_u;
249         unsigned index;
250
251         index = (event + ld->cq_ring_off) & ld->cq_ring_mask;
252
253         ev = &ld->cq_ring.events[index];
254         io_u = ev->data;
255
256         if (ev->res != io_u->xfer_buflen) {
257                 if (ev->res > io_u->xfer_buflen)
258                         io_u->error = -ev->res;
259                 else
260                         io_u->resid = io_u->xfer_buflen - ev->res;
261         } else
262                 io_u->error = 0;
263
264         if (io_u->ddir == DDIR_READ) {
265                 if (ev->res2 & IOEV_RES2_CACHEHIT)
266                         ld->cachehit++;
267                 else
268                         ld->cachemiss++;
269         }
270
271         return io_u;
272 }
273
274 static int fio_aioring_cqring_reap(struct thread_data *td, unsigned int events,
275                                    unsigned int max)
276 {
277         struct aioring_data *ld = td->io_ops_data;
278         struct aio_cq_ring *ring = &ld->cq_ring;
279         u32 head, reaped = 0;
280
281         head = *ring->head;
282         do {
283                 read_barrier();
284                 if (head == *ring->tail)
285                         break;
286                 reaped++;
287                 head++;
288         } while (reaped + events < max);
289
290         *ring->head = head;
291         write_barrier();
292         return reaped;
293 }
294
295 static int fio_aioring_getevents(struct thread_data *td, unsigned int min,
296                                  unsigned int max, const struct timespec *t)
297 {
298         struct aioring_data *ld = td->io_ops_data;
299         unsigned actual_min = td->o.iodepth_batch_complete_min == 0 ? 0 : min;
300         struct aioring_options *o = td->eo;
301         struct aio_cq_ring *ring = &ld->cq_ring;
302         unsigned events = 0;
303         int r;
304
305         ld->cq_ring_off = *ring->head;
306         do {
307                 r = fio_aioring_cqring_reap(td, events, max);
308                 if (r) {
309                         events += r;
310                         continue;
311                 }
312
313                 if (!o->sqthread_poll) {
314                         r = io_uring_enter(ld, 0, actual_min,
315                                                 IORING_ENTER_GETEVENTS);
316                         if (r < 0) {
317                                 if (errno == EAGAIN)
318                                         continue;
319                                 td_verror(td, errno, "io_uring_enter");
320                                 break;
321                         }
322                 }
323         } while (events < min);
324
325         return r < 0 ? r : events;
326 }
327
328 static enum fio_q_status fio_aioring_queue(struct thread_data *td,
329                                            struct io_u *io_u)
330 {
331         struct aioring_data *ld = td->io_ops_data;
332         struct aio_sq_ring *ring = &ld->sq_ring;
333         unsigned tail, next_tail;
334
335         fio_ro_check(td, io_u);
336
337         if (ld->queued == ld->iodepth)
338                 return FIO_Q_BUSY;
339
340         if (io_u->ddir == DDIR_TRIM) {
341                 if (ld->queued)
342                         return FIO_Q_BUSY;
343
344                 do_io_u_trim(td, io_u);
345                 io_u_mark_submit(td, 1);
346                 io_u_mark_complete(td, 1);
347                 return FIO_Q_COMPLETED;
348         }
349
350         tail = *ring->tail;
351         next_tail = tail + 1;
352         read_barrier();
353         if (next_tail == *ring->head)
354                 return FIO_Q_BUSY;
355
356         ring->array[tail & ld->sq_ring_mask] = io_u->index;
357         *ring->tail = next_tail;
358         write_barrier();
359
360         ld->queued++;
361         return FIO_Q_QUEUED;
362 }
363
364 static void fio_aioring_queued(struct thread_data *td, int start, int nr)
365 {
366         struct aioring_data *ld = td->io_ops_data;
367         struct timespec now;
368
369         if (!fio_fill_issue_time(td))
370                 return;
371
372         fio_gettime(&now, NULL);
373
374         while (nr--) {
375                 struct aio_sq_ring *ring = &ld->sq_ring;
376                 int index = ring->array[start & ld->sq_ring_mask];
377                 struct io_u *io_u = ld->io_u_index[index];
378
379                 memcpy(&io_u->issue_time, &now, sizeof(now));
380                 io_u_queued(td, io_u);
381
382                 start++;
383         }
384 }
385
386 static int fio_aioring_commit(struct thread_data *td)
387 {
388         struct aioring_data *ld = td->io_ops_data;
389         struct aioring_options *o = td->eo;
390         int ret;
391
392         if (!ld->queued)
393                 return 0;
394
395         /* Nothing to do */
396         if (o->sqthread_poll) {
397                 struct aio_sq_ring *ring = &ld->sq_ring;
398
399                 if (*ring->flags & IORING_SQ_NEED_WAKEUP)
400                         io_uring_enter(ld, ld->queued, 0, 0);
401                 ld->queued = 0;
402                 return 0;
403         }
404
405         do {
406                 unsigned start = *ld->sq_ring.head;
407                 long nr = ld->queued;
408
409                 ret = io_uring_enter(ld, nr, 0, IORING_ENTER_GETEVENTS);
410                 if (ret > 0) {
411                         fio_aioring_queued(td, start, ret);
412                         io_u_mark_submit(td, ret);
413
414                         ld->queued -= ret;
415                         ret = 0;
416                 } else if (!ret) {
417                         io_u_mark_submit(td, ret);
418                         continue;
419                 } else {
420                         if (errno == EAGAIN) {
421                                 ret = fio_aioring_cqring_reap(td, 0, ld->queued);
422                                 if (ret)
423                                         continue;
424                                 /* Shouldn't happen */
425                                 usleep(1);
426                                 continue;
427                         }
428                         td_verror(td, errno, "io_uring_enter submit");
429                         break;
430                 }
431         } while (ld->queued);
432
433         return ret;
434 }
435
436 static void fio_aioring_unmap(struct aioring_data *ld)
437 {
438         int i;
439
440         for (i = 0; i < ARRAY_SIZE(ld->mmap); i++)
441                 munmap(ld->mmap[i].ptr, ld->mmap[i].len);
442         close(ld->ring_fd);
443 }
444
445 static void fio_aioring_cleanup(struct thread_data *td)
446 {
447         struct aioring_data *ld = td->io_ops_data;
448
449         if (ld) {
450                 td->ts.cachehit += ld->cachehit;
451                 td->ts.cachemiss += ld->cachemiss;
452
453                 /*
454                  * Work-around to avoid huge RCU stalls at exit time. If we
455                  * don't do this here, then it'll be torn down by exit_aio().
456                  * But for that case we can parallellize the freeing, thus
457                  * speeding it up a lot.
458                  */
459                 if (!(td->flags & TD_F_CHILD))
460                         fio_aioring_unmap(ld);
461
462                 free(ld->io_u_index);
463                 free(ld->io_us);
464                 free(ld->iovecs);
465                 free(ld);
466         }
467 }
468
469 static int fio_aioring_mmap(struct aioring_data *ld, struct aio_uring_params *p)
470 {
471         struct aio_sq_ring *sring = &ld->sq_ring;
472         struct aio_cq_ring *cring = &ld->cq_ring;
473         void *ptr;
474
475         ld->mmap[0].len = p->sq_off.array + p->sq_entries * sizeof(u32);
476         ptr = mmap(0, ld->mmap[0].len, PROT_READ | PROT_WRITE,
477                         MAP_SHARED | MAP_POPULATE, ld->ring_fd,
478                         IORING_OFF_SQ_RING);
479         ld->mmap[0].ptr = ptr;
480         sring->head = ptr + p->sq_off.head;
481         sring->tail = ptr + p->sq_off.tail;
482         sring->ring_mask = ptr + p->sq_off.ring_mask;
483         sring->ring_entries = ptr + p->sq_off.ring_entries;
484         sring->flags = ptr + p->sq_off.flags;
485         sring->array = ptr + p->sq_off.array;
486         ld->sq_ring_mask = *sring->ring_mask;
487
488         ld->mmap[1].len = p->sq_entries * sizeof(struct iocb);
489         ld->iocbs = mmap(0, ld->mmap[1].len, PROT_READ | PROT_WRITE,
490                                 MAP_SHARED | MAP_POPULATE, ld->ring_fd,
491                                 IORING_OFF_IOCB);
492         ld->mmap[1].ptr = ld->iocbs;
493
494         ld->mmap[2].len = p->cq_off.events +
495                                 p->cq_entries * sizeof(struct io_event);
496         ptr = mmap(0, ld->mmap[2].len, PROT_READ | PROT_WRITE,
497                         MAP_SHARED | MAP_POPULATE, ld->ring_fd,
498                         IORING_OFF_CQ_RING);
499         ld->mmap[2].ptr = ptr;
500         cring->head = ptr + p->cq_off.head;
501         cring->tail = ptr + p->cq_off.tail;
502         cring->ring_mask = ptr + p->cq_off.ring_mask;
503         cring->ring_entries = ptr + p->cq_off.ring_entries;
504         cring->events = ptr + p->cq_off.events;
505         ld->cq_ring_mask = *cring->ring_mask;
506         return 0;
507 }
508
509 static int fio_aioring_queue_init(struct thread_data *td)
510 {
511         struct aioring_data *ld = td->io_ops_data;
512         struct aioring_options *o = td->eo;
513         int depth = td->o.iodepth;
514         struct aio_uring_params p;
515         int ret;
516
517         memset(&p, 0, sizeof(p));
518         p.flags = IOCTX_FLAG_SCQRING;
519
520         if (o->hipri)
521                 p.flags |= IOCTX_FLAG_IOPOLL;
522         if (o->sqthread_set) {
523                 p.sq_thread_cpu = o->sqthread;
524                 p.flags |= IOCTX_FLAG_SQTHREAD;
525                 if (o->sqthread_poll)
526                         p.flags |= IOCTX_FLAG_SQPOLL;
527         }
528         if (o->sqwq)
529                 p.flags |= IOCTX_FLAG_SQWQ;
530
531         if (o->fixedbufs) {
532                 struct rlimit rlim = {
533                         .rlim_cur = RLIM_INFINITY,
534                         .rlim_max = RLIM_INFINITY,
535                 };
536
537                 setrlimit(RLIMIT_MEMLOCK, &rlim);
538                 p.flags |= IOCTX_FLAG_FIXEDBUFS;
539         }
540
541         ret = syscall(__NR_sys_io_uring_setup, depth, ld->iovecs, &p);
542         if (ret < 0)
543                 return ret;
544
545         ld->ring_fd = ret;
546         return fio_aioring_mmap(ld, &p);
547 }
548
549 static int fio_aioring_post_init(struct thread_data *td)
550 {
551         struct aioring_data *ld = td->io_ops_data;
552         struct aioring_options *o = td->eo;
553         struct io_u *io_u;
554         int err;
555
556         if (o->fixedbufs) {
557                 int i;
558
559                 for (i = 0; i < td->o.iodepth; i++) {
560                         struct iovec *iov = &ld->iovecs[i];
561
562                         io_u = ld->io_u_index[i];
563                         iov->iov_base = io_u->buf;
564                         iov->iov_len = td_max_bs(td);
565                 }
566         }
567
568         err = fio_aioring_queue_init(td);
569         if (err) {
570                 td_verror(td, errno, "io_queue_init");
571                 return 1;
572         }
573
574         return 0;
575 }
576
577 static unsigned roundup_pow2(unsigned depth)
578 {
579         return 1UL << __fls(depth - 1);
580 }
581
582 static int fio_aioring_init(struct thread_data *td)
583 {
584         struct aioring_data *ld;
585
586         ld = calloc(1, sizeof(*ld));
587
588         /* ring depth must be a power-of-2 */
589         ld->iodepth = td->o.iodepth;
590         td->o.iodepth = roundup_pow2(td->o.iodepth);
591
592         /* io_u index */
593         ld->io_u_index = calloc(td->o.iodepth, sizeof(struct io_u *));
594         ld->io_us = calloc(td->o.iodepth, sizeof(struct io_u *));
595
596         ld->iovecs = calloc(td->o.iodepth, sizeof(struct iovec));
597
598         td->io_ops_data = ld;
599         return 0;
600 }
601
602 static int fio_aioring_io_u_init(struct thread_data *td, struct io_u *io_u)
603 {
604         struct aioring_data *ld = td->io_ops_data;
605
606         ld->io_u_index[io_u->index] = io_u;
607         return 0;
608 }
609
610 static struct ioengine_ops ioengine = {
611         .name                   = "aio-ring",
612         .version                = FIO_IOOPS_VERSION,
613         .init                   = fio_aioring_init,
614         .post_init              = fio_aioring_post_init,
615         .io_u_init              = fio_aioring_io_u_init,
616         .prep                   = fio_aioring_prep,
617         .queue                  = fio_aioring_queue,
618         .commit                 = fio_aioring_commit,
619         .getevents              = fio_aioring_getevents,
620         .event                  = fio_aioring_event,
621         .cleanup                = fio_aioring_cleanup,
622         .open_file              = generic_open_file,
623         .close_file             = generic_close_file,
624         .get_file_size          = generic_get_file_size,
625         .options                = options,
626         .option_struct_size     = sizeof(struct aioring_options),
627 };
628
629 static void fio_init fio_aioring_register(void)
630 {
631         register_ioengine(&ioengine);
632 }
633
634 static void fio_exit fio_aioring_unregister(void)
635 {
636         unregister_ioengine(&ioengine);
637 }
638 #endif