t/io_uring: add support for registering the ring fd
#include <stdio.h>
#include <errno.h>
#include <assert.h>
#include <stdlib.h>
#include <stddef.h>
#include <signal.h>
#include <inttypes.h>
#include <math.h>

#ifdef CONFIG_LIBAIO
#include <libaio.h>
#endif

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <sys/resource.h>
#include <sys/mman.h>
#include <sys/uio.h>
#include <linux/fs.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <sched.h>

#include "../arch/arch.h"
#include "../lib/types.h"
#include "../lib/roundup.h"
#include "../lib/rand.h"
#include "../minmax.h"
#include "../os/linux/io_uring.h"

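/*
 * Userspace mirrors of the kernel SQ and CQ ring mappings: each member
 * points into the shared memory mmap'ed from the ring fd in setup_ring()
 * further down.
 */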
struct io_sq_ring {
        unsigned *head;
        unsigned *tail;
        unsigned *ring_mask;
        unsigned *ring_entries;
        unsigned *flags;
        unsigned *array;
};

struct io_cq_ring {
        unsigned *head;
        unsigned *tail;
        unsigned *ring_mask;
        unsigned *ring_entries;
        struct io_uring_cqe *cqes;
};

#define DEPTH                   128
#define BATCH_SUBMIT            32
#define BATCH_COMPLETE          32
#define BS                      4096

#define MAX_FDS                 16

static unsigned sq_ring_mask, cq_ring_mask;

struct file {
        unsigned long max_blocks;
        unsigned long max_size;
        unsigned long cur_off;
        unsigned pending_ios;
        int real_fd;
        int fixed_fd;
        int fileno;
};

#define PLAT_BITS               6
#define PLAT_VAL                (1 << PLAT_BITS)
#define PLAT_GROUP_NR           29
#define PLAT_NR                 (PLAT_GROUP_NR * PLAT_VAL)

struct submitter {
        pthread_t thread;
        int ring_fd;
        int enter_ring_fd;
        int index;
        struct io_sq_ring sq_ring;
        struct io_uring_sqe *sqes;
        struct io_cq_ring cq_ring;
        int inflight;
        int tid;
        unsigned long reaps;
        unsigned long done;
        unsigned long calls;
        volatile int finish;

        __s32 *fds;

        struct taus258_state rand_state;

        unsigned long *clock_batch;
        int clock_index;
        unsigned long *plat;

#ifdef CONFIG_LIBAIO
        io_context_t aio_ctx;
#endif

        struct file files[MAX_FDS];
        unsigned nr_files;
        unsigned cur_file;
        struct iovec iovecs[];
};

static struct submitter *submitter;
static volatile int finish;
static int stats_running;
static unsigned long max_iops;

static int depth = DEPTH;
static int batch_submit = BATCH_SUBMIT;
static int batch_complete = BATCH_COMPLETE;
static int bs = BS;
static int polled = 1;          /* use IO polling */
static int fixedbufs = 1;       /* use fixed user buffers */
static int dma_map;             /* pre-map DMA buffers */
static int register_files = 1;  /* use fixed files */
static int buffered = 0;        /* use buffered IO, not O_DIRECT */
static int sq_thread_poll = 0;  /* use kernel submission/poller thread */
static int sq_thread_cpu = -1;  /* pin above thread to this CPU */
static int do_nop = 0;          /* no-op SQ ring commands */
static int nthreads = 1;
static int stats = 0;           /* generate IO stats */
static int aio = 0;             /* use libaio */
static int runtime = 0;         /* runtime */
static int random_io = 1;       /* random or sequential IO */
static int register_ring = 1;   /* register ring */

static unsigned long tsc_rate;

#define TSC_RATE_FILE   "tsc-rate"

static int vectored = 1;

static float plist[] = { 1.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0,
                        80.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.95, 99.99 };
static int plist_len = 17;

#ifndef IORING_REGISTER_MAP_BUFFERS
#define IORING_REGISTER_MAP_BUFFERS     22
struct io_uring_map_buffers {
        __s32   fd;
        __u32   buf_start;
        __u32   buf_end;
        __u32   flags;
        __u64   rsvd[2];
};
#endif
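/*
 * IORING_REGISTER_MAP_BUFFERS (used for -D) is an experimental opcode that
 * installed kernel headers may not define, hence the fallback definition
 * above; on kernels without it the registration simply fails at runtime.
 */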

static unsigned long cycles_to_nsec(unsigned long cycles)
{
        uint64_t val;

        if (!tsc_rate)
                return cycles;

        val = cycles * 1000000000ULL;
        return val / tsc_rate;
}

static unsigned long plat_idx_to_val(unsigned int idx)
{
        unsigned int error_bits;
        unsigned long k, base;

        assert(idx < PLAT_NR);

        /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
         * all bits of the sample as index */
        if (idx < (PLAT_VAL << 1))
                return cycles_to_nsec(idx);

        /* Find the group and compute the minimum value of that group */
        error_bits = (idx >> PLAT_BITS) - 1;
        base = ((unsigned long) 1) << (error_bits + PLAT_BITS);

        /* Find its bucket number of the group */
        k = idx % PLAT_VAL;

        /* Return the mean of the range of the bucket */
        return cycles_to_nsec(base + ((k + 0.5) * (1 << error_bits)));
}
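/*
 * Worked example of the bucketing (a sketch, using the PLAT_BITS=6 setup
 * above): a 1000 cycle sample has msb=9, so plat_val_to_idx() below drops
 * error_bits = 9 - 6 = 3 low bits and files it under index
 * base + offset = ((3 + 1) << 6) + ((1000 >> 3) & 63) = 256 + 61 = 317.
 * Mapping back, plat_idx_to_val(317) returns the bucket midpoint
 * 512 + (61 + 0.5) * 8 = 1004 cycles, i.e. the round trip stays within
 * roughly 1/PLAT_VAL of the sampled value.
 */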

unsigned int calc_clat_percentiles(unsigned long *io_u_plat, unsigned long nr,
                                   unsigned long **output,
                                   unsigned long *maxv, unsigned long *minv)
{
        unsigned long sum = 0;
        unsigned int len = plist_len, i, j = 0;
        unsigned long *ovals = NULL;
        bool is_last;

        *minv = -1UL;
        *maxv = 0;

        ovals = malloc(len * sizeof(*ovals));
        if (!ovals)
                return 0;

        /*
         * Calculate bucket values, note down max and min values
         */
        is_last = false;
        for (i = 0; i < PLAT_NR && !is_last; i++) {
                sum += io_u_plat[i];
                while (sum >= ((long double) plist[j] / 100.0 * nr)) {
                        assert(plist[j] <= 100.0);

                        ovals[j] = plat_idx_to_val(i);
                        if (ovals[j] < *minv)
                                *minv = ovals[j];
                        if (ovals[j] > *maxv)
                                *maxv = ovals[j];

                        is_last = (j == len - 1) != 0;
                        if (is_last)
                                break;

                        j++;
                }
        }

        if (!is_last)
                fprintf(stderr, "error calculating latency percentiles\n");

        *output = ovals;
        return len;
}

static void show_clat_percentiles(unsigned long *io_u_plat, unsigned long nr,
                                  unsigned int precision)
{
        unsigned int divisor, len, i, j = 0;
        unsigned long minv, maxv;
        unsigned long *ovals;
        int per_line, scale_down, time_width;
        bool is_last;
        char fmt[32];

        len = calc_clat_percentiles(io_u_plat, nr, &ovals, &maxv, &minv);
        if (!len || !ovals)
                goto out;

        if (!tsc_rate) {
                scale_down = 0;
                divisor = 1;
                printf("    percentiles (tsc ticks):\n     |");
        } else if (minv > 2000 && maxv > 99999) {
                scale_down = 1;
                divisor = 1000;
                printf("    percentiles (usec):\n     |");
        } else {
                scale_down = 0;
                divisor = 1;
                printf("    percentiles (nsec):\n     |");
        }

        time_width = max(5, (int) (log10(maxv / divisor) + 1));
        snprintf(fmt, sizeof(fmt), " %%%u.%ufth=[%%%dllu]%%c", precision + 3,
                        precision, time_width);
        /* fmt will be something like " %5.2fth=[%4llu]%c" */
        per_line = (80 - 7) / (precision + 10 + time_width);

        for (j = 0; j < len; j++) {
                /* for formatting */
                if (j != 0 && (j % per_line) == 0)
                        printf("     |");

                /* end of the list */
                is_last = (j == len - 1) != 0;

                for (i = 0; i < scale_down; i++)
                        ovals[j] = (ovals[j] + 999) / 1000;

                printf(fmt, plist[j], ovals[j], is_last ? '\n' : ',');

                if (is_last)
                        break;

                if ((j % per_line) == per_line - 1)     /* for formatting */
                        printf("\n");
        }

out:
        free(ovals);
}

#ifdef ARCH_HAVE_CPU_CLOCK
static unsigned int plat_val_to_idx(unsigned long val)
{
        unsigned int msb, error_bits, base, offset, idx;

        /* Find MSB starting from bit 0 */
        if (val == 0)
                msb = 0;
        else
                msb = (sizeof(val)*8) - __builtin_clzll(val) - 1;

        /*
         * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
         * all bits of the sample as index
         */
        if (msb <= PLAT_BITS)
                return val;

        /* Compute the number of error bits to discard */
        error_bits = msb - PLAT_BITS;

        /* Compute the number of buckets before the group */
        base = (error_bits + 1) << PLAT_BITS;

        /*
         * Discard the error bits and apply the mask to find the
         * index for the buckets in the group
         */
        offset = (PLAT_VAL - 1) & (val >> error_bits);

        /* Make sure the index does not exceed (array size - 1) */
        idx = (base + offset) < (PLAT_NR - 1) ?
                (base + offset) : (PLAT_NR - 1);

        return idx;
}
#endif

static void add_stat(struct submitter *s, int clock_index, int nr)
{
#ifdef ARCH_HAVE_CPU_CLOCK
        unsigned long cycles;
        unsigned int pidx;

        if (!s->finish && clock_index) {
                cycles = get_cpu_clock();
                cycles -= s->clock_batch[clock_index];
                pidx = plat_val_to_idx(cycles);
                s->plat[pidx] += nr;
        }
#endif
}

static int io_uring_map_buffers(struct submitter *s)
{
        struct io_uring_map_buffers map = {
                .fd             = s->files[0].real_fd,
                .buf_end        = depth,
        };

        if (do_nop)
                return 0;
        if (s->nr_files > 1)
                fprintf(stdout, "Mapping buffers may not work with multiple files\n");

        return syscall(__NR_io_uring_register, s->ring_fd,
                        IORING_REGISTER_MAP_BUFFERS, &map, 1);
}

static int io_uring_register_buffers(struct submitter *s)
{
        if (do_nop)
                return 0;

        return syscall(__NR_io_uring_register, s->ring_fd,
                        IORING_REGISTER_BUFFERS, s->iovecs, roundup_pow2(depth));
}

static int io_uring_register_files(struct submitter *s)
{
        int i;

        if (do_nop)
                return 0;

        s->fds = calloc(s->nr_files, sizeof(__s32));
        for (i = 0; i < s->nr_files; i++) {
                s->fds[i] = s->files[i].real_fd;
                s->files[i].fixed_fd = i;
        }

        return syscall(__NR_io_uring_register, s->ring_fd,
                        IORING_REGISTER_FILES, s->fds, s->nr_files);
}

static int io_uring_setup(unsigned entries, struct io_uring_params *p)
{
        /*
         * Clamp CQ ring size at our SQ ring size, we don't need more entries
         * than that.
         */
        p->flags |= IORING_SETUP_CQSIZE;
        p->cq_entries = entries;

        return syscall(__NR_io_uring_setup, entries, p);
}

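/*
 * Probe the ring for supported opcodes: if the kernel supports the
 * non-vectored IORING_OP_READ, prefer it over IORING_OP_READV, since a
 * single buffer per IO does not need the iovec indirection.
 */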
static void io_uring_probe(int fd)
{
        struct io_uring_probe *p;
        int ret;

        p = malloc(sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
        if (!p)
                return;

        memset(p, 0, sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
        ret = syscall(__NR_io_uring_register, fd, IORING_REGISTER_PROBE, p, 256);
        if (ret < 0)
                goto out;

        if (IORING_OP_READ > p->ops_len)
                goto out;

        if ((p->ops[IORING_OP_READ].flags & IO_URING_OP_SUPPORTED))
                vectored = 0;
out:
        free(p);
}

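/*
 * With -X1 (the default), enter the ring via its registered index rather
 * than the real fd: IORING_ENTER_REGISTERED_RING tells the kernel that
 * enter_ring_fd is an offset into the task's registered ring file table,
 * avoiding the per-call fdget/fdput overhead.
 */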
static int io_uring_enter(struct submitter *s, unsigned int to_submit,
                          unsigned int min_complete, unsigned int flags)
{
        if (register_ring)
                flags |= IORING_ENTER_REGISTERED_RING;
#ifdef FIO_ARCH_HAS_SYSCALL
        return __do_syscall6(__NR_io_uring_enter, s->enter_ring_fd, to_submit,
                                min_complete, flags, NULL, 0);
#else
        return syscall(__NR_io_uring_enter, s->enter_ring_fd, to_submit,
                        min_complete, flags, NULL, 0);
#endif
}

#ifndef CONFIG_HAVE_GETTID
static int gettid(void)
{
        return syscall(__NR_gettid);
}
#endif

static unsigned file_depth(struct submitter *s)
{
        return (depth + s->nr_files - 1) / s->nr_files;
}

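/*
 * Fill one SQE: pick the (next) file, pick a random or sequential offset,
 * and stash the file number in the low 32 bits of user_data so the
 * completion side can find the file again. With stats enabled, the upper
 * 32 bits carry the clock batch index used for latency tracking.
 */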
static void init_io(struct submitter *s, unsigned index)
{
        struct io_uring_sqe *sqe = &s->sqes[index];
        unsigned long offset;
        struct file *f;
        long r;

        if (do_nop) {
                sqe->opcode = IORING_OP_NOP;
                return;
        }

        if (s->nr_files == 1) {
                f = &s->files[0];
        } else {
                f = &s->files[s->cur_file];
                if (f->pending_ios >= file_depth(s)) {
                        s->cur_file++;
                        if (s->cur_file == s->nr_files)
                                s->cur_file = 0;
                        f = &s->files[s->cur_file];
                }
        }
        f->pending_ios++;

        if (random_io) {
                r = __rand64(&s->rand_state);
                offset = (r % (f->max_blocks - 1)) * bs;
        } else {
                offset = f->cur_off;
                f->cur_off += bs;
                if (f->cur_off + bs > f->max_size)
                        f->cur_off = 0;
        }

        if (register_files) {
                sqe->flags = IOSQE_FIXED_FILE;
                sqe->fd = f->fixed_fd;
        } else {
                sqe->flags = 0;
                sqe->fd = f->real_fd;
        }
        if (fixedbufs) {
                sqe->opcode = IORING_OP_READ_FIXED;
                sqe->addr = (unsigned long) s->iovecs[index].iov_base;
                sqe->len = bs;
                sqe->buf_index = index;
        } else if (!vectored) {
                sqe->opcode = IORING_OP_READ;
                sqe->addr = (unsigned long) s->iovecs[index].iov_base;
                sqe->len = bs;
                sqe->buf_index = 0;
        } else {
                sqe->opcode = IORING_OP_READV;
                sqe->addr = (unsigned long) &s->iovecs[index];
                sqe->len = 1;
                sqe->buf_index = 0;
        }
        sqe->ioprio = 0;
        sqe->off = offset;
        sqe->user_data = (unsigned long) f->fileno;
        if (stats && stats_running)
                sqe->user_data |= ((uint64_t)s->clock_index << 32);
}

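/*
 * SQ ring submission protocol: prepare SQEs in the slots the new tail will
 * cover, point the ring array entries at them, then publish the tail with
 * a release store so the kernel observes fully written SQEs.
 */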
static int prep_more_ios_uring(struct submitter *s, int max_ios)
{
        struct io_sq_ring *ring = &s->sq_ring;
        unsigned index, tail, next_tail, prepped = 0;

        next_tail = tail = *ring->tail;
        do {
                next_tail++;
                if (next_tail == atomic_load_acquire(ring->head))
                        break;

                index = tail & sq_ring_mask;
                init_io(s, index);
                ring->array[index] = index;
                prepped++;
                tail = next_tail;
        } while (prepped < max_ios);

        if (prepped)
                atomic_store_release(ring->tail, tail);
        return prepped;
}

static int get_file_size(struct file *f)
{
        struct stat st;

        if (fstat(f->real_fd, &st) < 0)
                return -1;
        if (S_ISBLK(st.st_mode)) {
                unsigned long long bytes;

                if (ioctl(f->real_fd, BLKGETSIZE64, &bytes) != 0)
                        return -1;

                f->max_blocks = bytes / bs;
                f->max_size = bytes;
                return 0;
        } else if (S_ISREG(st.st_mode)) {
                f->max_blocks = st.st_size / bs;
                f->max_size = st.st_size;
                return 0;
        }

        return -1;
}

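/*
 * CQ ring completion protocol: an acquire load of the tail pairs with the
 * kernel's release store, completions up to the tail are consumed, and the
 * new head is published with a release store of our own. Runs of CQEs that
 * share a clock index are accounted in one add_stat() call.
 */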
static int reap_events_uring(struct submitter *s)
{
        struct io_cq_ring *ring = &s->cq_ring;
        struct io_uring_cqe *cqe;
        unsigned head, reaped = 0;
        int last_idx = -1, stat_nr = 0;

        head = *ring->head;
        do {
                struct file *f;

                read_barrier();
                if (head == atomic_load_acquire(ring->tail))
                        break;
                cqe = &ring->cqes[head & cq_ring_mask];
                if (!do_nop) {
                        int fileno = cqe->user_data & 0xffffffff;

                        f = &s->files[fileno];
                        f->pending_ios--;
                        if (cqe->res != bs) {
                                printf("io: unexpected ret=%d\n", cqe->res);
                                if (polled && cqe->res == -EOPNOTSUPP)
                                        printf("Your filesystem/driver/kernel doesn't support polled IO\n");
                                return -1;
                        }
                }
                if (stats) {
                        int clock_index = cqe->user_data >> 32;

                        if (last_idx != clock_index) {
                                if (last_idx != -1) {
                                        add_stat(s, last_idx, stat_nr);
                                        stat_nr = 0;
                                }
                                last_idx = clock_index;
                        }
                        stat_nr++;
                }
                reaped++;
                head++;
        } while (1);

        if (stat_nr)
                add_stat(s, last_idx, stat_nr);

        if (reaped) {
                s->inflight -= reaped;
                atomic_store_release(ring->head, head);
        }
        return reaped;
}

static int submitter_init(struct submitter *s)
{
        int i, nr_batch;

        s->tid = gettid();
        printf("submitter=%d, tid=%d\n", s->index, s->tid);

        __init_rand64(&s->rand_state, pthread_self());
        srand48(pthread_self());

        for (i = 0; i < MAX_FDS; i++)
                s->files[i].fileno = i;

        if (stats) {
                nr_batch = roundup_pow2(depth / batch_submit);
                if (nr_batch < 2)
                        nr_batch = 2;
                s->clock_batch = calloc(nr_batch, sizeof(unsigned long));
                s->clock_index = 1;

                s->plat = calloc(PLAT_NR, sizeof(unsigned long));
        } else {
                s->clock_batch = NULL;
                s->plat = NULL;
                nr_batch = 0;
        }

        return nr_batch;
}

#ifdef CONFIG_LIBAIO
static int prep_more_ios_aio(struct submitter *s, int max_ios, struct iocb *iocbs)
{
        uint64_t data;
        long long offset;
        struct file *f;
        unsigned index;
        long r;

        index = 0;
        while (index < max_ios) {
                struct iocb *iocb = &iocbs[index];

                if (s->nr_files == 1) {
                        f = &s->files[0];
                } else {
                        f = &s->files[s->cur_file];
                        if (f->pending_ios >= file_depth(s)) {
                                s->cur_file++;
                                if (s->cur_file == s->nr_files)
                                        s->cur_file = 0;
                                f = &s->files[s->cur_file];
                        }
                }
                f->pending_ios++;

                r = lrand48();
                offset = (r % (f->max_blocks - 1)) * bs;
                io_prep_pread(iocb, f->real_fd, s->iovecs[index].iov_base,
                                s->iovecs[index].iov_len, offset);

                data = f->fileno;
                if (stats && stats_running)
                        data |= (((uint64_t) s->clock_index) << 32);
                iocb->data = (void *) (uintptr_t) data;
                index++;
        }
        return index;
}

static int reap_events_aio(struct submitter *s, struct io_event *events, int evs)
{
        int last_idx = -1, stat_nr = 0;
        int reaped = 0;

        while (evs) {
                uint64_t data = (uintptr_t) events[reaped].data;
                struct file *f = &s->files[data & 0xffffffff];

                f->pending_ios--;
                if (events[reaped].res != bs) {
                        printf("io: unexpected ret=%ld\n", events[reaped].res);
                        return -1;
                }
                if (stats) {
                        int clock_index = data >> 32;

                        if (last_idx != clock_index) {
                                if (last_idx != -1) {
                                        add_stat(s, last_idx, stat_nr);
                                        stat_nr = 0;
                                }
                                last_idx = clock_index;
                        }
                        stat_nr++;
                }
                reaped++;
                evs--;
        }

        if (stat_nr)
                add_stat(s, last_idx, stat_nr);

        s->inflight -= reaped;
        s->done += reaped;
        return reaped;
}

static void *submitter_aio_fn(void *data)
{
        struct submitter *s = data;
        int i, ret, prepped;
        struct iocb **iocbsptr;
        struct iocb *iocbs;
        struct io_event *events;
#ifdef ARCH_HAVE_CPU_CLOCK
        int nr_batch = submitter_init(s);
#else
        submitter_init(s);
#endif

        iocbsptr = calloc(depth, sizeof(struct iocb *));
        iocbs = calloc(depth, sizeof(struct iocb));
        events = calloc(depth, sizeof(struct io_event));

        for (i = 0; i < depth; i++)
                iocbsptr[i] = &iocbs[i];

        prepped = 0;
        do {
                int to_wait, to_submit, to_prep;

                if (!prepped && s->inflight < depth) {
                        to_prep = min(depth - s->inflight, batch_submit);
                        prepped = prep_more_ios_aio(s, to_prep, iocbs);
#ifdef ARCH_HAVE_CPU_CLOCK
                        if (prepped && stats) {
                                s->clock_batch[s->clock_index] = get_cpu_clock();
                                s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
                        }
#endif
                }
                s->inflight += prepped;
                to_submit = prepped;

                if (to_submit && (s->inflight + to_submit <= depth))
                        to_wait = 0;
                else
                        to_wait = min(s->inflight + to_submit, batch_complete);

                ret = io_submit(s->aio_ctx, to_submit, iocbsptr);
                s->calls++;
                if (ret < 0) {
                        perror("io_submit");
                        break;
                } else if (ret != to_submit) {
                        printf("submitted %d, wanted %d\n", ret, to_submit);
                        break;
                }
                prepped = 0;

                while (to_wait) {
                        int r;

                        s->calls++;
                        r = io_getevents(s->aio_ctx, to_wait, to_wait, events, NULL);
                        if (r < 0) {
                                perror("io_getevents");
                                break;
                        } else if (r != to_wait) {
                                printf("r=%d, wait=%d\n", r, to_wait);
                                break;
                        }
                        r = reap_events_aio(s, events, r);
                        s->reaps += r;
                        to_wait -= r;
                }
        } while (!s->finish);

        free(iocbsptr);
        free(iocbs);
        free(events);
        finish = 1;
        return NULL;
}
#endif

static void io_uring_unregister_ring(struct submitter *s)
{
        struct io_uring_rsrc_update up = {
                .offset = s->enter_ring_fd,
        };

        syscall(__NR_io_uring_register, s->ring_fd, IORING_UNREGISTER_RING_FDS,
                &up, 1);
}

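/*
 * Register the ring fd itself: offset = -1U asks the kernel to pick a free
 * slot in the task's registered ring file table, and on success up.offset
 * holds the index that io_uring_enter() then uses in place of the real fd.
 * On kernels without IORING_REGISTER_RING_FDS this fails, and we fall back
 * to plain fds by clearing register_ring. io_uring_unregister_ring() above
 * undoes the registration at thread exit.
 */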
static int io_uring_register_ring(struct submitter *s)
{
        struct io_uring_rsrc_update up = {
                .data   = s->ring_fd,
                .offset = -1U,
        };
        int ret;

        ret = syscall(__NR_io_uring_register, s->ring_fd,
                        IORING_REGISTER_RING_FDS, &up, 1);
        if (ret == 1) {
                s->enter_ring_fd = up.offset;
                return 0;
        }
        register_ring = 0;
        return -1;
}

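/*
 * Per-thread submission loop: prepare up to batch_submit SQEs, submit them
 * via io_uring_enter() (or leave them to the SQPOLL thread), then reap
 * completions. With IORING_ENTER_GETEVENTS, submitting and waiting for
 * batch_complete completions happen in a single system call.
 */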
static void *submitter_uring_fn(void *data)
{
        struct submitter *s = data;
        struct io_sq_ring *ring = &s->sq_ring;
        int ret, prepped;
#ifdef ARCH_HAVE_CPU_CLOCK
        int nr_batch = submitter_init(s);
#else
        submitter_init(s);
#endif

        if (register_ring)
                io_uring_register_ring(s);

        prepped = 0;
        do {
                int to_wait, to_submit, this_reap, to_prep;
                unsigned ring_flags = 0;

                if (!prepped && s->inflight < depth) {
                        to_prep = min(depth - s->inflight, batch_submit);
                        prepped = prep_more_ios_uring(s, to_prep);
#ifdef ARCH_HAVE_CPU_CLOCK
                        if (prepped && stats) {
                                s->clock_batch[s->clock_index] = get_cpu_clock();
                                s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
                        }
#endif
                }
                s->inflight += prepped;
submit_more:
                to_submit = prepped;
submit:
                if (to_submit && (s->inflight + to_submit <= depth))
                        to_wait = 0;
                else
                        to_wait = min(s->inflight + to_submit, batch_complete);

                /*
                 * Only need to call io_uring_enter if we're not using SQ thread
                 * poll, or if IORING_SQ_NEED_WAKEUP is set.
                 */
                if (sq_thread_poll)
                        ring_flags = atomic_load_acquire(ring->flags);
                if (!sq_thread_poll || ring_flags & IORING_SQ_NEED_WAKEUP) {
                        unsigned flags = 0;

                        if (to_wait)
                                flags = IORING_ENTER_GETEVENTS;
                        if (ring_flags & IORING_SQ_NEED_WAKEUP)
                                flags |= IORING_ENTER_SQ_WAKEUP;
                        ret = io_uring_enter(s, to_submit, to_wait, flags);
                        s->calls++;
                } else {
                        /* for SQPOLL, we submitted it all effectively */
                        ret = to_submit;
                }

                /*
                 * For non SQ thread poll, we already got the events we needed
                 * through the io_uring_enter() above. For SQ thread poll, we
                 * need to loop here until we find enough events.
                 */
                this_reap = 0;
                do {
                        int r;

                        r = reap_events_uring(s);
                        if (r == -1) {
                                s->finish = 1;
                                break;
                        } else if (r > 0)
                                this_reap += r;
                } while (sq_thread_poll && this_reap < to_wait);
                s->reaps += this_reap;

                if (ret >= 0) {
                        if (!ret) {
                                to_submit = 0;
                                if (s->inflight)
                                        goto submit;
                                continue;
                        } else if (ret < to_submit) {
                                int diff = to_submit - ret;

                                s->done += ret;
                                prepped -= diff;
                                goto submit_more;
                        }
                        s->done += ret;
                        prepped = 0;
                        continue;
                } else if (ret < 0) {
                        if (errno == EAGAIN) {
                                if (s->finish)
                                        break;
                                if (this_reap)
                                        goto submit;
                                to_submit = 0;
                                goto submit;
                        }
                        printf("io_uring_enter: %s\n", strerror(errno));
                        break;
                }
        } while (!s->finish);

        if (register_ring)
                io_uring_unregister_ring(s);

        finish = 1;
        return NULL;
}

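/*
 * The submitters live in one contiguous allocation (see the calloc() in
 * main()), each struct followed by its roundup_pow2(depth) iovecs, so the
 * per-thread stride below has to match that allocation size.
 */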
static struct submitter *get_submitter(int offset)
{
        void *ret;

        ret = submitter;
        if (offset)
                ret += offset * (sizeof(*submitter) +
                                 roundup_pow2(depth) * sizeof(struct iovec));
        return ret;
}

static void do_finish(const char *reason)
{
        int j;

        printf("Exiting on %s\n", reason);
        for (j = 0; j < nthreads; j++) {
                struct submitter *s = get_submitter(j);
                s->finish = 1;
        }
        if (max_iops > 100000)
                printf("Maximum IOPS=%luK\n", max_iops / 1000);
        else if (max_iops)
                printf("Maximum IOPS=%lu\n", max_iops);
        finish = 1;
}

static void sig_int(int sig)
{
        do_finish("signal");
}

static void arm_sig_int(void)
{
        struct sigaction act;

        memset(&act, 0, sizeof(act));
        act.sa_handler = sig_int;
        act.sa_flags = SA_RESTART;
        sigaction(SIGINT, &act, NULL);

        /* Windows uses SIGBREAK as a quit signal from other applications */
#ifdef WIN32
        sigaction(SIGBREAK, &act, NULL);
#endif
}

static int setup_aio(struct submitter *s)
{
#ifdef CONFIG_LIBAIO
        if (polled) {
                fprintf(stderr, "aio does not support polled IO\n");
                polled = 0;
        }
        if (sq_thread_poll) {
                fprintf(stderr, "aio does not support SQPOLL IO\n");
                sq_thread_poll = 0;
        }
        if (do_nop) {
                fprintf(stderr, "aio does not support nop IO\n");
                do_nop = 0;
        }
        if (fixedbufs || register_files) {
                fprintf(stderr, "aio does not support registered files or buffers\n");
                fixedbufs = register_files = 0;
        }

        return io_queue_init(roundup_pow2(depth), &s->aio_ctx);
#else
        fprintf(stderr, "Legacy AIO not available on this system/build\n");
        errno = EINVAL;
        return -1;
#endif
}

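/*
 * Create the ring, register buffers/files if requested, and map the three
 * shared regions: the SQ ring (head, tail, flags and index array), the SQE
 * array itself, and the CQ ring holding the CQEs.
 */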
static int setup_ring(struct submitter *s)
{
        struct io_sq_ring *sring = &s->sq_ring;
        struct io_cq_ring *cring = &s->cq_ring;
        struct io_uring_params p;
        int ret, fd;
        void *ptr;

        memset(&p, 0, sizeof(p));

        if (polled && !do_nop)
                p.flags |= IORING_SETUP_IOPOLL;
        if (sq_thread_poll) {
                p.flags |= IORING_SETUP_SQPOLL;
                if (sq_thread_cpu != -1) {
                        p.flags |= IORING_SETUP_SQ_AFF;
                        p.sq_thread_cpu = sq_thread_cpu;
                }
        }

        fd = io_uring_setup(depth, &p);
        if (fd < 0) {
                perror("io_uring_setup");
                return 1;
        }
        s->ring_fd = s->enter_ring_fd = fd;

        io_uring_probe(fd);

        if (fixedbufs) {
                struct rlimit rlim;

                rlim.rlim_cur = RLIM_INFINITY;
                rlim.rlim_max = RLIM_INFINITY;
                /* ignore potential error, not needed on newer kernels */
                setrlimit(RLIMIT_MEMLOCK, &rlim);

                ret = io_uring_register_buffers(s);
                if (ret < 0) {
                        perror("io_uring_register_buffers");
                        return 1;
                }

                if (dma_map) {
                        ret = io_uring_map_buffers(s);
                        if (ret < 0) {
                                perror("io_uring_map_buffers");
                                return 1;
                        }
                }
        }

        if (register_files) {
                ret = io_uring_register_files(s);
                if (ret < 0) {
                        perror("io_uring_register_files");
                        return 1;
                }
        }

        ptr = mmap(0, p.sq_off.array + p.sq_entries * sizeof(__u32),
                        PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
                        IORING_OFF_SQ_RING);
        sring->head = ptr + p.sq_off.head;
        sring->tail = ptr + p.sq_off.tail;
        sring->ring_mask = ptr + p.sq_off.ring_mask;
        sring->ring_entries = ptr + p.sq_off.ring_entries;
        sring->flags = ptr + p.sq_off.flags;
        sring->array = ptr + p.sq_off.array;
        sq_ring_mask = *sring->ring_mask;

        s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
                        PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
                        IORING_OFF_SQES);

        ptr = mmap(0, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe),
                        PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
                        IORING_OFF_CQ_RING);
        cring->head = ptr + p.cq_off.head;
        cring->tail = ptr + p.cq_off.tail;
        cring->ring_mask = ptr + p.cq_off.ring_mask;
        cring->ring_entries = ptr + p.cq_off.ring_entries;
        cring->cqes = ptr + p.cq_off.cqes;
        cq_ring_mask = *cring->ring_mask;
        return 0;
}

static void file_depths(char *buf)
{
        bool prev = false;
        char *p;
        int i, j;

        buf[0] = '\0';
        p = buf;
        for (j = 0; j < nthreads; j++) {
                struct submitter *s = get_submitter(j);

                for (i = 0; i < s->nr_files; i++) {
                        struct file *f = &s->files[i];

                        if (prev)
                                p += sprintf(p, " %d", f->pending_ios);
                        else
                                p += sprintf(p, "%d", f->pending_ios);
                        prev = true;
                }
        }
}

static void usage(char *argv, int status)
{
        char runtime_str[16];

        snprintf(runtime_str, sizeof(runtime_str), "%d", runtime);
        printf("%s [options] -- [filenames]\n"
                " -d <int>  : IO Depth, default %d\n"
                " -s <int>  : Batch submit, default %d\n"
                " -c <int>  : Batch complete, default %d\n"
                " -b <int>  : Block size, default %d\n"
                " -p <bool> : Polled IO, default %d\n"
                " -B <bool> : Fixed buffers, default %d\n"
                " -D <bool> : DMA map fixed buffers, default %d\n"
                " -F <bool> : Register files, default %d\n"
                " -n <int>  : Number of threads, default %d\n"
                " -O <bool> : Use O_DIRECT, default %d\n"
                " -N <bool> : Perform just no-op requests, default %d\n"
                " -t <bool> : Track IO latencies, default %d\n"
                " -T <int>  : TSC rate in HZ\n"
                " -r <int>  : Runtime in seconds, default %s\n"
                " -R <bool> : Use random IO, default %d\n"
                " -X <bool> : Use registered ring, default %d\n",
                argv, DEPTH, BATCH_SUBMIT, BATCH_COMPLETE, BS, polled,
                fixedbufs, dma_map, register_files, nthreads, !buffered, do_nop,
                stats, runtime == 0 ? "unlimited" : runtime_str, random_io, aio,
                register_ring);
        exit(status);
}

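/*
 * The TSC rate (in Hz) is needed to convert sampled cycle counts into
 * nanoseconds for -t latency stats. Once given via -T, it is cached in a
 * local "tsc-rate" file so subsequent runs pick it up automatically.
 */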
static void read_tsc_rate(void)
{
        char buffer[32];
        int fd, ret;

        if (tsc_rate)
                return;

        fd = open(TSC_RATE_FILE, O_RDONLY);
        if (fd < 0)
                return;

        ret = read(fd, buffer, sizeof(buffer) - 1);
        if (ret < 0) {
                close(fd);
                return;
        }
        buffer[ret] = '\0';

        tsc_rate = strtoul(buffer, NULL, 10);
        printf("Using TSC rate %luHz\n", tsc_rate);
        close(fd);
}

static void write_tsc_rate(void)
{
        char buffer[32];
        struct stat sb;
        int fd, ret;

        if (!stat(TSC_RATE_FILE, &sb))
                return;

        fd = open(TSC_RATE_FILE, O_WRONLY | O_CREAT, 0644);
        if (fd < 0)
                return;

        memset(buffer, 0, sizeof(buffer));
        sprintf(buffer, "%lu", tsc_rate);
        ret = write(fd, buffer, strlen(buffer));
        if (ret < 0)
                perror("write");
        close(fd);
}

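/*
 * Example invocation (illustrative only; the binary name and device path
 * depend on your build and system):
 *
 *      ./io_uring -d 128 -s 32 -c 32 -p 1 -B 1 -F 1 -X 1 /dev/nvme0n1
 *
 * i.e. QD 128, submit/complete batches of 32, polled IO on a raw device
 * with fixed buffers, registered files and a registered ring fd.
 */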
int main(int argc, char *argv[])
{
        struct submitter *s;
        unsigned long done, calls, reap;
        int err, i, j, flags, fd, opt, threads_per_f, threads_rem = 0, nfiles;
        long page_size;
        struct file f;
        char *fdepths;
        void *ret;

        if (!do_nop && argc < 2)
                usage(argv[0], 1);

        while ((opt = getopt(argc, argv, "d:s:c:b:p:B:F:n:N:O:t:T:a:r:D:R:X:h?")) != -1) {
                switch (opt) {
                case 'a':
                        aio = !!atoi(optarg);
                        break;
                case 'd':
                        depth = atoi(optarg);
                        break;
                case 's':
                        batch_submit = atoi(optarg);
                        if (!batch_submit)
                                batch_submit = 1;
                        break;
                case 'c':
                        batch_complete = atoi(optarg);
                        if (!batch_complete)
                                batch_complete = 1;
                        break;
                case 'b':
                        bs = atoi(optarg);
                        break;
                case 'p':
                        polled = !!atoi(optarg);
                        break;
                case 'B':
                        fixedbufs = !!atoi(optarg);
                        break;
                case 'F':
                        register_files = !!atoi(optarg);
                        break;
                case 'n':
                        nthreads = atoi(optarg);
                        if (!nthreads) {
                                printf("Threads must be non-zero\n");
                                usage(argv[0], 1);
                        }
                        break;
                case 'N':
                        do_nop = !!atoi(optarg);
                        break;
                case 'O':
                        buffered = !atoi(optarg);
                        break;
                case 't':
#ifndef ARCH_HAVE_CPU_CLOCK
                        fprintf(stderr, "Stats not supported on this CPU\n");
                        return 1;
#endif
                        stats = !!atoi(optarg);
                        break;
                case 'T':
#ifndef ARCH_HAVE_CPU_CLOCK
                        fprintf(stderr, "Stats not supported on this CPU\n");
                        return 1;
#endif
                        tsc_rate = strtoul(optarg, NULL, 10);
                        write_tsc_rate();
                        break;
                case 'r':
                        runtime = atoi(optarg);
                        break;
                case 'D':
                        dma_map = !!atoi(optarg);
                        break;
                case 'R':
                        random_io = !!atoi(optarg);
                        break;
                case 'X':
                        register_ring = !!atoi(optarg);
                        break;
                case 'h':
                case '?':
                default:
                        usage(argv[0], 0);
                        break;
                }
        }

        if (stats)
                read_tsc_rate();

        if (batch_complete > depth)
                batch_complete = depth;
        if (batch_submit > depth)
                batch_submit = depth;
        if (!fixedbufs && dma_map)
                dma_map = 0;

        submitter = calloc(nthreads, sizeof(*submitter) +
                                roundup_pow2(depth) * sizeof(struct iovec));
        for (j = 0; j < nthreads; j++) {
                s = get_submitter(j);
                s->index = j;
                s->done = s->calls = s->reaps = 0;
        }

        flags = O_RDONLY | O_NOATIME;
        if (!buffered)
                flags |= O_DIRECT;

        j = 0;
        i = optind;
        nfiles = argc - i;
        if (!do_nop) {
                if (!nfiles) {
                        printf("No files specified\n");
                        usage(argv[0], 1);
                }
                threads_per_f = nthreads / nfiles;
                /* make sure each thread gets assigned files */
                if (threads_per_f == 0) {
                        threads_per_f = 1;
                } else {
                        threads_rem = nthreads - threads_per_f * nfiles;
                }
        }
        while (!do_nop && i < argc) {
                int k, limit;

                memset(&f, 0, sizeof(f));

                fd = open(argv[i], flags);
                if (fd < 0) {
                        perror("open");
                        return 1;
                }
                f.real_fd = fd;
                if (get_file_size(&f)) {
                        printf("failed getting size of device/file\n");
                        return 1;
                }
                if (f.max_blocks <= 1) {
                        printf("Zero file/device size?\n");
                        return 1;
                }
                f.max_blocks--;

                limit = threads_per_f;
                limit += threads_rem > 0 ? 1 : 0;
                for (k = 0; k < limit; k++) {
                        s = get_submitter((j + k) % nthreads);

                        if (s->nr_files == MAX_FDS) {
                                printf("Max number of files (%d) reached\n", MAX_FDS);
                                break;
                        }

                        memcpy(&s->files[s->nr_files], &f, sizeof(f));

                        printf("Added file %s (submitter %d)\n", argv[i], s->index);
                        s->nr_files++;
                }
                threads_rem--;
                i++;
                j += limit;
        }

        arm_sig_int();

        page_size = sysconf(_SC_PAGESIZE);
        if (page_size < 0)
                page_size = 4096;

        for (j = 0; j < nthreads; j++) {
                s = get_submitter(j);
                for (i = 0; i < roundup_pow2(depth); i++) {
                        void *buf;

                        if (posix_memalign(&buf, page_size, bs)) {
                                printf("failed alloc\n");
                                return 1;
                        }
                        s->iovecs[i].iov_base = buf;
                        s->iovecs[i].iov_len = bs;
                }
        }

        for (j = 0; j < nthreads; j++) {
                s = get_submitter(j);

                if (!aio)
                        err = setup_ring(s);
                else
                        err = setup_aio(s);
                if (err) {
                        printf("ring setup failed: %s, %d\n", strerror(errno), err);
                        return 1;
                }
        }
        s = get_submitter(0);
        printf("polled=%d, fixedbufs=%d/%d, register_files=%d, buffered=%d, QD=%d\n", polled, fixedbufs, dma_map, register_files, buffered, depth);
        if (!aio)
                printf("Engine=io_uring, sq_ring=%d, cq_ring=%d\n", *s->sq_ring.ring_entries, *s->cq_ring.ring_entries);
        else
                printf("Engine=aio\n");

        for (j = 0; j < nthreads; j++) {
                s = get_submitter(j);
                if (!aio)
                        pthread_create(&s->thread, NULL, submitter_uring_fn, s);
#ifdef CONFIG_LIBAIO
                else
                        pthread_create(&s->thread, NULL, submitter_aio_fn, s);
#endif
        }

        fdepths = malloc(8 * s->nr_files * nthreads);
        reap = calls = done = 0;
        do {
                unsigned long this_done = 0;
                unsigned long this_reap = 0;
                unsigned long this_call = 0;
                unsigned long rpc = 0, ipc = 0;
                unsigned long iops, bw;

                sleep(1);
                if (runtime && !--runtime)
                        do_finish("timeout");

                /* don't print partial run, if interrupted by signal */
                if (finish)
                        break;

                /* one second in to the run, enable stats */
                if (stats)
                        stats_running = 1;

                for (j = 0; j < nthreads; j++) {
                        s = get_submitter(j);
                        this_done += s->done;
                        this_call += s->calls;
                        this_reap += s->reaps;
                }
                if (this_call - calls) {
                        rpc = (this_done - done) / (this_call - calls);
                        ipc = (this_reap - reap) / (this_call - calls);
                } else
                        rpc = ipc = -1;
                file_depths(fdepths);
                iops = this_done - done;
                if (bs > 1048576)
                        bw = iops * (bs / 1048576);
                else
                        bw = iops / (1048576 / bs);
                if (iops > 100000)
                        printf("IOPS=%luK, ", iops / 1000);
                else
                        printf("IOPS=%lu, ", iops);
                max_iops = max(max_iops, iops);
                if (!do_nop)
                        printf("BW=%luMiB/s, ", bw);
                printf("IOS/call=%ld/%ld, inflight=(%s)\n", rpc, ipc, fdepths);
                done = this_done;
                calls = this_call;
                reap = this_reap;
        } while (!finish);

        for (j = 0; j < nthreads; j++) {
                s = get_submitter(j);
                pthread_join(s->thread, &ret);
                close(s->ring_fd);

                if (stats) {
                        unsigned long nr;

                        printf("%d: Latency percentiles:\n", s->tid);
                        for (i = 0, nr = 0; i < PLAT_NR; i++)
                                nr += s->plat[i];
                        show_clat_percentiles(s->plat, nr, 4);
                        free(s->clock_batch);
                        free(s->plat);
                }
        }

        free(fdepths);
        free(submitter);
        return 0;
}