5a9add7937529b98a8924faf6d8c200edf438f73
[fio.git] / engines / aioring.c
1 #ifdef ARCH_HAVE_AIORING
2 /*
3  * aioring engine
4  *
5  * IO engine using the new native Linux libaio ring interface
6  *
7  */
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <libaio.h>
12 #include <sys/time.h>
13 #include <sys/resource.h>
14
15 #include "../fio.h"
16 #include "../lib/pow2.h"
17 #include "../optgroup.h"
18 #include "../lib/memalign.h"
19
20 #ifndef IOCB_FLAG_HIPRI
21 #define IOCB_FLAG_HIPRI (1 << 2)
22 #endif
23
24 /*
25  * io_setup2(2) flags
26  */
27 #ifndef IOCTX_FLAG_IOPOLL
28 #define IOCTX_FLAG_IOPOLL       (1 << 0)
29 #endif
30 #ifndef IOCTX_FLAG_SCQRING
31 #define IOCTX_FLAG_SCQRING      (1 << 1)
32 #endif
33 #ifndef IOCTX_FLAG_FIXEDBUFS
34 #define IOCTX_FLAG_FIXEDBUFS    (1 << 2)
35 #endif
36 #ifndef IOCTX_FLAG_SQTHREAD
37 #define IOCTX_FLAG_SQTHREAD     (1 << 3)
38 #endif
39 #ifndef IOCTX_FLAG_SQWQ
40 #define IOCTX_FLAG_SQWQ         (1 << 4)
41 #endif
42
43 /*
44  * io_ring_enter(2) flags
45  */
46 #ifndef IORING_FLAG_SUBMIT
47 #define IORING_FLAG_SUBMIT      (1 << 0)
48 #endif
49 #ifndef IORING_FLAG_GETEVENTS
50 #define IORING_FLAG_GETEVENTS   (1 << 1)
51 #endif
52
53 typedef uint64_t u64;
54 typedef uint32_t u32;
55 typedef uint16_t u16;
56
57 struct aio_sq_ring {
58         union {
59                 struct {
60                         u32 head;
61                         u32 tail;
62                         u32 nr_events;
63                         u16 sq_thread_cpu;
64                         u64 iocbs;
65                 };
66                 u32 pad[16];
67         };
68         u32 array[0];
69 };
70
71 struct aio_cq_ring {
72         union {
73                 struct {
74                         u32 head;
75                         u32 tail;
76                         u32 nr_events;
77                 };
78                 struct io_event pad;
79         };
80         struct io_event events[0];
81 };
82
83 struct aioring_data {
84         io_context_t aio_ctx;
85         struct io_u **io_us;
86         struct io_u **io_u_index;
87
88         struct aio_sq_ring *sq_ring;
89         struct iocb *iocbs;
90
91         struct aio_cq_ring *cq_ring;
92         struct io_event *events;
93
94         int queued;
95         int cq_ring_off;
96 };
97
98 struct aioring_options {
99         void *pad;
100         unsigned int hipri;
101         unsigned int fixedbufs;
102 };
103
104 static struct fio_option options[] = {
105         {
106                 .name   = "hipri",
107                 .lname  = "High Priority",
108                 .type   = FIO_OPT_STR_SET,
109                 .off1   = offsetof(struct aioring_options, hipri),
110                 .help   = "Use polled IO completions",
111                 .category = FIO_OPT_C_ENGINE,
112                 .group  = FIO_OPT_G_LIBAIO,
113         },
114         {
115                 .name   = "fixedbufs",
116                 .lname  = "Fixed (pre-mapped) IO buffers",
117                 .type   = FIO_OPT_STR_SET,
118                 .off1   = offsetof(struct aioring_options, fixedbufs),
119                 .help   = "Pre map IO buffers",
120                 .category = FIO_OPT_C_ENGINE,
121                 .group  = FIO_OPT_G_LIBAIO,
122         },
123         {
124                 .name   = NULL,
125         },
126 };
127
128 static int fio_aioring_commit(struct thread_data *td);
129
130 static int io_ring_enter(io_context_t ctx, unsigned int to_submit,
131                          unsigned int min_complete, unsigned int flags)
132 {
133 #ifdef __NR_sys_io_ring_enter
134         return syscall(__NR_sys_io_ring_enter, ctx, to_submit, min_complete,
135                         flags);
136 #else
137         return -1;
138 #endif
139 }
140
141 static int fio_aioring_prep(struct thread_data *td, struct io_u *io_u)
142 {
143         struct aioring_data *ld = td->io_ops_data;
144         struct fio_file *f = io_u->file;
145         struct aioring_options *o = td->eo;
146         struct iocb *iocb;
147
148         iocb = &ld->iocbs[io_u->index];
149
150         if (io_u->ddir == DDIR_READ) {
151                 if (o->fixedbufs) {
152                         iocb->aio_fildes = f->fd;
153                         iocb->aio_lio_opcode = IO_CMD_PREAD;
154                         iocb->u.c.offset = io_u->offset;
155                 } else {
156                         io_prep_pread(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
157                         if (o->hipri)
158                                 iocb->u.c.flags |= IOCB_FLAG_HIPRI;
159                 }
160         } else if (io_u->ddir == DDIR_WRITE) {
161                 if (o->fixedbufs) {
162                         iocb->aio_fildes = f->fd;
163                         iocb->aio_lio_opcode = IO_CMD_PWRITE;
164                         iocb->u.c.offset = io_u->offset;
165                 } else {
166                         io_prep_pwrite(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
167                         if (o->hipri)
168                                 iocb->u.c.flags |= IOCB_FLAG_HIPRI;
169                 }
170         } else if (ddir_sync(io_u->ddir))
171                 io_prep_fsync(iocb, f->fd);
172
173         iocb->data = io_u;
174         return 0;
175 }
176
177 static struct io_u *fio_aioring_event(struct thread_data *td, int event)
178 {
179         struct aioring_data *ld = td->io_ops_data;
180         struct io_event *ev;
181         struct io_u *io_u;
182         int index;
183
184         index = event + ld->cq_ring_off;
185         if (index >= ld->cq_ring->nr_events)
186                 index -= ld->cq_ring->nr_events;
187
188         ev = &ld->cq_ring->events[index];
189         io_u = ev->data;
190
191         if (ev->res != io_u->xfer_buflen) {
192                 if (ev->res > io_u->xfer_buflen)
193                         io_u->error = -ev->res;
194                 else
195                         io_u->resid = io_u->xfer_buflen - ev->res;
196         } else
197                 io_u->error = 0;
198
199         return io_u;
200 }
201
202 static int fio_aioring_cqring_reap(struct thread_data *td, unsigned int events,
203                                    unsigned int max)
204 {
205         struct aioring_data *ld = td->io_ops_data;
206         struct aio_cq_ring *ring = ld->cq_ring;
207         u32 head, reaped = 0;
208
209         head = ring->head;
210         do {
211                 read_barrier();
212                 if (head == ring->tail)
213                         break;
214                 reaped++;
215                 head++;
216                 if (head == ring->nr_events)
217                         head = 0;
218         } while (reaped + events < max);
219
220         ring->head = head;
221         write_barrier();
222         return reaped;
223 }
224
225 static int fio_aioring_getevents(struct thread_data *td, unsigned int min,
226                                  unsigned int max, const struct timespec *t)
227 {
228         struct aioring_data *ld = td->io_ops_data;
229         unsigned actual_min = td->o.iodepth_batch_complete_min == 0 ? 0 : min;
230         struct aio_cq_ring *ring = ld->cq_ring;
231         int r, events = 0;
232
233         ld->cq_ring_off = ring->head;
234         do {
235                 r = fio_aioring_cqring_reap(td, events, max);
236                 if (r) {
237                         events += r;
238                         continue;
239                 }
240
241                 r = io_ring_enter(ld->aio_ctx, 0, actual_min,
242                                         IORING_FLAG_GETEVENTS);
243                 if (r < 0) {
244                         if (errno == EAGAIN)
245                                 continue;
246                         perror("ring enter");
247                         break;
248                 }
249         } while (events < min);
250
251         return r < 0 ? r : events;
252 }
253
254 static enum fio_q_status fio_aioring_queue(struct thread_data *td,
255                                            struct io_u *io_u)
256 {
257         struct aioring_data *ld = td->io_ops_data;
258         struct aio_sq_ring *ring = ld->sq_ring;
259         unsigned tail, next_tail;
260
261         fio_ro_check(td, io_u);
262
263         if (ld->queued == td->o.iodepth)
264                 return FIO_Q_BUSY;
265
266         /*
267          * fsync is tricky, since it can fail and we need to do it
268          * serialized with other io. the reason is that linux doesn't
269          * support aio fsync yet. So return busy for the case where we
270          * have pending io, to let fio complete those first.
271          */
272         if (ddir_sync(io_u->ddir)) {
273                 if (ld->queued)
274                         return FIO_Q_BUSY;
275
276                 do_io_u_sync(td, io_u);
277                 return FIO_Q_COMPLETED;
278         }
279
280         if (io_u->ddir == DDIR_TRIM) {
281                 if (ld->queued)
282                         return FIO_Q_BUSY;
283
284                 do_io_u_trim(td, io_u);
285                 io_u_mark_submit(td, 1);
286                 io_u_mark_complete(td, 1);
287                 return FIO_Q_COMPLETED;
288         }
289
290         tail = ring->tail;
291         next_tail = tail + 1;
292         if (next_tail == ring->nr_events)
293                 next_tail = 0;
294         read_barrier();
295         if (next_tail == ring->head)
296                 return FIO_Q_BUSY;
297
298         ring->array[tail] = io_u->index;
299         ring->tail = next_tail;
300         write_barrier();
301
302         ld->queued++;
303         return FIO_Q_QUEUED;
304 }
305
306 static void fio_aioring_queued(struct thread_data *td, int start, int nr)
307 {
308         struct aioring_data *ld = td->io_ops_data;
309         struct timespec now;
310
311         if (!fio_fill_issue_time(td))
312                 return;
313
314         fio_gettime(&now, NULL);
315
316         while (nr--) {
317                 int index = ld->sq_ring->array[start];
318                 struct io_u *io_u = io_u = ld->io_u_index[index];
319
320                 memcpy(&io_u->issue_time, &now, sizeof(now));
321                 io_u_queued(td, io_u);
322
323                 start++;
324                 if (start == ld->sq_ring->nr_events)
325                         start = 0;
326         }
327 }
328
329 static int fio_aioring_commit(struct thread_data *td)
330 {
331         struct aioring_data *ld = td->io_ops_data;
332         int ret;
333
334         if (!ld->queued)
335                 return 0;
336
337         do {
338                 int start = ld->sq_ring->head;
339                 long nr = ld->queued;
340
341                 ret = io_ring_enter(ld->aio_ctx, nr, 0, IORING_FLAG_SUBMIT |
342                                                 IORING_FLAG_GETEVENTS);
343                 if (ret == -1)
344                         perror("io_ring_enter");
345                 if (ret > 0) {
346                         fio_aioring_queued(td, start, ret);
347                         io_u_mark_submit(td, ret);
348
349                         ld->queued -= ret;
350                         ret = 0;
351                 } else if (ret == -EINTR || !ret) {
352                         if (!ret)
353                                 io_u_mark_submit(td, ret);
354                         continue;
355                 } else if (ret == -EAGAIN) {
356                         /*
357                          * If we get EAGAIN, we should break out without
358                          * error and let the upper layer reap some
359                          * events for us. If we have no queued IO, we
360                          * must loop here. If we loop for more than 30s,
361                          * just error out, something must be buggy in the
362                          * IO path.
363                          */
364                         if (ld->queued) {
365                                 ret = 0;
366                                 break;
367                         }
368                         usleep(1);
369                         continue;
370                 } else if (ret == -ENOMEM) {
371                         /*
372                          * If we get -ENOMEM, reap events if we can. If
373                          * we cannot, treat it as a fatal event since there's
374                          * nothing we can do about it.
375                          */
376                         if (ld->queued)
377                                 ret = 0;
378                         break;
379                 } else
380                         break;
381         } while (ld->queued);
382
383         return ret;
384 }
385
386 static size_t aioring_cq_size(struct thread_data *td)
387 {
388         return sizeof(struct aio_cq_ring) + 2 * td->o.iodepth * sizeof(struct io_event);
389 }
390
391 static size_t aioring_sq_iocb(struct thread_data *td)
392 {
393         return sizeof(struct iocb) * td->o.iodepth;
394 }
395
396 static size_t aioring_sq_size(struct thread_data *td)
397 {
398         return sizeof(struct aio_sq_ring) + td->o.iodepth * sizeof(u32);
399 }
400
401 static void fio_aioring_cleanup(struct thread_data *td)
402 {
403         struct aioring_data *ld = td->io_ops_data;
404
405         if (ld) {
406                 /*
407                  * Work-around to avoid huge RCU stalls at exit time. If we
408                  * don't do this here, then it'll be torn down by exit_aio().
409                  * But for that case we can parallellize the freeing, thus
410                  * speeding it up a lot.
411                  */
412                 if (!(td->flags & TD_F_CHILD))
413                         io_destroy(ld->aio_ctx);
414                 free(ld->io_u_index);
415                 free(ld->io_us);
416                 fio_memfree(ld->sq_ring, aioring_sq_size(td), false);
417                 fio_memfree(ld->iocbs, aioring_sq_iocb(td), false);
418                 fio_memfree(ld->cq_ring, aioring_cq_size(td), false);
419                 free(ld);
420         }
421 }
422
423 static int fio_aioring_queue_init(struct thread_data *td)
424 {
425 #ifdef __NR_sys_io_setup2
426         struct aioring_data *ld = td->io_ops_data;
427         struct aioring_options *o = td->eo;
428         int flags = IOCTX_FLAG_SCQRING;
429         int depth = td->o.iodepth;
430
431         if (o->hipri)
432                 flags |= IOCTX_FLAG_IOPOLL;
433         if (o->fixedbufs) {
434                 struct rlimit rlim = {
435                         .rlim_cur = RLIM_INFINITY,
436                         .rlim_max = RLIM_INFINITY,
437                 };
438
439                 setrlimit(RLIMIT_MEMLOCK, &rlim);
440                 flags |= IOCTX_FLAG_FIXEDBUFS;
441         }
442
443         return syscall(__NR_sys_io_setup2, depth, flags,
444                         ld->sq_ring, ld->cq_ring, &ld->aio_ctx);
445 #else
446         return -1;
447 #endif
448 }
449
450 static int fio_aioring_post_init(struct thread_data *td)
451 {
452         struct aioring_data *ld = td->io_ops_data;
453         struct aioring_options *o = td->eo;
454         struct io_u *io_u;
455         struct iocb *iocb;
456         int err = 0;
457
458         if (o->fixedbufs) {
459                 int i;
460
461                 for (i = 0; i < td->o.iodepth; i++) {
462                         io_u = ld->io_u_index[i];
463                         iocb = &ld->iocbs[i];
464                         iocb->u.c.buf = io_u->buf;
465                         iocb->u.c.nbytes = td_max_bs(td);
466
467                         if (o->hipri)
468                                 iocb->u.c.flags |= IOCB_FLAG_HIPRI;
469                 }
470         }
471
472         err = fio_aioring_queue_init(td);
473         if (err) {
474                 td_verror(td, -err, "io_queue_init");
475                 return 1;
476         }
477
478         return 0;
479 }
480
481 static int fio_aioring_init(struct thread_data *td)
482 {
483         struct aioring_data *ld;
484
485         if (td->o.iodepth <= 1) {
486                 printf("aio-ring: needs a minimum QD of 2\n");
487                 return 1;
488         }
489
490         ld = calloc(1, sizeof(*ld));
491
492         /* io_u index */
493         ld->io_u_index = calloc(td->o.iodepth, sizeof(struct io_u *));
494         ld->io_us = calloc(td->o.iodepth, sizeof(struct io_u *));
495
496         ld->iocbs = fio_memalign(page_size, aioring_sq_iocb(td), false);
497         memset(ld->iocbs, 0, aioring_sq_iocb(td));
498
499         ld->sq_ring = fio_memalign(page_size, aioring_sq_size(td), false);
500         memset(ld->sq_ring, 0, aioring_sq_size(td));
501         ld->sq_ring->nr_events = td->o.iodepth;
502         ld->sq_ring->iocbs = (u64) (uintptr_t) ld->iocbs;
503
504         ld->cq_ring = fio_memalign(page_size, aioring_cq_size(td), false);
505         memset(ld->cq_ring, 0, aioring_cq_size(td));
506         ld->cq_ring->nr_events = td->o.iodepth * 2;
507
508         td->io_ops_data = ld;
509         return 0;
510 }
511
512 static int fio_aioring_io_u_init(struct thread_data *td, struct io_u *io_u)
513 {
514         struct aioring_data *ld = td->io_ops_data;
515
516         ld->io_u_index[io_u->index] = io_u;
517         return 0;
518 }
519
520 static struct ioengine_ops ioengine = {
521         .name                   = "aio-ring",
522         .version                = FIO_IOOPS_VERSION,
523         .init                   = fio_aioring_init,
524         .post_init              = fio_aioring_post_init,
525         .io_u_init              = fio_aioring_io_u_init,
526         .prep                   = fio_aioring_prep,
527         .queue                  = fio_aioring_queue,
528         .commit                 = fio_aioring_commit,
529         .getevents              = fio_aioring_getevents,
530         .event                  = fio_aioring_event,
531         .cleanup                = fio_aioring_cleanup,
532         .open_file              = generic_open_file,
533         .close_file             = generic_close_file,
534         .get_file_size          = generic_get_file_size,
535         .options                = options,
536         .option_struct_size     = sizeof(struct aioring_options),
537 };
538
539 static void fio_init fio_aioring_register(void)
540 {
541         register_ioengine(&ioengine);
542 }
543
544 static void fio_exit fio_aioring_unregister(void)
545 {
546         unregister_ioengine(&ioengine);
547 }
548 #endif