engines/aioring: various updates and fixes
[fio.git] / engines / aioring.c
1 /*
2  * aioring engine
3  *
4  * IO engine using the new native Linux libaio ring interface. See:
5  *
6  * http://git.kernel.dk/cgit/linux-block/log/?h=aio-poll
7  *
8  */
9 #include <stdlib.h>
10 #include <unistd.h>
11 #include <errno.h>
12 #include <libaio.h>
13 #include <sys/time.h>
14 #include <sys/resource.h>
15
16 #include "../fio.h"
17 #include "../lib/pow2.h"
18 #include "../optgroup.h"
19 #include "../lib/memalign.h"
20
21 #ifdef ARCH_HAVE_AIORING
22
23 #ifndef IOCB_FLAG_HIPRI
24 #define IOCB_FLAG_HIPRI (1 << 2)
25 #endif
26
27 /*
28  * io_setup2(2) flags
29  */
30 #ifndef IOCTX_FLAG_IOPOLL
31 #define IOCTX_FLAG_IOPOLL       (1 << 0)
32 #endif
33 #ifndef IOCTX_FLAG_SCQRING
34 #define IOCTX_FLAG_SCQRING      (1 << 1)
35 #endif
36 #ifndef IOCTX_FLAG_FIXEDBUFS
37 #define IOCTX_FLAG_FIXEDBUFS    (1 << 2)
38 #endif
39 #ifndef IOCTX_FLAG_SQTHREAD
40 #define IOCTX_FLAG_SQTHREAD     (1 << 3)
41 #endif
42 #ifndef IOCTX_FLAG_SQWQ
43 #define IOCTX_FLAG_SQWQ         (1 << 4)
44 #endif
45
46 /*
47  * io_ring_enter(2) flags
48  */
49 #ifndef IORING_FLAG_SUBMIT
50 #define IORING_FLAG_SUBMIT      (1 << 0)
51 #endif
52 #ifndef IORING_FLAG_GETEVENTS
53 #define IORING_FLAG_GETEVENTS   (1 << 1)
54 #endif
55
56 typedef uint64_t u64;
57 typedef uint32_t u32;
58 typedef uint16_t u16;
59
60 struct aio_sq_ring {
61         union {
62                 struct {
63                         u32 head;
64                         u32 tail;
65                         u32 nr_events;
66                         u16 sq_thread_cpu;
67                         u64 iocbs;
68                 };
69                 u32 pad[16];
70         };
71         u32 array[0];
72 };
73
74 struct aio_cq_ring {
75         union {
76                 struct {
77                         u32 head;
78                         u32 tail;
79                         u32 nr_events;
80                 };
81                 struct io_event pad;
82         };
83         struct io_event events[0];
84 };
85
86 struct aioring_data {
87         io_context_t aio_ctx;
88         struct io_u **io_us;
89         struct io_u **io_u_index;
90
91         struct aio_sq_ring *sq_ring;
92         struct iocb *iocbs;
93
94         struct aio_cq_ring *cq_ring;
95         struct io_event *events;
96
97         int queued;
98         int cq_ring_off;
99 };
100
101 struct aioring_options {
102         void *pad;
103         unsigned int hipri;
104         unsigned int fixedbufs;
105         unsigned int sqthread;
106         unsigned int sqthread_set;
107         unsigned int sqwq;
108 };
109
110 static int fio_aioring_sqthread_cb(void *data,
111                                    unsigned long long *val)
112 {
113         struct aioring_options *o = data;
114
115         o->sqthread = *val;
116         o->sqthread_set = 1;
117         return 0;
118 }
119
120 static struct fio_option options[] = {
121         {
122                 .name   = "hipri",
123                 .lname  = "High Priority",
124                 .type   = FIO_OPT_STR_SET,
125                 .off1   = offsetof(struct aioring_options, hipri),
126                 .help   = "Use polled IO completions",
127                 .category = FIO_OPT_C_ENGINE,
128                 .group  = FIO_OPT_G_LIBAIO,
129         },
130         {
131                 .name   = "fixedbufs",
132                 .lname  = "Fixed (pre-mapped) IO buffers",
133                 .type   = FIO_OPT_STR_SET,
134                 .off1   = offsetof(struct aioring_options, fixedbufs),
135                 .help   = "Pre map IO buffers",
136                 .category = FIO_OPT_C_ENGINE,
137                 .group  = FIO_OPT_G_LIBAIO,
138         },
139         {
140                 .name   = "sqthread",
141                 .lname  = "Use kernel SQ thread on this CPU",
142                 .type   = FIO_OPT_INT,
143                 .cb     = fio_aioring_sqthread_cb,
144                 .help   = "Offload submission to kernel thread",
145                 .category = FIO_OPT_C_ENGINE,
146                 .group  = FIO_OPT_G_LIBAIO,
147         },
148         {
149                 .name   = "sqwq",
150                 .lname  = "Offload submission to kernel workqueue",
151                 .type   = FIO_OPT_STR_SET,
152                 .off1   = offsetof(struct aioring_options, sqwq),
153                 .help   = "Offload submission to kernel workqueue",
154                 .category = FIO_OPT_C_ENGINE,
155                 .group  = FIO_OPT_G_LIBAIO,
156         },
157         {
158                 .name   = NULL,
159         },
160 };
161
162 static int fio_aioring_commit(struct thread_data *td);
163
164 static int io_ring_enter(io_context_t ctx, unsigned int to_submit,
165                          unsigned int min_complete, unsigned int flags)
166 {
167         return syscall(__NR_sys_io_ring_enter, ctx, to_submit, min_complete,
168                         flags);
169 }
170
171 static int fio_aioring_prep(struct thread_data *td, struct io_u *io_u)
172 {
173         struct aioring_data *ld = td->io_ops_data;
174         struct fio_file *f = io_u->file;
175         struct aioring_options *o = td->eo;
176         struct iocb *iocb;
177
178         iocb = &ld->iocbs[io_u->index];
179
180         if (io_u->ddir == DDIR_READ) {
181                 if (o->fixedbufs) {
182                         iocb->aio_fildes = f->fd;
183                         iocb->aio_lio_opcode = IO_CMD_PREAD;
184                         iocb->u.c.offset = io_u->offset;
185                 } else {
186                         io_prep_pread(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
187                         if (o->hipri)
188                                 iocb->u.c.flags |= IOCB_FLAG_HIPRI;
189                 }
190         } else if (io_u->ddir == DDIR_WRITE) {
191                 if (o->fixedbufs) {
192                         iocb->aio_fildes = f->fd;
193                         iocb->aio_lio_opcode = IO_CMD_PWRITE;
194                         iocb->u.c.offset = io_u->offset;
195                 } else {
196                         io_prep_pwrite(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
197                         if (o->hipri)
198                                 iocb->u.c.flags |= IOCB_FLAG_HIPRI;
199                 }
200         } else if (ddir_sync(io_u->ddir))
201                 io_prep_fsync(iocb, f->fd);
202
203         iocb->data = io_u;
204         return 0;
205 }
206
207 static struct io_u *fio_aioring_event(struct thread_data *td, int event)
208 {
209         struct aioring_data *ld = td->io_ops_data;
210         struct io_event *ev;
211         struct io_u *io_u;
212         int index;
213
214         index = event + ld->cq_ring_off;
215         if (index >= ld->cq_ring->nr_events)
216                 index -= ld->cq_ring->nr_events;
217
218         ev = &ld->cq_ring->events[index];
219         io_u = ev->data;
220
221         if (ev->res != io_u->xfer_buflen) {
222                 if (ev->res > io_u->xfer_buflen)
223                         io_u->error = -ev->res;
224                 else
225                         io_u->resid = io_u->xfer_buflen - ev->res;
226         } else
227                 io_u->error = 0;
228
229         return io_u;
230 }
231
232 static int fio_aioring_cqring_reap(struct thread_data *td, unsigned int events,
233                                    unsigned int max)
234 {
235         struct aioring_data *ld = td->io_ops_data;
236         struct aio_cq_ring *ring = ld->cq_ring;
237         u32 head, reaped = 0;
238
239         head = ring->head;
240         do {
241                 read_barrier();
242                 if (head == ring->tail)
243                         break;
244                 reaped++;
245                 head++;
246                 if (head == ring->nr_events)
247                         head = 0;
248         } while (reaped + events < max);
249
250         ring->head = head;
251         write_barrier();
252         return reaped;
253 }
254
255 static int fio_aioring_getevents(struct thread_data *td, unsigned int min,
256                                  unsigned int max, const struct timespec *t)
257 {
258         struct aioring_data *ld = td->io_ops_data;
259         unsigned actual_min = td->o.iodepth_batch_complete_min == 0 ? 0 : min;
260         struct aio_cq_ring *ring = ld->cq_ring;
261         int r, events = 0;
262
263         ld->cq_ring_off = ring->head;
264         do {
265                 r = fio_aioring_cqring_reap(td, events, max);
266                 if (r) {
267                         events += r;
268                         continue;
269                 }
270
271                 r = io_ring_enter(ld->aio_ctx, 0, actual_min,
272                                         IORING_FLAG_GETEVENTS);
273                 if (r < 0) {
274                         if (errno == EAGAIN)
275                                 continue;
276                         td_verror(td, errno, "io_ring_enter get");
277                         break;
278                 }
279         } while (events < min);
280
281         return r < 0 ? r : events;
282 }
283
284 static enum fio_q_status fio_aioring_queue(struct thread_data *td,
285                                            struct io_u *io_u)
286 {
287         struct aioring_data *ld = td->io_ops_data;
288         struct aio_sq_ring *ring = ld->sq_ring;
289         unsigned tail, next_tail;
290
291         fio_ro_check(td, io_u);
292
293         if (ld->queued == td->o.iodepth)
294                 return FIO_Q_BUSY;
295
296         if (io_u->ddir == DDIR_TRIM) {
297                 if (ld->queued)
298                         return FIO_Q_BUSY;
299
300                 do_io_u_trim(td, io_u);
301                 io_u_mark_submit(td, 1);
302                 io_u_mark_complete(td, 1);
303                 return FIO_Q_COMPLETED;
304         }
305
306         tail = ring->tail;
307         next_tail = tail + 1;
308         if (next_tail == ring->nr_events)
309                 next_tail = 0;
310         read_barrier();
311         if (next_tail == ring->head)
312                 return FIO_Q_BUSY;
313
314         ring->array[tail] = io_u->index;
315         ring->tail = next_tail;
316         write_barrier();
317
318         ld->queued++;
319         return FIO_Q_QUEUED;
320 }
321
322 static void fio_aioring_queued(struct thread_data *td, int start, int nr)
323 {
324         struct aioring_data *ld = td->io_ops_data;
325         struct timespec now;
326
327         if (!fio_fill_issue_time(td))
328                 return;
329
330         fio_gettime(&now, NULL);
331
332         while (nr--) {
333                 int index = ld->sq_ring->array[start];
334                 struct io_u *io_u = io_u = ld->io_u_index[index];
335
336                 memcpy(&io_u->issue_time, &now, sizeof(now));
337                 io_u_queued(td, io_u);
338
339                 start++;
340                 if (start == ld->sq_ring->nr_events)
341                         start = 0;
342         }
343 }
344
345 static int fio_aioring_commit(struct thread_data *td)
346 {
347         struct aioring_data *ld = td->io_ops_data;
348         int ret;
349
350         if (!ld->queued)
351                 return 0;
352
353         do {
354                 int start = ld->sq_ring->head;
355                 long nr = ld->queued;
356
357                 ret = io_ring_enter(ld->aio_ctx, nr, 0, IORING_FLAG_SUBMIT |
358                                                 IORING_FLAG_GETEVENTS);
359                 if (ret > 0) {
360                         fio_aioring_queued(td, start, ret);
361                         io_u_mark_submit(td, ret);
362
363                         ld->queued -= ret;
364                         ret = 0;
365                 } else if (!ret) {
366                         io_u_mark_submit(td, ret);
367                         continue;
368                 } else {
369                         if (errno == EAGAIN) {
370                                 ret = fio_aioring_cqring_reap(td, 0, ld->queued);
371                                 if (ret)
372                                         continue;
373                                 /* Shouldn't happen */
374                                 usleep(1);
375                                 continue;
376                         }
377                         td_verror(td, errno, "io_ring_enter sumit");
378                         break;
379                 }
380         } while (ld->queued);
381
382         return ret;
383 }
384
385 static size_t aioring_cq_size(struct thread_data *td)
386 {
387         return sizeof(struct aio_cq_ring) + 2 * td->o.iodepth * sizeof(struct io_event);
388 }
389
390 static size_t aioring_sq_iocb(struct thread_data *td)
391 {
392         return sizeof(struct iocb) * td->o.iodepth;
393 }
394
395 static size_t aioring_sq_size(struct thread_data *td)
396 {
397         return sizeof(struct aio_sq_ring) + td->o.iodepth * sizeof(u32);
398 }
399
400 static void fio_aioring_cleanup(struct thread_data *td)
401 {
402         struct aioring_data *ld = td->io_ops_data;
403
404         if (ld) {
405                 /* Bump depth to match init depth */
406                 td->o.iodepth++;
407
408                 /*
409                  * Work-around to avoid huge RCU stalls at exit time. If we
410                  * don't do this here, then it'll be torn down by exit_aio().
411                  * But for that case we can parallellize the freeing, thus
412                  * speeding it up a lot.
413                  */
414                 if (!(td->flags & TD_F_CHILD))
415                         io_destroy(ld->aio_ctx);
416                 free(ld->io_u_index);
417                 free(ld->io_us);
418                 fio_memfree(ld->sq_ring, aioring_sq_size(td), false);
419                 fio_memfree(ld->iocbs, aioring_sq_iocb(td), false);
420                 fio_memfree(ld->cq_ring, aioring_cq_size(td), false);
421                 free(ld);
422         }
423 }
424
425 static int fio_aioring_queue_init(struct thread_data *td)
426 {
427         struct aioring_data *ld = td->io_ops_data;
428         struct aioring_options *o = td->eo;
429         int flags = IOCTX_FLAG_SCQRING;
430         int depth = td->o.iodepth;
431
432         if (o->hipri)
433                 flags |= IOCTX_FLAG_IOPOLL;
434         if (o->sqthread_set) {
435                 ld->sq_ring->sq_thread_cpu = o->sqthread;
436                 flags |= IOCTX_FLAG_SQTHREAD;
437         } else if (o->sqwq)
438                 flags |= IOCTX_FLAG_SQWQ;
439
440         if (o->fixedbufs) {
441                 struct rlimit rlim = {
442                         .rlim_cur = RLIM_INFINITY,
443                         .rlim_max = RLIM_INFINITY,
444                 };
445
446                 setrlimit(RLIMIT_MEMLOCK, &rlim);
447                 flags |= IOCTX_FLAG_FIXEDBUFS;
448         }
449
450         return syscall(__NR_sys_io_setup2, depth, flags,
451                         ld->sq_ring, ld->cq_ring, &ld->aio_ctx);
452 }
453
454 static int fio_aioring_post_init(struct thread_data *td)
455 {
456         struct aioring_data *ld = td->io_ops_data;
457         struct aioring_options *o = td->eo;
458         struct io_u *io_u;
459         struct iocb *iocb;
460         int err = 0;
461
462         if (o->fixedbufs) {
463                 int i;
464
465                 for (i = 0; i < td->o.iodepth; i++) {
466                         io_u = ld->io_u_index[i];
467                         iocb = &ld->iocbs[i];
468                         iocb->u.c.buf = io_u->buf;
469                         iocb->u.c.nbytes = td_max_bs(td);
470
471                         if (o->hipri)
472                                 iocb->u.c.flags |= IOCB_FLAG_HIPRI;
473                 }
474         }
475
476         err = fio_aioring_queue_init(td);
477         if (err) {
478                 td_verror(td, -err, "io_queue_init");
479                 return 1;
480         }
481
482         /* Adjust depth back again */
483         td->o.iodepth--;
484         return 0;
485 }
486
487 static int fio_aioring_init(struct thread_data *td)
488 {
489         struct aioring_options *o = td->eo;
490         struct aioring_data *ld;
491
492         if (o->sqthread_set && o->sqwq) {
493                 log_err("fio: aioring sqthread and sqwq are mutually exclusive\n");
494                 return 1;
495         }
496
497         /* ring needs an extra entry, add one to achieve QD set */
498         td->o.iodepth++;
499
500         ld = calloc(1, sizeof(*ld));
501
502         /* io_u index */
503         ld->io_u_index = calloc(td->o.iodepth, sizeof(struct io_u *));
504         ld->io_us = calloc(td->o.iodepth, sizeof(struct io_u *));
505
506         ld->iocbs = fio_memalign(page_size, aioring_sq_iocb(td), false);
507         memset(ld->iocbs, 0, aioring_sq_iocb(td));
508
509         ld->sq_ring = fio_memalign(page_size, aioring_sq_size(td), false);
510         memset(ld->sq_ring, 0, aioring_sq_size(td));
511         ld->sq_ring->nr_events = td->o.iodepth;
512         ld->sq_ring->iocbs = (u64) (uintptr_t) ld->iocbs;
513
514         ld->cq_ring = fio_memalign(page_size, aioring_cq_size(td), false);
515         memset(ld->cq_ring, 0, aioring_cq_size(td));
516         ld->cq_ring->nr_events = td->o.iodepth * 2;
517
518         td->io_ops_data = ld;
519         return 0;
520 }
521
522 static int fio_aioring_io_u_init(struct thread_data *td, struct io_u *io_u)
523 {
524         struct aioring_data *ld = td->io_ops_data;
525
526         ld->io_u_index[io_u->index] = io_u;
527         return 0;
528 }
529
530 static struct ioengine_ops ioengine = {
531         .name                   = "aio-ring",
532         .version                = FIO_IOOPS_VERSION,
533         .init                   = fio_aioring_init,
534         .post_init              = fio_aioring_post_init,
535         .io_u_init              = fio_aioring_io_u_init,
536         .prep                   = fio_aioring_prep,
537         .queue                  = fio_aioring_queue,
538         .commit                 = fio_aioring_commit,
539         .getevents              = fio_aioring_getevents,
540         .event                  = fio_aioring_event,
541         .cleanup                = fio_aioring_cleanup,
542         .open_file              = generic_open_file,
543         .close_file             = generic_close_file,
544         .get_file_size          = generic_get_file_size,
545         .options                = options,
546         .option_struct_size     = sizeof(struct aioring_options),
547 };
548
549 static void fio_init fio_aioring_register(void)
550 {
551         register_ioengine(&ioengine);
552 }
553
554 static void fio_exit fio_aioring_unregister(void)
555 {
556         unregister_ioengine(&ioengine);
557 }
558 #endif