t/io_uring: change fatal map buffers condition with multiple files
[fio.git] / t / io_uring.c
1 #include <stdio.h>
2 #include <errno.h>
3 #include <assert.h>
4 #include <stdlib.h>
5 #include <stddef.h>
6 #include <signal.h>
7 #include <inttypes.h>
8 #include <math.h>
9
10 #ifdef CONFIG_LIBAIO
11 #include <libaio.h>
12 #endif
13
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <sys/ioctl.h>
17 #include <sys/syscall.h>
18 #include <sys/resource.h>
19 #include <sys/mman.h>
20 #include <sys/uio.h>
21 #include <linux/fs.h>
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <sched.h>
27
28 #include "../arch/arch.h"
29 #include "../lib/types.h"
30 #include "../lib/roundup.h"
31 #include "../lib/rand.h"
32 #include "../minmax.h"
33 #include "../os/linux/io_uring.h"
34
/*
 * Pointers into the mmap'ed submission queue (SQ) ring shared with the
 * kernel. Each member points at the corresponding field/array inside the
 * ring mapping.
 */
struct io_sq_ring {
	unsigned *head;
	unsigned *tail;
	unsigned *ring_mask;
	unsigned *ring_entries;
	unsigned *flags;		/* kernel flags, e.g. IORING_SQ_NEED_WAKEUP */
	unsigned *array;		/* indirection array of SQE indices */
};
43
/*
 * Pointers into the mmap'ed completion queue (CQ) ring shared with the
 * kernel. cqes points at the array of completion entries.
 */
struct io_cq_ring {
	unsigned *head;
	unsigned *tail;
	unsigned *ring_mask;
	unsigned *ring_entries;
	struct io_uring_cqe *cqes;
};
51
/* Default tunables; each can be overridden via command line options */
#define DEPTH			128	/* default queue depth */
#define BATCH_SUBMIT		32	/* default IOs submitted per call */
#define BATCH_COMPLETE		32	/* default completions reaped per wait */
#define BS			4096	/* default block size, in bytes */

#define MAX_FDS			16	/* max files per submitter thread */
58
/* Index masks for the SQ/CQ rings; filled in during ring setup */
static unsigned sq_ring_mask, cq_ring_mask;
60
/* Per-file state for one target file or block device */
struct file {
	unsigned long max_blocks;	/* file size in units of the block size */
	unsigned long max_size;		/* file size in bytes */
	unsigned long cur_off;		/* next offset for sequential IO */
	unsigned pending_ios;		/* IOs currently inflight against this file */
	int real_fd;			/* fd from open(2) */
	int fixed_fd;			/* index into the registered-files table */
	int fileno;			/* index into submitter->files[] */
};
70
/* Latency histogram layout: PLAT_GROUP_NR groups of PLAT_VAL buckets each */
#define PLAT_BITS		6
#define PLAT_VAL		(1 << PLAT_BITS)
#define PLAT_GROUP_NR		29
#define PLAT_NR			(PLAT_GROUP_NR * PLAT_VAL)
75
/*
 * Per-thread state. One submitter drives one io_uring (or libaio context)
 * against up to MAX_FDS files. The flexible iovecs[] array at the end holds
 * one IO buffer descriptor per queue entry.
 */
struct submitter {
	pthread_t thread;
	int ring_fd;			/* io_uring fd from io_uring_setup() */
	int index;			/* this thread's slot number */
	struct io_sq_ring sq_ring;
	struct io_uring_sqe *sqes;	/* mmap'ed SQE array */
	struct io_cq_ring cq_ring;
	int inflight;			/* IOs submitted but not yet reaped */
	int tid;
	unsigned long reaps;		/* total completions reaped */
	unsigned long done;		/* total IOs submitted */
	unsigned long calls;		/* total syscalls issued */
	volatile int finish;		/* set to request thread exit */

	__s32 *fds;			/* fd table for IORING_REGISTER_FILES */

	struct taus258_state rand_state;	/* per-thread RNG for offsets */

	unsigned long *clock_batch;	/* per-batch submit timestamps (stats) */
	int clock_index;		/* next slot in clock_batch; 0 is unused */
	unsigned long *plat;		/* latency histogram, PLAT_NR buckets */

#ifdef CONFIG_LIBAIO
	io_context_t aio_ctx;
#endif

	struct file files[MAX_FDS];
	unsigned nr_files;
	unsigned cur_file;		/* round-robin cursor over files[] */
	struct iovec iovecs[];		/* one IO buffer per queue entry */
};
107
static struct submitter *submitter;	/* array of nthreads submitters */
static volatile int finish;		/* global stop flag */
static int stats_running;		/* stats collection currently active */
static unsigned long max_iops;		/* best observed IOPS, for summary */

/* Runtime tunables, defaults above; set from the command line */
static int depth = DEPTH;
static int batch_submit = BATCH_SUBMIT;
static int batch_complete = BATCH_COMPLETE;
static int bs = BS;
static int polled = 1;		/* use IO polling */
static int fixedbufs = 1;	/* use fixed user buffers */
static int dma_map;		/* pre-map DMA buffers */
static int register_files = 1;	/* use fixed files */
static int buffered = 0;	/* use buffered IO, not O_DIRECT */
static int sq_thread_poll = 0;	/* use kernel submission/poller thread */
static int sq_thread_cpu = -1;	/* pin above thread to this CPU */
static int do_nop = 0;		/* no-op SQ ring commands */
static int nthreads = 1;	/* number of submitter threads */
static int stats = 0;		/* generate IO stats */
static int aio = 0;		/* use libaio */
static int runtime = 0;		/* runtime */
static int random_io = 1;	/* random or sequential IO */

static unsigned long tsc_rate;	/* TSC ticks per second; 0 = uncalibrated */

#define TSC_RATE_FILE	"tsc-rate"	/* file holding a saved TSC rate */

static int vectored = 1;	/* use READV rather than READ, if needed */

/* Percentiles reported in the latency summary */
static float plist[] = { 1.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0,
			80.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.95, 99.99 };
static int plist_len = 17;	/* must match the entry count of plist[] */
140
/*
 * Fallback definitions for system headers that do not (yet) expose the
 * IORING_REGISTER_MAP_BUFFERS opcode used for pre-mapping DMA buffers.
 * NOTE(review): opcode value 22 must match the kernel in use — confirm.
 */
