Add aioring engine
engines/aioring.c
commit 52885fa2 (Jens Axboe)
/*
 * aioring engine
 *
 * IO engine using the new native Linux libaio ring interface
 *
 */
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <libaio.h>
#include <sys/time.h>
#include <sys/resource.h>

#include "../fio.h"
#include "../lib/pow2.h"
#include "../optgroup.h"
#include "../lib/memalign.h"

#ifndef IOCB_FLAG_HIPRI
#define IOCB_FLAG_HIPRI		(1 << 2)
#endif

/*
 * io_setup2(2) flags
 */
#ifndef IOCTX_FLAG_IOPOLL
#define IOCTX_FLAG_IOPOLL	(1 << 0)
#endif
#ifndef IOCTX_FLAG_SCQRING
#define IOCTX_FLAG_SCQRING	(1 << 1)
#endif
#ifndef IOCTX_FLAG_FIXEDBUFS
#define IOCTX_FLAG_FIXEDBUFS	(1 << 2)
#endif
#ifndef IOCTX_FLAG_SQTHREAD
#define IOCTX_FLAG_SQTHREAD	(1 << 3)
#endif
#ifndef IOCTX_FLAG_SQWQ
#define IOCTX_FLAG_SQWQ		(1 << 4)
#endif

/*
 * io_ring_enter(2) flags
 */
#ifndef IORING_FLAG_SUBMIT
#define IORING_FLAG_SUBMIT	(1 << 0)
#endif
#ifndef IORING_FLAG_GETEVENTS
#define IORING_FLAG_GETEVENTS	(1 << 1)
#endif

typedef uint64_t u64;
typedef uint32_t u32;
typedef uint16_t u16;

struct aio_sq_ring {
	union {
		struct {
			u32 head;
			u32 tail;
			u32 nr_events;
			u16 sq_thread_cpu;
			u64 iocbs;
		};
		u32 pad[16];
	};
	u32 array[0];
};

struct aio_cq_ring {
	union {
		struct {
			u32 head;
			u32 tail;
			u32 nr_events;
		};
		struct io_event pad;
	};
	struct io_event events[0];
};

struct aioring_data {
	io_context_t aio_ctx;
	struct io_u **io_us;
	struct io_u **io_u_index;

	struct aio_sq_ring *sq_ring;
	struct iocb *iocbs;

	struct aio_cq_ring *cq_ring;
	struct io_event *events;

	int queued;
	int cq_ring_off;
};

struct aioring_options {
	void *pad;
	unsigned int hipri;
	unsigned int fixedbufs;
};

static struct fio_option options[] = {
	{
		.name	= "hipri",
		.lname	= "High Priority",
		.type	= FIO_OPT_STR_SET,
		.off1	= offsetof(struct aioring_options, hipri),
		.help	= "Use polled IO completions",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_LIBAIO,
	},
	{
		.name	= "fixedbufs",
		.lname	= "Fixed (pre-mapped) IO buffers",
		.type	= FIO_OPT_STR_SET,
		.off1	= offsetof(struct aioring_options, fixedbufs),
		.help	= "Pre map IO buffers",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_LIBAIO,
	},
	{
		.name	= NULL,
	},
};

static int fio_aioring_commit(struct thread_data *td);

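/*
 * Thin wrapper around the raw io_ring_enter(2) syscall; returns -1 if the
 * syscall number is not known to this build.
 */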
static int io_ring_enter(io_context_t ctx, unsigned int to_submit,
			 unsigned int min_complete, unsigned int flags)
{
#ifdef __NR_sys_io_ring_enter
	return syscall(__NR_sys_io_ring_enter, ctx, to_submit, min_complete,
			flags);
#else
	return -1;
#endif
}

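/*
 * Fill in the persistent iocb slot for this io_u. With fixedbufs the buffer
 * and length were already set up at init time, so only the fd, opcode and
 * offset need updating here.
 */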
static int fio_aioring_prep(struct thread_data *td, struct io_u *io_u)
{
	struct aioring_data *ld = td->io_ops_data;
	struct fio_file *f = io_u->file;
	struct aioring_options *o = td->eo;
	struct iocb *iocb;

	iocb = &ld->iocbs[io_u->index];

	if (io_u->ddir == DDIR_READ) {
		if (o->fixedbufs) {
			iocb->aio_fildes = f->fd;
			iocb->aio_lio_opcode = IO_CMD_PREAD;
			iocb->u.c.offset = io_u->offset;
		} else {
			io_prep_pread(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
			if (o->hipri)
				iocb->u.c.flags |= IOCB_FLAG_HIPRI;
		}
	} else if (io_u->ddir == DDIR_WRITE) {
		if (o->fixedbufs) {
			iocb->aio_fildes = f->fd;
			iocb->aio_lio_opcode = IO_CMD_PWRITE;
			iocb->u.c.offset = io_u->offset;
		} else {
			io_prep_pwrite(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
			if (o->hipri)
				iocb->u.c.flags |= IOCB_FLAG_HIPRI;
		}
	} else if (ddir_sync(io_u->ddir))
		io_prep_fsync(iocb, f->fd);

	iocb->data = io_u;
	return 0;
}

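/*
 * Map a completion ring entry back to its io_u, flagging errors and
 * accounting for short transfers via io_u->resid.
 */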
static struct io_u *fio_aioring_event(struct thread_data *td, int event)
{
	struct aioring_data *ld = td->io_ops_data;
	struct io_event *ev;
	struct io_u *io_u;
	int index;

	index = event + ld->cq_ring_off;
	if (index >= ld->cq_ring->nr_events)
		index -= ld->cq_ring->nr_events;

	ev = &ld->cq_ring->events[index];
	io_u = ev->data;

	if (ev->res != io_u->xfer_buflen) {
		if (ev->res > io_u->xfer_buflen)
			io_u->error = -ev->res;
		else
			io_u->resid = io_u->xfer_buflen - ev->res;
	} else
		io_u->error = 0;

	return io_u;
}

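/*
 * Reap whatever completions are already visible in the CQ ring, without
 * entering the kernel. Returns the number of new events found.
 */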
static int fio_aioring_cqring_reap(struct thread_data *td, unsigned int events,
				   unsigned int max)
{
	struct aioring_data *ld = td->io_ops_data;
	struct aio_cq_ring *ring = ld->cq_ring;
	u32 head, reaped = 0;

	head = ring->head;
	do {
		read_barrier();
		if (head == ring->tail)
			break;
		reaped++;
		head++;
		if (head == ring->nr_events)
			head = 0;
	} while (reaped + events < max);

	ring->head = head;
	write_barrier();
	return reaped;
}

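/*
 * Wait for at least 'min' completions, polling the CQ ring first and only
 * calling into io_ring_enter() when the ring is empty.
 */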
static int fio_aioring_getevents(struct thread_data *td, unsigned int min,
				 unsigned int max, const struct timespec *t)
{
	struct aioring_data *ld = td->io_ops_data;
	unsigned actual_min = td->o.iodepth_batch_complete_min == 0 ? 0 : min;
	struct aio_cq_ring *ring = ld->cq_ring;
	int r, events = 0;

	ld->cq_ring_off = ring->head;
	do {
		r = fio_aioring_cqring_reap(td, events, max);
		if (r) {
			events += r;
			continue;
		}

		r = io_ring_enter(ld->aio_ctx, 0, actual_min,
				  IORING_FLAG_GETEVENTS);
		if (r < 0) {
			if (errno == EAGAIN)
				continue;
			perror("ring enter");
			break;
		}
	} while (events < min);

	return r < 0 ? r : events;
}

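/*
 * Queue an io_u by stashing its index in the SQ ring. Syncs and trims are
 * handled inline, since the ring interface does not cover them.
 */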
static enum fio_q_status fio_aioring_queue(struct thread_data *td,
					   struct io_u *io_u)
{
	struct aioring_data *ld = td->io_ops_data;
	struct aio_sq_ring *ring = ld->sq_ring;
	unsigned tail, next_tail;

	fio_ro_check(td, io_u);

	if (ld->queued == td->o.iodepth)
		return FIO_Q_BUSY;

	/*
	 * fsync is tricky, since it can fail and we need to do it
	 * serialized with other IO. The reason is that Linux doesn't
	 * support aio fsync yet. So return busy for the case where we
	 * have pending IO, to let fio complete those first.
	 */
	if (ddir_sync(io_u->ddir)) {
		if (ld->queued)
			return FIO_Q_BUSY;

		do_io_u_sync(td, io_u);
		return FIO_Q_COMPLETED;
	}

	if (io_u->ddir == DDIR_TRIM) {
		if (ld->queued)
			return FIO_Q_BUSY;

		do_io_u_trim(td, io_u);
		io_u_mark_submit(td, 1);
		io_u_mark_complete(td, 1);
		return FIO_Q_COMPLETED;
	}

	tail = ring->tail;
	next_tail = tail + 1;
	if (next_tail == ring->nr_events)
		next_tail = 0;
	read_barrier();
	if (next_tail == ring->head)
		return FIO_Q_BUSY;

	ring->array[tail] = io_u->index;
	ring->tail = next_tail;
	write_barrier();

	ld->queued++;
	return FIO_Q_QUEUED;
}

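/*
 * Record issue times for the just-submitted SQ ring entries, walking the
 * ring from 'start' for 'nr' entries.
 */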
static void fio_aioring_queued(struct thread_data *td, int start, int nr)
{
	struct aioring_data *ld = td->io_ops_data;
	struct timespec now;

	if (!fio_fill_issue_time(td))
		return;

	fio_gettime(&now, NULL);

	while (nr--) {
		int index = ld->sq_ring->array[start];
		struct io_u *io_u = ld->io_u_index[index];

		memcpy(&io_u->issue_time, &now, sizeof(now));
		io_u_queued(td, io_u);

		start++;
		if (start == ld->sq_ring->nr_events)
			start = 0;
	}
}

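/*
 * Submit everything currently queued in the SQ ring via io_ring_enter(),
 * retrying on EINTR and backing off on EAGAIN/ENOMEM as described inline.
 */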
static int fio_aioring_commit(struct thread_data *td)
{
	struct aioring_data *ld = td->io_ops_data;
	int ret;

	if (!ld->queued)
		return 0;

	do {
		int start = ld->sq_ring->head;
		long nr = ld->queued;

		ret = io_ring_enter(ld->aio_ctx, nr, 0, IORING_FLAG_SUBMIT |
						IORING_FLAG_GETEVENTS);
		if (ret == -1)
			perror("io_ring_enter");
		if (ret > 0) {
			fio_aioring_queued(td, start, ret);
			io_u_mark_submit(td, ret);

			ld->queued -= ret;
			ret = 0;
		} else if (ret == -EINTR || !ret) {
			if (!ret)
				io_u_mark_submit(td, ret);
			continue;
		} else if (ret == -EAGAIN) {
			/*
			 * If we get EAGAIN, we should break out without
			 * error and let the upper layer reap some
			 * events for us. If we have no queued IO, we
			 * must loop here. If we loop for more than 30s,
			 * just error out, something must be buggy in the
			 * IO path.
			 */
			if (ld->queued) {
				ret = 0;
				break;
			}
			usleep(1);
			continue;
		} else if (ret == -ENOMEM) {
			/*
			 * If we get -ENOMEM, reap events if we can. If
			 * we cannot, treat it as a fatal event since there's
			 * nothing we can do about it.
			 */
			if (ld->queued)
				ret = 0;
			break;
		} else
			break;
	} while (ld->queued);

	return ret;
}

static size_t aioring_cq_size(struct thread_data *td)
{
	return sizeof(struct aio_cq_ring) + 2 * td->o.iodepth * sizeof(struct io_event);
}

static size_t aioring_sq_iocb(struct thread_data *td)
{
	return sizeof(struct iocb) * td->o.iodepth;
}

static size_t aioring_sq_size(struct thread_data *td)
{
	return sizeof(struct aio_sq_ring) + td->o.iodepth * sizeof(u32);
}

static void fio_aioring_cleanup(struct thread_data *td)
{
	struct aioring_data *ld = td->io_ops_data;

	if (ld) {
		/*
		 * Work-around to avoid huge RCU stalls at exit time. If we
		 * don't do this here, then it'll be torn down by exit_aio().
		 * But for that case we can parallelize the freeing, thus
		 * speeding it up a lot.
		 */
		if (!(td->flags & TD_F_CHILD))
			io_destroy(ld->aio_ctx);
		free(ld->io_u_index);
		free(ld->io_us);
		fio_memfree(ld->sq_ring, aioring_sq_size(td), false);
		fio_memfree(ld->iocbs, aioring_sq_iocb(td), false);
		fio_memfree(ld->cq_ring, aioring_cq_size(td), false);
		free(ld);
	}
}

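/*
 * Create the aio context with io_setup2(2), passing in the pre-allocated
 * SQ and CQ rings. Polled IO and fixed buffers are requested via flags.
 */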
static int fio_aioring_queue_init(struct thread_data *td)
{
#ifdef __NR_sys_io_setup2
	struct aioring_data *ld = td->io_ops_data;
	struct aioring_options *o = td->eo;
	int flags = IOCTX_FLAG_SCQRING;
	int depth = td->o.iodepth;

	if (o->hipri)
		flags |= IOCTX_FLAG_IOPOLL;
	if (o->fixedbufs) {
		struct rlimit rlim = {
			.rlim_cur = RLIM_INFINITY,
			.rlim_max = RLIM_INFINITY,
		};

		setrlimit(RLIMIT_MEMLOCK, &rlim);
		flags |= IOCTX_FLAG_FIXEDBUFS;
	}

	return syscall(__NR_sys_io_setup2, depth, flags,
			ld->sq_ring, ld->cq_ring, &ld->aio_ctx);
#else
	return -1;
#endif
}

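/*
 * Called after the io_u buffers have been allocated: point the fixed-buffer
 * iocbs at those buffers, then set up the aio context.
 */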
static int fio_aioring_post_init(struct thread_data *td)
{
	struct aioring_data *ld = td->io_ops_data;
	struct aioring_options *o = td->eo;
	struct io_u *io_u;
	struct iocb *iocb;
	int err = 0;

	if (o->fixedbufs) {
		int i;

		for (i = 0; i < td->o.iodepth; i++) {
			io_u = ld->io_u_index[i];
			iocb = &ld->iocbs[i];
			iocb->u.c.buf = io_u->buf;
			iocb->u.c.nbytes = td_max_bs(td);

			if (o->hipri)
				iocb->u.c.flags |= IOCB_FLAG_HIPRI;
		}
	}

	err = fio_aioring_queue_init(td);
	if (err) {
		td_verror(td, -err, "io_queue_init");
		return 1;
	}

	return 0;
}

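/*
 * Allocate the per-thread ring state: the io_u index, the iocb array and
 * the page-aligned SQ/CQ rings. The CQ ring is sized for twice the queue
 * depth in events.
 */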
static int fio_aioring_init(struct thread_data *td)
{
	struct aioring_data *ld;

	if (td->o.iodepth <= 1) {
		printf("aio-ring: needs a minimum QD of 2\n");
		return 1;
	}

	ld = calloc(1, sizeof(*ld));

	/* io_u index */
	ld->io_u_index = calloc(td->o.iodepth, sizeof(struct io_u *));
	ld->io_us = calloc(td->o.iodepth, sizeof(struct io_u *));

	ld->iocbs = fio_memalign(page_size, aioring_sq_iocb(td), false);
	memset(ld->iocbs, 0, aioring_sq_iocb(td));

	ld->sq_ring = fio_memalign(page_size, aioring_sq_size(td), false);
	memset(ld->sq_ring, 0, aioring_sq_size(td));
	ld->sq_ring->nr_events = td->o.iodepth;
	ld->sq_ring->iocbs = (u64) ld->iocbs;

	ld->cq_ring = fio_memalign(page_size, aioring_cq_size(td), false);
	memset(ld->cq_ring, 0, aioring_cq_size(td));
	ld->cq_ring->nr_events = td->o.iodepth * 2;

	td->io_ops_data = ld;
	return 0;
}

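/*
 * Remember each io_u by its index so completions can be mapped back to it.
 */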
static int fio_aioring_io_u_init(struct thread_data *td, struct io_u *io_u)
{
	struct aioring_data *ld = td->io_ops_data;

	ld->io_u_index[io_u->index] = io_u;
	return 0;
}

static struct ioengine_ops ioengine = {
	.name			= "aio-ring",
	.version		= FIO_IOOPS_VERSION,
	.init			= fio_aioring_init,
	.post_init		= fio_aioring_post_init,
	.io_u_init		= fio_aioring_io_u_init,
	.prep			= fio_aioring_prep,
	.queue			= fio_aioring_queue,
	.commit			= fio_aioring_commit,
	.getevents		= fio_aioring_getevents,
	.event			= fio_aioring_event,
	.cleanup		= fio_aioring_cleanup,
	.open_file		= generic_open_file,
	.close_file		= generic_close_file,
	.get_file_size		= generic_get_file_size,
	.options		= options,
	.option_struct_size	= sizeof(struct aioring_options),
};

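/*
 * Only register the engine if both the io_setup2(2) and io_ring_enter(2)
 * syscall numbers are available at build time.
 */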
static void fio_init fio_aioring_register(void)
{
#ifdef __NR_sys_io_setup2
#ifdef __NR_sys_io_ring_enter
	register_ioengine(&ioengine);
#endif
#endif
}

static void fio_exit fio_aioring_unregister(void)
{
#ifdef __NR_sys_io_setup2
#ifdef __NR_sys_io_ring_enter
	unregister_ioengine(&ioengine);
#endif
#endif
}