io_uring: update to newer API
[fio.git] / engines / io_uring.c
1 /*
2  * io_uring engine
3  *
4  * IO engine using the new native Linux aio io_uring interface. See:
5  *
6  * http://git.kernel.dk/cgit/linux-block/log/?h=io_uring
7  *
8  */
9 #include <stdlib.h>
10 #include <unistd.h>
11 #include <errno.h>
12 #include <sys/time.h>
13 #include <sys/resource.h>
14
15 #include "../fio.h"
16 #include "../lib/pow2.h"
17 #include "../optgroup.h"
18 #include "../lib/memalign.h"
19 #include "../lib/fls.h"
20
21 #ifdef ARCH_HAVE_IOURING
22
23 #include "../lib/types.h"
24 #include "../os/io_uring.h"
25
26 struct io_sq_ring {
27         unsigned *head;
28         unsigned *tail;
29         unsigned *ring_mask;
30         unsigned *ring_entries;
31         unsigned *flags;
32         unsigned *array;
33 };
34
35 struct io_cq_ring {
36         unsigned *head;
37         unsigned *tail;
38         unsigned *ring_mask;
39         unsigned *ring_entries;
40         struct io_uring_cqe *cqes;
41 };
42
43 struct ioring_mmap {
44         void *ptr;
45         size_t len;
46 };
47
48 struct ioring_data {
49         int ring_fd;
50
51         struct io_u **io_us;
52         struct io_u **io_u_index;
53
54         struct io_sq_ring sq_ring;
55         struct io_uring_sqe *sqes;
56         struct iovec *iovecs;
57         unsigned sq_ring_mask;
58
59         struct io_cq_ring cq_ring;
60         unsigned cq_ring_mask;
61
62         int queued;
63         int cq_ring_off;
64         unsigned iodepth;
65
66         uint64_t cachehit;
67         uint64_t cachemiss;
68
69         struct ioring_mmap mmap[3];
70 };
71
72 struct ioring_options {
73         void *pad;
74         unsigned int hipri;
75         unsigned int fixedbufs;
76         unsigned int sqpoll_thread;
77         unsigned int sqpoll_set;
78         unsigned int sqpoll_cpu;
79 };
80
81 static int fio_ioring_sqpoll_cb(void *data, unsigned long long *val)
82 {
83         struct ioring_options *o = data;
84
85         o->sqpoll_cpu = *val;
86         o->sqpoll_set = 1;
87         return 0;
88 }
89
90 static struct fio_option options[] = {
91         {
92                 .name   = "hipri",
93                 .lname  = "High Priority",
94                 .type   = FIO_OPT_STR_SET,
95                 .off1   = offsetof(struct ioring_options, hipri),
96                 .help   = "Use polled IO completions",
97                 .category = FIO_OPT_C_ENGINE,
98                 .group  = FIO_OPT_G_LIBAIO,
99         },
100         {
101                 .name   = "fixedbufs",
102                 .lname  = "Fixed (pre-mapped) IO buffers",
103                 .type   = FIO_OPT_STR_SET,
104                 .off1   = offsetof(struct ioring_options, fixedbufs),
105                 .help   = "Pre map IO buffers",
106                 .category = FIO_OPT_C_ENGINE,
107                 .group  = FIO_OPT_G_LIBAIO,
108         },
109         {
110                 .name   = "sqthread_poll",
111                 .lname  = "Kernel SQ thread polling",
112                 .type   = FIO_OPT_INT,
113                 .off1   = offsetof(struct ioring_options, sqpoll_thread),
114                 .help   = "Offload submission/completion to kernel thread",
115                 .category = FIO_OPT_C_ENGINE,
116                 .group  = FIO_OPT_G_LIBAIO,
117         },
118         {
119                 .name   = "sqthread_poll_cpu",
120                 .lname  = "SQ Thread Poll CPU",
121                 .type   = FIO_OPT_INT,
122                 .cb     = fio_ioring_sqpoll_cb,
123                 .help   = "What CPU to run SQ thread polling on",
124                 .category = FIO_OPT_C_ENGINE,
125                 .group  = FIO_OPT_G_LIBAIO,
126         },
127         {
128                 .name   = NULL,
129         },
130 };
131
132 static int io_uring_enter(struct ioring_data *ld, unsigned int to_submit,
133                          unsigned int min_complete, unsigned int flags)
134 {
135         return syscall(__NR_sys_io_uring_enter, ld->ring_fd, to_submit,
136                         min_complete, flags);
137 }
138
139 static int fio_ioring_prep(struct thread_data *td, struct io_u *io_u)
140 {
141         struct ioring_data *ld = td->io_ops_data;
142         struct ioring_options *o = td->eo;
143         struct fio_file *f = io_u->file;
144         struct io_uring_sqe *sqe;
145
146         sqe = &ld->sqes[io_u->index];
147         sqe->fd = f->fd;
148         sqe->flags = 0;
149         sqe->ioprio = 0;
150         sqe->buf_index = 0;
151
152         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
153                 if (o->fixedbufs) {
154                         if (io_u->ddir == DDIR_READ)
155                                 sqe->opcode = IORING_OP_READ_FIXED;
156                         else
157                                 sqe->opcode = IORING_OP_WRITE_FIXED;
158                         sqe->addr = io_u->xfer_buf;
159                         sqe->len = io_u->xfer_buflen;
160                         sqe->buf_index = io_u->index;
161                 } else {
162                         if (io_u->ddir == DDIR_READ)
163                                 sqe->opcode = IORING_OP_READV;
164                         else
165                                 sqe->opcode = IORING_OP_WRITEV;
166                         sqe->addr = &ld->iovecs[io_u->index];
167                         sqe->len = 1;
168                 }
169                 sqe->off = io_u->offset;
170         } else if (ddir_sync(io_u->ddir)) {
171                 sqe->fsync_flags = 0;
172                 if (io_u->ddir == DDIR_DATASYNC)
173                         sqe->fsync_flags |= IORING_FSYNC_DATASYNC;
174                 sqe->opcode = IORING_OP_FSYNC;
175         }
176
177         sqe->user_data = (unsigned long) io_u;
178         return 0;
179 }
180
181 static struct io_u *fio_ioring_event(struct thread_data *td, int event)
182 {
183         struct ioring_data *ld = td->io_ops_data;
184         struct io_uring_cqe *cqe;
185         struct io_u *io_u;
186         unsigned index;
187
188         index = (event + ld->cq_ring_off) & ld->cq_ring_mask;
189
190         cqe = &ld->cq_ring.cqes[index];
191         io_u = (struct io_u *) cqe->user_data;
192
193         if (cqe->res != io_u->xfer_buflen) {
194                 if (cqe->res > io_u->xfer_buflen)
195                         io_u->error = -cqe->res;
196                 else
197                         io_u->resid = io_u->xfer_buflen - cqe->res;
198         } else
199                 io_u->error = 0;
200
201         if (io_u->ddir == DDIR_READ) {
202                 if (cqe->flags & IOCQE_FLAG_CACHEHIT)
203                         ld->cachehit++;
204                 else
205                         ld->cachemiss++;
206         }
207
208         return io_u;
209 }
210
211 static int fio_ioring_cqring_reap(struct thread_data *td, unsigned int events,
212                                    unsigned int max)
213 {
214         struct ioring_data *ld = td->io_ops_data;
215         struct io_cq_ring *ring = &ld->cq_ring;
216         unsigned head, reaped = 0;
217
218         head = *ring->head;
219         do {
220                 read_barrier();
221                 if (head == *ring->tail)
222                         break;
223                 reaped++;
224                 head++;
225         } while (reaped + events < max);
226
227         *ring->head = head;
228         write_barrier();
229         return reaped;
230 }
231
232 static int fio_ioring_getevents(struct thread_data *td, unsigned int min,
233                                 unsigned int max, const struct timespec *t)
234 {
235         struct ioring_data *ld = td->io_ops_data;
236         unsigned actual_min = td->o.iodepth_batch_complete_min == 0 ? 0 : min;
237         struct ioring_options *o = td->eo;
238         struct io_cq_ring *ring = &ld->cq_ring;
239         unsigned events = 0;
240         int r;
241
242         ld->cq_ring_off = *ring->head;
243         do {
244                 r = fio_ioring_cqring_reap(td, events, max);
245                 if (r) {
246                         events += r;
247                         continue;
248                 }
249
250                 if (!o->sqpoll_thread) {
251                         r = io_uring_enter(ld, 0, actual_min,
252                                                 IORING_ENTER_GETEVENTS);
253                         if (r < 0) {
254                                 if (errno == EAGAIN)
255                                         continue;
256                                 td_verror(td, errno, "io_uring_enter");
257                                 break;
258                         }
259                 }
260         } while (events < min);
261
262         return r < 0 ? r : events;
263 }
264
265 static enum fio_q_status fio_ioring_queue(struct thread_data *td,
266                                           struct io_u *io_u)
267 {
268         struct ioring_data *ld = td->io_ops_data;
269         struct io_sq_ring *ring = &ld->sq_ring;
270         unsigned tail, next_tail;
271
272         fio_ro_check(td, io_u);
273
274         if (ld->queued == ld->iodepth)
275                 return FIO_Q_BUSY;
276
277         if (io_u->ddir == DDIR_TRIM) {
278                 if (ld->queued)
279                         return FIO_Q_BUSY;
280
281                 do_io_u_trim(td, io_u);
282                 io_u_mark_submit(td, 1);
283                 io_u_mark_complete(td, 1);
284                 return FIO_Q_COMPLETED;
285         }
286
287         tail = *ring->tail;
288         next_tail = tail + 1;
289         read_barrier();
290         if (next_tail == *ring->head)
291                 return FIO_Q_BUSY;
292
293         ring->array[tail & ld->sq_ring_mask] = io_u->index;
294         *ring->tail = next_tail;
295         write_barrier();
296
297         ld->queued++;
298         return FIO_Q_QUEUED;
299 }
300
301 static void fio_ioring_queued(struct thread_data *td, int start, int nr)
302 {
303         struct ioring_data *ld = td->io_ops_data;
304         struct timespec now;
305
306         if (!fio_fill_issue_time(td))
307                 return;
308
309         fio_gettime(&now, NULL);
310
311         while (nr--) {
312                 struct io_sq_ring *ring = &ld->sq_ring;
313                 int index = ring->array[start & ld->sq_ring_mask];
314                 struct io_u *io_u = ld->io_u_index[index];
315
316                 memcpy(&io_u->issue_time, &now, sizeof(now));
317                 io_u_queued(td, io_u);
318
319                 start++;
320         }
321 }
322
323 static int fio_ioring_commit(struct thread_data *td)
324 {
325         struct ioring_data *ld = td->io_ops_data;
326         struct ioring_options *o = td->eo;
327         int ret;
328
329         if (!ld->queued)
330                 return 0;
331
332         /*
333          * Kernel side does submission. just need to check if the ring is
334          * flagged as needing a kick, if so, call io_uring_enter(). This
335          * only happens if we've been idle too long.
336          */
337         if (o->sqpoll_thread) {
338                 struct io_sq_ring *ring = &ld->sq_ring;
339
340                 read_barrier();
341                 if (*ring->flags & IORING_SQ_NEED_WAKEUP)
342                         io_uring_enter(ld, ld->queued, 0, 0);
343                 ld->queued = 0;
344                 return 0;
345         }
346
347         do {
348                 unsigned start = *ld->sq_ring.head;
349                 long nr = ld->queued;
350
351                 ret = io_uring_enter(ld, nr, 0, IORING_ENTER_GETEVENTS);
352                 if (ret > 0) {
353                         fio_ioring_queued(td, start, ret);
354                         io_u_mark_submit(td, ret);
355
356                         ld->queued -= ret;
357                         ret = 0;
358                 } else if (!ret) {
359                         io_u_mark_submit(td, ret);
360                         continue;
361                 } else {
362                         if (errno == EAGAIN) {
363                                 ret = fio_ioring_cqring_reap(td, 0, ld->queued);
364                                 if (ret)
365                                         continue;
366                                 /* Shouldn't happen */
367                                 usleep(1);
368                                 continue;
369                         }
370                         td_verror(td, errno, "io_uring_enter submit");
371                         break;
372                 }
373         } while (ld->queued);
374
375         return ret;
376 }
377
378 static void fio_ioring_unmap(struct ioring_data *ld)
379 {
380         int i;
381
382         for (i = 0; i < ARRAY_SIZE(ld->mmap); i++)
383                 munmap(ld->mmap[i].ptr, ld->mmap[i].len);
384         close(ld->ring_fd);
385 }
386
387 static void fio_ioring_cleanup(struct thread_data *td)
388 {
389         struct ioring_data *ld = td->io_ops_data;
390
391         if (ld) {
392                 td->ts.cachehit += ld->cachehit;
393                 td->ts.cachemiss += ld->cachemiss;
394
395                 if (!(td->flags & TD_F_CHILD))
396                         fio_ioring_unmap(ld);
397
398                 free(ld->io_u_index);
399                 free(ld->io_us);
400                 free(ld->iovecs);
401                 free(ld);
402         }
403 }
404
405 static int fio_ioring_mmap(struct ioring_data *ld, struct io_uring_params *p)
406 {
407         struct io_sq_ring *sring = &ld->sq_ring;
408         struct io_cq_ring *cring = &ld->cq_ring;
409         void *ptr;
410
411         ld->mmap[0].len = p->sq_off.array + p->sq_entries * sizeof(__u32);
412         ptr = mmap(0, ld->mmap[0].len, PROT_READ | PROT_WRITE,
413                         MAP_SHARED | MAP_POPULATE, ld->ring_fd,
414                         IORING_OFF_SQ_RING);
415         ld->mmap[0].ptr = ptr;
416         sring->head = ptr + p->sq_off.head;
417         sring->tail = ptr + p->sq_off.tail;
418         sring->ring_mask = ptr + p->sq_off.ring_mask;
419         sring->ring_entries = ptr + p->sq_off.ring_entries;
420         sring->flags = ptr + p->sq_off.flags;
421         sring->array = ptr + p->sq_off.array;
422         ld->sq_ring_mask = *sring->ring_mask;
423
424         ld->mmap[1].len = p->sq_entries * sizeof(struct io_uring_sqe);
425         ld->sqes = mmap(0, ld->mmap[1].len, PROT_READ | PROT_WRITE,
426                                 MAP_SHARED | MAP_POPULATE, ld->ring_fd,
427                                 IORING_OFF_SQES);
428         ld->mmap[1].ptr = ld->sqes;
429
430         ld->mmap[2].len = p->cq_off.cqes +
431                                 p->cq_entries * sizeof(struct io_uring_cqe);
432         ptr = mmap(0, ld->mmap[2].len, PROT_READ | PROT_WRITE,
433                         MAP_SHARED | MAP_POPULATE, ld->ring_fd,
434                         IORING_OFF_CQ_RING);
435         ld->mmap[2].ptr = ptr;
436         cring->head = ptr + p->cq_off.head;
437         cring->tail = ptr + p->cq_off.tail;
438         cring->ring_mask = ptr + p->cq_off.ring_mask;
439         cring->ring_entries = ptr + p->cq_off.ring_entries;
440         cring->cqes = ptr + p->cq_off.cqes;
441         ld->cq_ring_mask = *cring->ring_mask;
442         return 0;
443 }
444
445 static int fio_ioring_queue_init(struct thread_data *td)
446 {
447         struct ioring_data *ld = td->io_ops_data;
448         struct ioring_options *o = td->eo;
449         int depth = td->o.iodepth;
450         struct io_uring_params p;
451         int ret;
452
453         memset(&p, 0, sizeof(p));
454
455         if (o->hipri)
456                 p.flags |= IORING_SETUP_IOPOLL;
457         if (o->sqpoll_thread) {
458                 p.flags |= IORING_SETUP_SQPOLL;
459                 if (o->sqpoll_set) {
460                         p.flags |= IORING_SETUP_SQ_AFF;
461                         p.sq_thread_cpu = o->sqpoll_cpu;
462                 }
463         }
464
465         if (o->fixedbufs) {
466                 struct rlimit rlim = {
467                         .rlim_cur = RLIM_INFINITY,
468                         .rlim_max = RLIM_INFINITY,
469                 };
470
471                 setrlimit(RLIMIT_MEMLOCK, &rlim);
472         }
473
474         ret = syscall(__NR_sys_io_uring_setup, depth, &p);
475         if (ret < 0)
476                 return ret;
477
478         ld->ring_fd = ret;
479
480         if (o->fixedbufs) {
481                 struct io_uring_register_buffers reg = {
482                         .iovecs = ld->iovecs,
483                         .nr_iovecs = depth
484                 };
485
486                 ret = syscall(__NR_sys_io_uring_register, ld->ring_fd,
487                                 IORING_REGISTER_BUFFERS, &reg);
488                 if (ret < 0)
489                         return ret;
490         }
491
492         return fio_ioring_mmap(ld, &p);
493 }
494
495 static int fio_ioring_post_init(struct thread_data *td)
496 {
497         struct ioring_data *ld = td->io_ops_data;
498         struct io_u *io_u;
499         int err, i;
500
501         for (i = 0; i < td->o.iodepth; i++) {
502                 struct iovec *iov = &ld->iovecs[i];
503
504                 io_u = ld->io_u_index[i];
505                 iov->iov_base = io_u->buf;
506                 iov->iov_len = td_max_bs(td);
507         }
508
509         err = fio_ioring_queue_init(td);
510         if (err) {
511                 td_verror(td, errno, "io_queue_init");
512                 return 1;
513         }
514
515         return 0;
516 }
517
518 static unsigned roundup_pow2(unsigned depth)
519 {
520         return 1UL << __fls(depth - 1);
521 }
522
523 static int fio_ioring_init(struct thread_data *td)
524 {
525         struct ioring_data *ld;
526
527         ld = calloc(1, sizeof(*ld));
528
529         /* ring depth must be a power-of-2 */
530         ld->iodepth = td->o.iodepth;
531         td->o.iodepth = roundup_pow2(td->o.iodepth);
532
533         /* io_u index */
534         ld->io_u_index = calloc(td->o.iodepth, sizeof(struct io_u *));
535         ld->io_us = calloc(td->o.iodepth, sizeof(struct io_u *));
536         ld->iovecs = calloc(td->o.iodepth, sizeof(struct iovec));
537
538         td->io_ops_data = ld;
539         return 0;
540 }
541
542 static int fio_ioring_io_u_init(struct thread_data *td, struct io_u *io_u)
543 {
544         struct ioring_data *ld = td->io_ops_data;
545
546         ld->io_u_index[io_u->index] = io_u;
547         return 0;
548 }
549
550 static struct ioengine_ops ioengine = {
551         .name                   = "io_uring",
552         .version                = FIO_IOOPS_VERSION,
553         .init                   = fio_ioring_init,
554         .post_init              = fio_ioring_post_init,
555         .io_u_init              = fio_ioring_io_u_init,
556         .prep                   = fio_ioring_prep,
557         .queue                  = fio_ioring_queue,
558         .commit                 = fio_ioring_commit,
559         .getevents              = fio_ioring_getevents,
560         .event                  = fio_ioring_event,
561         .cleanup                = fio_ioring_cleanup,
562         .open_file              = generic_open_file,
563         .close_file             = generic_close_file,
564         .get_file_size          = generic_get_file_size,
565         .options                = options,
566         .option_struct_size     = sizeof(struct ioring_options),
567 };
568
569 static void fio_init fio_ioring_register(void)
570 {
571         register_ioengine(&ioengine);
572 }
573
574 static void fio_exit fio_ioring_unregister(void)
575 {
576         unregister_ioengine(&ioengine);
577 }
578 #endif