#ifndef IORING_REGISTER_MAP_BUFFERS
#define IORING_REGISTER_MAP_BUFFERS	22
struct io_uring_map_buffers {
	__s32	fd;
	__u32	buf_start;
	__u32	buf_end;
	__u32	flags;
	__u64	rsvd[2];
};
#endif
151
152 static unsigned long cycles_to_nsec(unsigned long cycles)
153 {
154         uint64_t val;
155
156         if (!tsc_rate)
157                 return cycles;
158
159         val = cycles * 1000000000ULL;
160         return val / tsc_rate;
161 }
162
/*
 * Map a latency histogram bucket index back to a representative latency
 * value (midpoint of the bucket's range), converted to nsec when a TSC
 * rate is known. Inverse of plat_val_to_idx().
 */
static unsigned long plat_idx_to_val(unsigned int idx)
{
	unsigned int error_bits;
	unsigned long k, base;

	assert(idx < PLAT_NR);

	/* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
	 * all bits of the sample as index */
	if (idx < (PLAT_VAL << 1))
		return cycles_to_nsec(idx);

	/* Find the group and compute the minimum value of that group */
	error_bits = (idx >> PLAT_BITS) - 1;
	base = ((unsigned long) 1) << (error_bits + PLAT_BITS);

	/* Find its bucket number of the group */
	k = idx % PLAT_VAL;

	/* Return the mean of the range of the bucket */
	return cycles_to_nsec(base + ((k + 0.5) * (1 << error_bits)));
}
185
/*
 * Walk the latency histogram (io_u_plat, nr total samples) and compute the
 * latency value at each percentile in plist[]. On success, *output points
 * at a malloc'ed array of plist_len values (ownership passes to the
 * caller), and *maxv/*minv hold the extreme values seen. Returns the
 * number of entries in *output, or 0 on allocation failure (in which case
 * *output is left untouched).
 */
unsigned int calc_clat_percentiles(unsigned long *io_u_plat, unsigned long nr,
				   unsigned long **output,
				   unsigned long *maxv, unsigned long *minv)
{
	unsigned long sum = 0;
	unsigned int len = plist_len, i, j = 0;
	unsigned long *ovals = NULL;
	bool is_last;

	*minv = -1UL;
	*maxv = 0;

	ovals = malloc(len * sizeof(*ovals));
	if (!ovals)
		return 0;

	/*
	 * Calculate bucket values, note down max and min values
	 */
	is_last = false;
	for (i = 0; i < PLAT_NR && !is_last; i++) {
		sum += io_u_plat[i];
		/* this bucket crosses one or more requested percentiles */
		while (sum >= ((long double) plist[j] / 100.0 * nr)) {
			assert(plist[j] <= 100.0);

			ovals[j] = plat_idx_to_val(i);
			if (ovals[j] < *minv)
				*minv = ovals[j];
			if (ovals[j] > *maxv)
				*maxv = ovals[j];

			is_last = (j == len - 1) != 0;
			if (is_last)
				break;

			j++;
		}
	}

	/* ran out of histogram before satisfying every percentile */
	if (!is_last)
		fprintf(stderr, "error calculating latency percentiles\n");

	*output = ovals;
	return len;
}
231
232 static void show_clat_percentiles(unsigned long *io_u_plat, unsigned long nr,
233                                   unsigned int precision)
234 {
235         unsigned int divisor, len, i, j = 0;
236         unsigned long minv, maxv;
237         unsigned long *ovals;
238         int per_line, scale_down, time_width;
239         bool is_last;
240         char fmt[32];
241
242         len = calc_clat_percentiles(io_u_plat, nr, &ovals, &maxv, &minv);
243         if (!len || !ovals)
244                 goto out;
245
246         if (!tsc_rate) {
247                 scale_down = 0;
248                 divisor = 1;
249                 printf("    percentiles (tsc ticks):\n     |");
250         } else if (minv > 2000 && maxv > 99999) {
251                 scale_down = 1;
252                 divisor = 1000;
253                 printf("    percentiles (usec):\n     |");
254         } else {
255                 scale_down = 0;
256                 divisor = 1;
257                 printf("    percentiles (nsec):\n     |");
258         }
259
260         time_width = max(5, (int) (log10(maxv / divisor) + 1));
261         snprintf(fmt, sizeof(fmt), " %%%u.%ufth=[%%%dllu]%%c", precision + 3,
262                         precision, time_width);
263         /* fmt will be something like " %5.2fth=[%4llu]%c" */
264         per_line = (80 - 7) / (precision + 10 + time_width);
265
266         for (j = 0; j < len; j++) {
267                 /* for formatting */
268                 if (j != 0 && (j % per_line) == 0)
269                         printf("     |");
270
271                 /* end of the list */
272                 is_last = (j == len - 1) != 0;
273
274                 for (i = 0; i < scale_down; i++)
275                         ovals[j] = (ovals[j] + 999) / 1000;
276
277                 printf(fmt, plist[j], ovals[j], is_last ? '\n' : ',');
278
279                 if (is_last)
280                         break;
281
282                 if ((j % per_line) == per_line - 1)     /* for formatting */
283                         printf("\n");
284         }
285
286 out:
287         free(ovals);
288 }
289
#ifdef ARCH_HAVE_CPU_CLOCK
/*
 * Map a latency sample (in cycles) to a histogram bucket index. Small
 * values map one-to-one; larger values are grouped logarithmically, with
 * PLAT_VAL linear buckets per power-of-two group, so relative precision
 * stays roughly constant across the range.
 */
static unsigned int plat_val_to_idx(unsigned long val)
{
	unsigned int msb, error_bits, base, offset, idx;

	/* Find MSB starting from bit 0 */
	if (val == 0)
		msb = 0;
	else
		msb = (sizeof(val)*8) - __builtin_clzll(val) - 1;

	/*
	 * Small samples cannot be rounded off: use all bits of the
	 * sample as the index directly
	 */
	if (msb <= PLAT_BITS)
		return val;

	/* Compute the number of error bits to discard*/
	error_bits = msb - PLAT_BITS;

	/* Compute the number of buckets before the group */
	base = (error_bits + 1) << PLAT_BITS;

	/*
	 * Discard the error bits and apply the mask to find the
	 * index for the buckets in the group
	 */
	offset = (PLAT_VAL - 1) & (val >> error_bits);

	/* Make sure the index does not exceed (array size - 1) */
	idx = (base + offset) < (PLAT_NR - 1) ?
		(base + offset) : (PLAT_NR - 1);

	return idx;
}
#endif
327
/*
 * Account 'nr' completions against the submit timestamp stored in slot
 * 'clock_index': compute the elapsed cycles and bump the matching latency
 * histogram bucket. No-op without CPU clock support or during shutdown.
 */
static void add_stat(struct submitter *s, int clock_index, int nr)
{
#ifdef ARCH_HAVE_CPU_CLOCK
	unsigned long cycles;
	unsigned int pidx;

	/* index 0 appears to mean "no timestamp recorded"; skip it */
	if (!s->finish && clock_index) {
		cycles = get_cpu_clock();
		cycles -= s->clock_batch[clock_index];
		pidx = plat_val_to_idx(cycles);
		s->plat[pidx] += nr;
	}
#endif
}
342
343 static int io_uring_map_buffers(struct submitter *s)
344 {
345         struct io_uring_map_buffers map = {
346                 .fd             = s->files[0].real_fd,
347                 .buf_end        = depth,
348         };
349
350         if (do_nop)
351                 return 0;
352         if (s->nr_files > 1)
353                 fprintf(stdout, "Mapping buffers may not work with multiple files\n");
354
355         return syscall(__NR_io_uring_register, s->ring_fd,
356                         IORING_REGISTER_MAP_BUFFERS, &map, 1);
357 }
358
359 static int io_uring_register_buffers(struct submitter *s)
360 {
361         if (do_nop)
362                 return 0;
363
364         return syscall(__NR_io_uring_register, s->ring_fd,
365                         IORING_REGISTER_BUFFERS, s->iovecs, roundup_pow2(depth));
366 }
367
368 static int io_uring_register_files(struct submitter *s)
369 {
370         int i;
371
372         if (do_nop)
373                 return 0;
374
375         s->fds = calloc(s->nr_files, sizeof(__s32));
376         for (i = 0; i < s->nr_files; i++) {
377                 s->fds[i] = s->files[i].real_fd;
378                 s->files[i].fixed_fd = i;
379         }
380
381         return syscall(__NR_io_uring_register, s->ring_fd,
382                         IORING_REGISTER_FILES, s->fds, s->nr_files);
383 }
384
385 static int io_uring_setup(unsigned entries, struct io_uring_params *p)
386 {
387         /*
388          * Clamp CQ ring size at our SQ ring size, we don't need more entries
389          * than that.
390          */
391         p->flags |= IORING_SETUP_CQSIZE;
392         p->cq_entries = entries;
393
394         return syscall(__NR_io_uring_setup, entries, p);
395 }
396
397 static void io_uring_probe(int fd)
398 {
399         struct io_uring_probe *p;
400         int ret;
401
402         p = malloc(sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
403         if (!p)
404                 return;
405
406         memset(p, 0, sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
407         ret = syscall(__NR_io_uring_register, fd, IORING_REGISTER_PROBE, p, 256);
408         if (ret < 0)
409                 goto out;
410
411         if (IORING_OP_READ > p->ops_len)
412                 goto out;
413
414         if ((p->ops[IORING_OP_READ].flags & IO_URING_OP_SUPPORTED))
415                 vectored = 0;
416 out:
417         free(p);
418 }
419
/*
 * Thin wrapper around the io_uring_enter(2) syscall, using fio's
 * arch-optimized raw syscall path when the build provides one.
 */
static int io_uring_enter(struct submitter *s, unsigned int to_submit,
			  unsigned int min_complete, unsigned int flags)
{
#ifdef FIO_ARCH_HAS_SYSCALL
	return __do_syscall6(__NR_io_uring_enter, s->ring_fd, to_submit,
				min_complete, flags, NULL, 0);
#else
	return syscall(__NR_io_uring_enter, s->ring_fd, to_submit, min_complete,
			flags, NULL, 0);
#endif
}
431
#ifndef CONFIG_HAVE_GETTID
/* Fallback for C libraries without a gettid() wrapper: raw syscall. */
static int gettid(void)
{
	return syscall(__NR_gettid);
}
#endif
438
439 static unsigned file_depth(struct submitter *s)
440 {
441         return (depth + s->nr_files - 1) / s->nr_files;
442 }
443
/*
 * Fill in SQE 'index' with the next IO: pick a target file (round-robin
 * once a file hits its depth share), choose a random or sequential offset,
 * and set up a fixed-buffer read, plain read, or readv depending on the
 * configured mode. The file number is encoded in the low 32 bits of
 * user_data; when stats are running, the submit-clock slot goes in the
 * high 32 bits (decoded in reap_events_uring()).
 */
static void init_io(struct submitter *s, unsigned index)
{
	struct io_uring_sqe *sqe = &s->sqes[index];
	unsigned long offset;
	struct file *f;
	long r;

	if (do_nop) {
		sqe->opcode = IORING_OP_NOP;
		return;
	}

	/* pick the file: single-file fast path, else round-robin by depth */
	if (s->nr_files == 1) {
		f = &s->files[0];
	} else {
		f = &s->files[s->cur_file];
		if (f->pending_ios >= file_depth(s)) {
			s->cur_file++;
			if (s->cur_file == s->nr_files)
				s->cur_file = 0;
			f = &s->files[s->cur_file];
		}
	}
	f->pending_ios++;

	if (random_io) {
		r = __rand64(&s->rand_state);
		offset = (r % (f->max_blocks - 1)) * bs;
	} else {
		/* sequential: advance and wrap before running off the end */
		offset = f->cur_off;
		f->cur_off += bs;
		if (f->cur_off + bs > f->max_size)
			f->cur_off = 0;
	}

	if (register_files) {
		sqe->flags = IOSQE_FIXED_FILE;
		sqe->fd = f->fixed_fd;
	} else {
		sqe->flags = 0;
		sqe->fd = f->real_fd;
	}
	if (fixedbufs) {
		/* registered buffer: buf_index selects the iovec slot */
		sqe->opcode = IORING_OP_READ_FIXED;
		sqe->addr = (unsigned long) s->iovecs[index].iov_base;
		sqe->len = bs;
		sqe->buf_index = index;
	} else if (!vectored) {
		sqe->opcode = IORING_OP_READ;
		sqe->addr = (unsigned long) s->iovecs[index].iov_base;
		sqe->len = bs;
		sqe->buf_index = 0;
	} else {
		sqe->opcode = IORING_OP_READV;
		sqe->addr = (unsigned long) &s->iovecs[index];
		sqe->len = 1;
		sqe->buf_index = 0;
	}
	sqe->ioprio = 0;
	sqe->off = offset;
	sqe->user_data = (unsigned long) f->fileno;
	if (stats && stats_running)
		sqe->user_data |= ((uint64_t)s->clock_index << 32);
}
508
/*
 * Prepare up to max_ios new SQEs at the SQ ring tail, stopping early if
 * the ring is full. The tail is published with a release store so the
 * kernel sees fully-initialized SQEs. Returns the number prepared.
 */
static int prep_more_ios_uring(struct submitter *s, int max_ios)
{
	struct io_sq_ring *ring = &s->sq_ring;
	unsigned index, tail, next_tail, prepped = 0;

	next_tail = tail = *ring->tail;
	do {
		next_tail++;
		/* ring full: tail would catch up to the kernel's head */
		if (next_tail == atomic_load_acquire(ring->head))
			break;

		index = tail & sq_ring_mask;
		init_io(s, index);
		ring->array[index] = index;
		prepped++;
		tail = next_tail;
	} while (prepped < max_ios);

	if (prepped)
		atomic_store_release(ring->tail, tail);
	return prepped;
}
531
532 static int get_file_size(struct file *f)
533 {
534         struct stat st;
535
536         if (fstat(f->real_fd, &st) < 0)
537                 return -1;
538         if (S_ISBLK(st.st_mode)) {
539                 unsigned long long bytes;
540
541                 if (ioctl(f->real_fd, BLKGETSIZE64, &bytes) != 0)
542                         return -1;
543
544                 f->max_blocks = bytes / bs;
545                 f->max_size = bytes;
546                 return 0;
547         } else if (S_ISREG(st.st_mode)) {
548                 f->max_blocks = st.st_size / bs;
549                 f->max_size = st.st_size;
550                 return 0;
551         }
552
553         return -1;
554 }
555
/*
 * Drain all available CQEs from the completion ring. Decodes the file
 * number (low 32 bits of user_data) to drop its pending count, verifies
 * each IO read exactly one block, and batches latency accounting per
 * submit-clock slot (high 32 bits). Publishes the new head with a release
 * store. Returns completions reaped, or -1 on an IO error.
 */
static int reap_events_uring(struct submitter *s)
{
	struct io_cq_ring *ring = &s->cq_ring;
	struct io_uring_cqe *cqe;
	unsigned head, reaped = 0;
	int last_idx = -1, stat_nr = 0;

	head = *ring->head;
	do {
		struct file *f;

		read_barrier();
		/* empty: caught up with the kernel's tail */
		if (head == atomic_load_acquire(ring->tail))
			break;
		cqe = &ring->cqes[head & cq_ring_mask];
		if (!do_nop) {
			int fileno = cqe->user_data & 0xffffffff;

			f = &s->files[fileno];
			f->pending_ios--;
			if (cqe->res != bs) {
				printf("io: unexpected ret=%d\n", cqe->res);
				if (polled && cqe->res == -EOPNOTSUPP)
					printf("Your filesystem/driver/kernel doesn't support polled IO\n");
				return -1;
			}
		}
		if (stats) {
			int clock_index = cqe->user_data >> 32;

			/* batch consecutive completions from the same submit slot */
			if (last_idx != clock_index) {
				if (last_idx != -1) {
					add_stat(s, last_idx, stat_nr);
					stat_nr = 0;
				}
				last_idx = clock_index;
			}
			stat_nr++;
		}
		reaped++;
		head++;
	} while (1);

	/* flush the final stats batch */
	if (stat_nr)
		add_stat(s, last_idx, stat_nr);

	if (reaped) {
		s->inflight -= reaped;
		atomic_store_release(ring->head, head);
	}
	return reaped;
}
608
/*
 * Per-thread setup: record the tid, seed both RNGs from the pthread id,
 * number the file slots, and (when stats are enabled) allocate the
 * submit-timestamp ring and latency histogram. Returns the size of the
 * timestamp ring (a power of two, used as an index mask), 0 if stats are
 * off. NOTE(review): the calloc results are not checked here.
 */
static int submitter_init(struct submitter *s)
{
	int i, nr_batch;

	s->tid = gettid();
	printf("submitter=%d, tid=%d\n", s->index, s->tid);

	__init_rand64(&s->rand_state, pthread_self());
	srand48(pthread_self());

	for (i = 0; i < MAX_FDS; i++)
		s->files[i].fileno = i;

	if (stats) {
		nr_batch = roundup_pow2(depth / batch_submit);
		if (nr_batch < 2)
			nr_batch = 2;
		s->clock_batch = calloc(nr_batch, sizeof(unsigned long));
		/* slot 0 stays unused; see add_stat() */
		s->clock_index = 1;

		s->plat = calloc(PLAT_NR, sizeof(unsigned long));
	} else {
		s->clock_batch = NULL;
		s->plat = NULL;
		nr_batch = 0;
	}

	return nr_batch;
}
638
#ifdef CONFIG_LIBAIO
/*
 * libaio counterpart of the uring prep path: fill up to max_ios iocbs with
 * random-offset reads, picking files round-robin by depth share. The file
 * number goes in the low 32 bits of iocb->data, the submit-clock slot in
 * the high 32 bits. Returns the number of iocbs prepared.
 */
static int prep_more_ios_aio(struct submitter *s, int max_ios, struct iocb *iocbs)
{
	uint64_t data;
	long long offset;
	struct file *f;
	unsigned index;
	long r;

	index = 0;
	while (index < max_ios) {
		struct iocb *iocb = &iocbs[index];

		/* same file-selection logic as init_io() */
		if (s->nr_files == 1) {
			f = &s->files[0];
		} else {
			f = &s->files[s->cur_file];
			if (f->pending_ios >= file_depth(s)) {
				s->cur_file++;
				if (s->cur_file == s->nr_files)
					s->cur_file = 0;
				f = &s->files[s->cur_file];
			}
		}
		f->pending_ios++;

		r = lrand48();
		offset = (r % (f->max_blocks - 1)) * bs;
		io_prep_pread(iocb, f->real_fd, s->iovecs[index].iov_base,
				s->iovecs[index].iov_len, offset);

		data = f->fileno;
		if (stats && stats_running)
			data |= (((uint64_t) s->clock_index) << 32);
		iocb->data = (void *) (uintptr_t) data;
		index++;
	}
	return index;
}
678
/*
 * Process 'evs' completed libaio events: decode the file number from the
 * low 32 bits of the event data, verify a full block was read, and batch
 * latency accounting per submit-clock slot (high 32 bits), mirroring
 * reap_events_uring(). Returns completions processed, or -1 on IO error.
 */
static int reap_events_aio(struct submitter *s, struct io_event *events, int evs)
{
	int last_idx = -1, stat_nr = 0;
	int reaped = 0;

	while (evs) {
		uint64_t data = (uintptr_t) events[reaped].data;
		struct file *f = &s->files[data & 0xffffffff];

		f->pending_ios--;
		if (events[reaped].res != bs) {
			printf("io: unexpected ret=%ld\n", events[reaped].res);
			return -1;
		}
		if (stats) {
			int clock_index = data >> 32;

			/* batch consecutive completions from the same submit slot */
			if (last_idx != clock_index) {
				if (last_idx != -1) {
					add_stat(s, last_idx, stat_nr);
					stat_nr = 0;
				}
				last_idx = clock_index;
			}
			stat_nr++;
		}
		reaped++;
		evs--;
	}

	/* flush the final stats batch */
	if (stat_nr)
		add_stat(s, last_idx, stat_nr);

	s->inflight -= reaped;
	s->done += reaped;
	return reaped;
}
716
/*
 * Thread entry point for the libaio backend: loop preparing a batch of
 * iocbs, submitting them with io_submit(), and waiting for / reaping
 * completions until asked to finish. Sets the global finish flag on exit.
 * NOTE(review): the three calloc results are used unchecked.
 */
static void *submitter_aio_fn(void *data)
{
	struct submitter *s = data;
	int i, ret, prepped;
	struct iocb **iocbsptr;
	struct iocb *iocbs;
	struct io_event *events;
#ifdef ARCH_HAVE_CPU_CLOCK
	int nr_batch = submitter_init(s);
#else
	submitter_init(s);
#endif

	iocbsptr = calloc(depth, sizeof(struct iocb *));
	iocbs = calloc(depth, sizeof(struct iocb));
	events = calloc(depth, sizeof(struct io_event));

	for (i = 0; i < depth; i++)
		iocbsptr[i] = &iocbs[i];

	prepped = 0;
	do {
		int to_wait, to_submit, to_prep;

		/* top up with new IOs while there is room under 'depth' */
		if (!prepped && s->inflight < depth) {
			to_prep = min(depth - s->inflight, batch_submit);
			prepped = prep_more_ios_aio(s, to_prep, iocbs);
#ifdef ARCH_HAVE_CPU_CLOCK
			/* timestamp this batch for latency accounting */
			if (prepped && stats) {
				s->clock_batch[s->clock_index] = get_cpu_clock();
				s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
			}
#endif
		}
		s->inflight += prepped;
		to_submit = prepped;

		/* only wait for completions when the queue would overfill */
		if (to_submit && (s->inflight + to_submit <= depth))
			to_wait = 0;
		else
			to_wait = min(s->inflight + to_submit, batch_complete);

		ret = io_submit(s->aio_ctx, to_submit, iocbsptr);
		s->calls++;
		if (ret < 0) {
			perror("io_submit");
			break;
		} else if (ret != to_submit) {
			printf("submitted %d, wanted %d\n", ret, to_submit);
			break;
		}
		prepped = 0;

		while (to_wait) {
			int r;

			s->calls++;
			r = io_getevents(s->aio_ctx, to_wait, to_wait, events, NULL);
			if (r < 0) {
				perror("io_getevents");
				break;
			} else if (r != to_wait) {
				printf("r=%d, wait=%d\n", r, to_wait);
				break;
			}
			r = reap_events_aio(s, events, r);
			s->reaps += r;
			to_wait -= r;
		}
	} while (!s->finish);

	free(iocbsptr);
	free(iocbs);
	free(events);
	finish = 1;
	return NULL;
}
#endif
795
/*
 * Thread entry point for the io_uring backend: loop preparing SQEs,
 * submitting via io_uring_enter() (or letting the kernel SQPOLL thread
 * pick them up), and reaping completions until asked to finish. Handles
 * short submits by re-submitting the remainder, and EAGAIN by retrying
 * after reaping. Sets the global finish flag on exit.
 */
static void *submitter_uring_fn(void *data)
{
	struct submitter *s = data;
	struct io_sq_ring *ring = &s->sq_ring;
	int ret, prepped;
#ifdef ARCH_HAVE_CPU_CLOCK
	int nr_batch = submitter_init(s);
#else
	submitter_init(s);
#endif

	prepped = 0;
	do {
		int to_wait, to_submit, this_reap, to_prep;
		unsigned ring_flags = 0;

		/* top up with new IOs while there is room under 'depth' */
		if (!prepped && s->inflight < depth) {
			to_prep = min(depth - s->inflight, batch_submit);
			prepped = prep_more_ios_uring(s, to_prep);
#ifdef ARCH_HAVE_CPU_CLOCK
			/* timestamp this batch for latency accounting */
			if (prepped && stats) {
				s->clock_batch[s->clock_index] = get_cpu_clock();
				s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
			}
#endif
		}
		s->inflight += prepped;
submit_more:
		to_submit = prepped;
submit:
		/* only wait for completions when the queue would overfill */
		if (to_submit && (s->inflight + to_submit <= depth))
			to_wait = 0;
		else
			to_wait = min(s->inflight + to_submit, batch_complete);

		/*
		 * Only need to call io_uring_enter if we're not using SQ thread
		 * poll, or if IORING_SQ_NEED_WAKEUP is set.
		 */
		if (sq_thread_poll)
			ring_flags = atomic_load_acquire(ring->flags);
		if (!sq_thread_poll || ring_flags & IORING_SQ_NEED_WAKEUP) {
			unsigned flags = 0;

			if (to_wait)
				flags = IORING_ENTER_GETEVENTS;
			if (ring_flags & IORING_SQ_NEED_WAKEUP)
				flags |= IORING_ENTER_SQ_WAKEUP;
			ret = io_uring_enter(s, to_submit, to_wait, flags);
			s->calls++;
		} else {
			/* for SQPOLL, we submitted it all effectively */
			ret = to_submit;
		}

		/*
		 * For non SQ thread poll, we already got the events we needed
		 * through the io_uring_enter() above. For SQ thread poll, we
		 * need to loop here until we find enough events.
		 */
		this_reap = 0;
		do {
			int r;

			r = reap_events_uring(s);
			if (r == -1) {
				s->finish = 1;
				break;
			} else if (r > 0)
				this_reap += r;
		} while (sq_thread_poll && this_reap < to_wait);
		s->reaps += this_reap;

		if (ret >= 0) {
			if (!ret) {
				/* nothing submitted; wait on inflight IO if any */
				to_submit = 0;
				if (s->inflight)
					goto submit;
				continue;
			} else if (ret < to_submit) {
				/* short submit: push the remainder */
				int diff = to_submit - ret;

				s->done += ret;
				prepped -= diff;
				goto submit_more;
			}
			s->done += ret;
			prepped = 0;
			continue;
		} else if (ret < 0) {
			if (errno == EAGAIN) {
				if (s->finish)
					break;
				if (this_reap)
					goto submit;
				to_submit = 0;
				goto submit;
			}
			printf("io_submit: %s\n", strerror(errno));
			break;
		}
	} while (!s->finish);

	finish = 1;
	return NULL;
}
902
903 static struct submitter *get_submitter(int offset)
904 {
905         void *ret;
906
907         ret = submitter;
908         if (offset)
909                 ret += offset * (sizeof(*submitter) + depth * sizeof(struct iovec));
910         return ret;
911 }
912
913 static void do_finish(const char *reason)
914 {
915         int j;
916         printf("Exiting on %s\n", reason);
917         for (j = 0; j < nthreads; j++) {
918                 struct submitter *s = get_submitter(j);
919                 s->finish = 1;
920         }
921         if (max_iops > 100000)
922                 printf("Maximum IOPS=%luK\n", max_iops / 1000);
923         else if (max_iops)
924                 printf("Maximum IOPS=%lu\n", max_iops);
925         finish = 1;
926 }
927
/* SIGINT handler: request an orderly shutdown. */
static void sig_int(int sig)
{
	(void) sig;	/* unused; any delivery means "stop" */

	do_finish("signal");
}
932
933 static void arm_sig_int(void)
934 {
935         struct sigaction act;
936
937         memset(&act, 0, sizeof(act));
938         act.sa_handler = sig_int;
939         act.sa_flags = SA_RESTART;
940         sigaction(SIGINT, &act, NULL);
941
942         /* Windows uses SIGBREAK as a quit signal from other applications */
943 #ifdef WIN32
944         sigaction(SIGBREAK, &act, NULL);
945 #endif
946 }
947
/*
 * Prepare a submitter for the legacy libaio engine. Options that aio
 * cannot honor (polled IO, SQPOLL, nops, registered files/buffers) are
 * warned about and disabled rather than failing the run.
 *
 * Returns 0 on success, io_queue_init()'s negative error on failure,
 * or -1 with errno = EINVAL when libaio support isn't compiled in.
 */
static int setup_aio(struct submitter *s)
{
#ifdef CONFIG_LIBAIO
	if (polled) {
		fprintf(stderr, "aio does not support polled IO\n");
		polled = 0;
	}
	if (sq_thread_poll) {
		fprintf(stderr, "aio does not support SQPOLL IO\n");
		sq_thread_poll = 0;
	}
	if (do_nop) {
		/* fix: message previously said "polled IO" (copy-paste) */
		fprintf(stderr, "aio does not support nop IO\n");
		do_nop = 0;
	}
	if (fixedbufs || register_files) {
		fprintf(stderr, "aio does not support registered files or buffers\n");
		fixedbufs = register_files = 0;
	}

	/* context depth matches the rounded-up ring depth used elsewhere */
	return io_queue_init(roundup_pow2(depth), &s->aio_ctx);
#else
	fprintf(stderr, "Legacy AIO not available on this system/build\n");
	errno = EINVAL;
	return -1;
#endif
}
975
976 static int setup_ring(struct submitter *s)
977 {
978         struct io_sq_ring *sring = &s->sq_ring;
979         struct io_cq_ring *cring = &s->cq_ring;
980         struct io_uring_params p;
981         int ret, fd;
982         void *ptr;
983
984         memset(&p, 0, sizeof(p));
985
986         if (polled && !do_nop)
987                 p.flags |= IORING_SETUP_IOPOLL;
988         if (sq_thread_poll) {
989                 p.flags |= IORING_SETUP_SQPOLL;
990                 if (sq_thread_cpu != -1) {
991                         p.flags |= IORING_SETUP_SQ_AFF;
992                         p.sq_thread_cpu = sq_thread_cpu;
993                 }
994         }
995
996         fd = io_uring_setup(depth, &p);
997         if (fd < 0) {
998                 perror("io_uring_setup");
999                 return 1;
1000         }
1001         s->ring_fd = fd;
1002
1003         io_uring_probe(fd);
1004
1005         if (fixedbufs) {
1006                 struct rlimit rlim;
1007
1008                 rlim.rlim_cur = RLIM_INFINITY;
1009                 rlim.rlim_max = RLIM_INFINITY;
1010                 /* ignore potential error, not needed on newer kernels */
1011                 setrlimit(RLIMIT_MEMLOCK, &rlim);
1012
1013                 ret = io_uring_register_buffers(s);
1014                 if (ret < 0) {
1015                         perror("io_uring_register_buffers");
1016                         return 1;
1017                 }
1018
1019                 if (dma_map) {
1020                         ret = io_uring_map_buffers(s);
1021                         if (ret < 0) {
1022                                 perror("io_uring_map_buffers");
1023                                 return 1;
1024                         }
1025                 }
1026         }
1027
1028         if (register_files) {
1029                 ret = io_uring_register_files(s);
1030                 if (ret < 0) {
1031                         perror("io_uring_register_files");
1032                         return 1;
1033                 }
1034         }
1035
1036         ptr = mmap(0, p.sq_off.array + p.sq_entries * sizeof(__u32),
1037                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
1038                         IORING_OFF_SQ_RING);
1039         sring->head = ptr + p.sq_off.head;
1040         sring->tail = ptr + p.sq_off.tail;
1041         sring->ring_mask = ptr + p.sq_off.ring_mask;
1042         sring->ring_entries = ptr + p.sq_off.ring_entries;
1043         sring->flags = ptr + p.sq_off.flags;
1044         sring->array = ptr + p.sq_off.array;
1045         sq_ring_mask = *sring->ring_mask;
1046
1047         s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
1048                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
1049                         IORING_OFF_SQES);
1050
1051         ptr = mmap(0, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe),
1052                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
1053                         IORING_OFF_CQ_RING);
1054         cring->head = ptr + p.cq_off.head;
1055         cring->tail = ptr + p.cq_off.tail;
1056         cring->ring_mask = ptr + p.cq_off.ring_mask;
1057         cring->ring_entries = ptr + p.cq_off.ring_entries;
1058         cring->cqes = ptr + p.cq_off.cqes;
1059         cq_ring_mask = *cring->ring_mask;
1060         return 0;
1061 }
1062
1063 static void file_depths(char *buf)
1064 {
1065         bool prev = false;
1066         char *p;
1067         int i, j;
1068
1069         buf[0] = '\0';
1070         p = buf;
1071         for (j = 0; j < nthreads; j++) {
1072                 struct submitter *s = get_submitter(j);
1073
1074                 for (i = 0; i < s->nr_files; i++) {
1075                         struct file *f = &s->files[i];
1076
1077                         if (prev)
1078                                 p += sprintf(p, " %d", f->pending_ios);
1079                         else
1080                                 p += sprintf(p, "%d", f->pending_ios);
1081                         prev = true;
1082                 }
1083         }
1084 }
1085
1086 static void usage(char *argv, int status)
1087 {
1088         char runtime_str[16];
1089         snprintf(runtime_str, sizeof(runtime_str), "%d", runtime);
1090         printf("%s [options] -- [filenames]\n"
1091                 " -d <int>  : IO Depth, default %d\n"
1092                 " -s <int>  : Batch submit, default %d\n"
1093                 " -c <int>  : Batch complete, default %d\n"
1094                 " -b <int>  : Block size, default %d\n"
1095                 " -p <bool> : Polled IO, default %d\n"
1096                 " -B <bool> : Fixed buffers, default %d\n"
1097                 " -D <bool> : DMA map fixed buffers, default %d\n"
1098                 " -F <bool> : Register files, default %d\n"
1099                 " -n <int>  : Number of threads, default %d\n"
1100                 " -O <bool> : Use O_DIRECT, default %d\n"
1101                 " -N <bool> : Perform just no-op requests, default %d\n"
1102                 " -t <bool> : Track IO latencies, default %d\n"
1103                 " -T <int>  : TSC rate in HZ\n"
1104                 " -r <int>  : Runtime in seconds, default %s\n"
1105                 " -R <bool> : Use random IO, default %d\n"
1106                 " -a <bool> : Use legacy aio, default %d\n",
1107                 argv, DEPTH, BATCH_SUBMIT, BATCH_COMPLETE, BS, polled,
1108                 fixedbufs, dma_map, register_files, nthreads, !buffered, do_nop,
1109                 stats, runtime == 0 ? "unlimited" : runtime_str, random_io, aio);
1110         exit(status);
1111 }
1112
1113 static void read_tsc_rate(void)
1114 {
1115         char buffer[32];
1116         int fd, ret;
1117
1118         if (tsc_rate)
1119                 return;
1120
1121         fd = open(TSC_RATE_FILE, O_RDONLY);
1122         if (fd < 0)
1123                 return;
1124
1125         ret = read(fd, buffer, sizeof(buffer));
1126         if (ret < 0) {
1127                 close(fd);
1128                 return;
1129         }
1130
1131         tsc_rate = strtoul(buffer, NULL, 10);
1132         printf("Using TSC rate %luHz\n", tsc_rate);
1133         close(fd);
1134 }
1135
1136 static void write_tsc_rate(void)
1137 {
1138         char buffer[32];
1139         struct stat sb;
1140         int fd, ret;
1141
1142         if (!stat(TSC_RATE_FILE, &sb))
1143                 return;
1144
1145         fd = open(TSC_RATE_FILE, O_WRONLY | O_CREAT, 0644);
1146         if (fd < 0)
1147                 return;
1148
1149         memset(buffer, 0, sizeof(buffer));
1150         sprintf(buffer, "%lu", tsc_rate);
1151         ret = write(fd, buffer, strlen(buffer));
1152         if (ret < 0)
1153                 perror("write");
1154         close(fd);
1155 }
1156
/*
 * Entry point: parse options, distribute the given files over nthreads
 * submitter threads, allocate aligned IO buffers, set up an io_uring (or
 * legacy aio context) per thread, start the threads, then print per-second
 * IOPS/BW/latency stats until the runtime expires or SIGINT arrives.
 */
int main(int argc, char *argv[])
{
	struct submitter *s;
	unsigned long done, calls, reap;
	int err, i, j, flags, fd, opt, threads_per_f, threads_rem = 0, nfiles;
	long page_size;
	struct file f;
	char *fdepths;
	void *ret;

	/* files are required unless we're only doing no-op requests */
	if (!do_nop && argc < 2)
		usage(argv[0], 1);

	while ((opt = getopt(argc, argv, "d:s:c:b:p:B:F:n:N:O:t:T:a:r:D:R:h?")) != -1) {
		switch (opt) {
		case 'a':
			aio = !!atoi(optarg);
			break;
		case 'd':
			depth = atoi(optarg);
			break;
		case 's':
			/* batch sizes of 0 make no sense; clamp to 1 */
			batch_submit = atoi(optarg);
			if (!batch_submit)
				batch_submit = 1;
			break;
		case 'c':
			batch_complete = atoi(optarg);
			if (!batch_complete)
				batch_complete = 1;
			break;
		case 'b':
			bs = atoi(optarg);
			break;
		case 'p':
			polled = !!atoi(optarg);
			break;
		case 'B':
			fixedbufs = !!atoi(optarg);
			break;
		case 'F':
			register_files = !!atoi(optarg);
			break;
		case 'n':
			nthreads = atoi(optarg);
			if (!nthreads) {
				printf("Threads must be non-zero\n");
				usage(argv[0], 1);
			}
			break;
		case 'N':
			do_nop = !!atoi(optarg);
			break;
		case 'O':
			/* -O 1 means O_DIRECT, i.e. NOT buffered */
			buffered = !atoi(optarg);
			break;
		case 't':
#ifndef ARCH_HAVE_CPU_CLOCK
			fprintf(stderr, "Stats not supported on this CPU\n");
			return 1;
#endif
			stats = !!atoi(optarg);
			break;
		case 'T':
#ifndef ARCH_HAVE_CPU_CLOCK
			fprintf(stderr, "Stats not supported on this CPU\n");
			return 1;
#endif
			/* user-provided TSC rate; cache it for future runs */
			tsc_rate = strtoul(optarg, NULL, 10);
			write_tsc_rate();
			break;
		case 'r':
			runtime = atoi(optarg);
			break;
		case 'D':
			dma_map = !!atoi(optarg);
			break;
		case 'R':
			random_io = !!atoi(optarg);
			break;
		case 'h':
		case '?':
		default:
			usage(argv[0], 0);
			break;
		}
	}

	if (stats)
		read_tsc_rate();

	/* batch sizes can never exceed the queue depth */
	if (batch_complete > depth)
		batch_complete = depth;
	if (batch_submit > depth)
		batch_submit = depth;
	/* DMA mapping only applies to registered (fixed) buffers */
	if (!fixedbufs && dma_map)
		dma_map = 0;

	/*
	 * One contiguous region: each element is a struct submitter followed
	 * by its iovec array. NOTE(review): get_submitter()'s stride must
	 * match this per-element size (roundup_pow2(depth) iovecs) — verify.
	 */
	submitter = calloc(nthreads, sizeof(*submitter) +
				roundup_pow2(depth) * sizeof(struct iovec));
	for (j = 0; j < nthreads; j++) {
		s = get_submitter(j);
		s->index = j;
		s->done = s->calls = s->reaps = 0;
	}

	flags = O_RDONLY | O_NOATIME;
	if (!buffered)
		flags |= O_DIRECT;

	/* distribute files over threads as evenly as possible */
	j = 0;
	i = optind;
	nfiles = argc - i;
	if (!do_nop) {
		if (!nfiles) {
			printf("No files specified\n");
			usage(argv[0], 1);
		}
		threads_per_f = nthreads / nfiles;
		/* make sure each thread gets assigned files */
		if (threads_per_f == 0) {
			threads_per_f = 1;
		} else {
			threads_rem = nthreads - threads_per_f * nfiles;
		}
	}
	while (!do_nop && i < argc) {
		int k, limit;

		memset(&f, 0, sizeof(f));

		fd = open(argv[i], flags);
		if (fd < 0) {
			perror("open");
			return 1;
		}
		f.real_fd = fd;
		if (get_file_size(&f)) {
			printf("failed getting size of device/file\n");
			return 1;
		}
		if (f.max_blocks <= 1) {
			printf("Zero file/device size?\n");
			return 1;
		}
		/* last block is never used as an IO target */
		f.max_blocks--;

		/* threads_rem spreads the leftover threads over early files */
		limit = threads_per_f;
		limit += threads_rem > 0 ? 1 : 0;
		for (k = 0; k < limit; k++) {
			s = get_submitter((j + k) % nthreads);

			if (s->nr_files == MAX_FDS) {
				printf("Max number of files (%d) reached\n", MAX_FDS);
				break;
			}

			/* each assigned submitter gets its own copy of the file state */
			memcpy(&s->files[s->nr_files], &f, sizeof(f));

			printf("Added file %s (submitter %d)\n", argv[i], s->index);
			s->nr_files++;
		}
		threads_rem--;
		i++;
		j += limit;
	}

	arm_sig_int();

	page_size = sysconf(_SC_PAGESIZE);
	if (page_size < 0)
		page_size = 4096;

	/* allocate page-aligned IO buffers, one per ring slot per thread */
	for (j = 0; j < nthreads; j++) {
		s = get_submitter(j);
		for (i = 0; i < roundup_pow2(depth); i++) {
			void *buf;

			if (posix_memalign(&buf, page_size, bs)) {
				printf("failed alloc\n");
				return 1;
			}
			s->iovecs[i].iov_base = buf;
			s->iovecs[i].iov_len = bs;
		}
	}

	for (j = 0; j < nthreads; j++) {
		s = get_submitter(j);

		if (!aio)
			err = setup_ring(s);
		else
			err = setup_aio(s);
		if (err) {
			printf("ring setup failed: %s, %d\n", strerror(errno), err);
			return 1;
		}
	}
	s = get_submitter(0);
	printf("polled=%d, fixedbufs=%d/%d, register_files=%d, buffered=%d, QD=%d\n", polled, fixedbufs, dma_map, register_files, buffered, depth);
	if (!aio)
		printf("Engine=io_uring, sq_ring=%d, cq_ring=%d\n", *s->sq_ring.ring_entries, *s->cq_ring.ring_entries);
	else
		printf("Engine=aio\n");

	for (j = 0; j < nthreads; j++) {
		s = get_submitter(j);
		if (!aio)
			pthread_create(&s->thread, NULL, submitter_uring_fn, s);
#ifdef CONFIG_LIBAIO
		else
			pthread_create(&s->thread, NULL, submitter_aio_fn, s);
#endif
	}

	/*
	 * NOTE(review): sized from the LAST submitter's nr_files; assumes all
	 * submitters have the same file count and each depth prints in < 8
	 * chars — confirm both hold for all configurations.
	 */
	fdepths = malloc(8 * s->nr_files * nthreads);
	reap = calls = done = 0;
	/* once-per-second monitor loop: aggregate counters and print deltas */
	do {
		unsigned long this_done = 0;
		unsigned long this_reap = 0;
		unsigned long this_call = 0;
		unsigned long rpc = 0, ipc = 0;
		unsigned long iops, bw;

		sleep(1);
		if (runtime && !--runtime)
			do_finish("timeout");

		/* don't print partial run, if interrupted by signal */
		if (finish)
			break;

		/* one second in to the run, enable stats */
		if (stats)
			stats_running = 1;

		for (j = 0; j < nthreads; j++) {
			s = get_submitter(j);
			this_done += s->done;
			this_call += s->calls;
			this_reap += s->reaps;
		}
		/* IOs submitted / reaped per syscall over the last interval */
		if (this_call - calls) {
			rpc = (this_done - done) / (this_call - calls);
			ipc = (this_reap - reap) / (this_call - calls);
		} else
			rpc = ipc = -1;
		file_depths(fdepths);
		iops = this_done - done;
		/* bandwidth in MiB/s, avoiding overflow for large block sizes */
		if (bs > 1048576)
			bw = iops * (bs / 1048576);
		else
			bw = iops / (1048576 / bs);
		if (iops > 100000)
			printf("IOPS=%luK, ", iops / 1000);
		else
			printf("IOPS=%lu, ", iops);
		max_iops = max(max_iops, iops);
		if (!do_nop)
			printf("BW=%luMiB/s, ", bw);
		printf("IOS/call=%ld/%ld, inflight=(%s)\n", rpc, ipc, fdepths);
		done = this_done;
		calls = this_call;
		reap = this_reap;
	} while (!finish);

	/* join workers, then dump latency percentiles if tracking was on */
	for (j = 0; j < nthreads; j++) {
		s = get_submitter(j);
		pthread_join(s->thread, &ret);
		close(s->ring_fd);

		if (stats) {
			unsigned long nr;

			printf("%d: Latency percentiles:\n", s->tid);
			for (i = 0, nr = 0; i < PLAT_NR; i++)
				nr += s->plat[i];
			show_clat_percentiles(s->plat, nr, 4);
			free(s->clock_batch);
			free(s->plat);
		}
	}

	free(fdepths);
	free(submitter);
	return 0;
}