t/io_uring: fix latency stats for depth == 1
[fio.git] / t / io_uring.c
1 #include <stdio.h>
2 #include <errno.h>
3 #include <assert.h>
4 #include <stdlib.h>
5 #include <stddef.h>
6 #include <signal.h>
7 #include <inttypes.h>
8 #include <math.h>
9
10 #ifdef CONFIG_LIBAIO
11 #include <libaio.h>
12 #endif
13
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <sys/ioctl.h>
17 #include <sys/syscall.h>
18 #include <sys/resource.h>
19 #include <sys/mman.h>
20 #include <sys/uio.h>
21 #include <linux/fs.h>
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <sched.h>
27
28 #include "../arch/arch.h"
29 #include "../lib/types.h"
30 #include "../lib/roundup.h"
31 #include "../minmax.h"
32 #include "../os/linux/io_uring.h"
33
/*
 * Userspace-visible pointers into the mmap'ed submission queue ring:
 * head/tail indices, ring mask/size, kernel flags (e.g.
 * IORING_SQ_NEED_WAKEUP) and the array of SQE indices to submit.
 */
struct io_sq_ring {
	unsigned *head;
	unsigned *tail;
	unsigned *ring_mask;
	unsigned *ring_entries;
	unsigned *flags;
	unsigned *array;
};
42
/*
 * Userspace-visible pointers into the mmap'ed completion queue ring and
 * its CQE array.
 */
struct io_cq_ring {
	unsigned *head;
	unsigned *tail;
	unsigned *ring_mask;
	unsigned *ring_entries;
	struct io_uring_cqe *cqes;
};
50
/* Default queue depth, submit/complete batch sizes and IO block size. */
#define DEPTH			128
#define BATCH_SUBMIT		32
#define BATCH_COMPLETE		32
#define BS			4096

/* Maximum number of files a single submitter can drive. */
#define MAX_FDS			16

/* Cached ring masks, filled in by setup_ring() after mmap'ing the rings. */
static unsigned sq_ring_mask, cq_ring_mask;
59
/* Per-file state: size and fd bookkeeping plus in-flight IO accounting. */
struct file {
	unsigned long max_blocks;	/* file size in units of bs */
	unsigned pending_ios;		/* IOs issued but not yet reaped */
	int real_fd;			/* fd returned by open(2) */
	int fixed_fd;			/* index when files are registered */
	int fileno;			/* index into submitter->files[] */
};
67
/*
 * Latency histogram geometry: PLAT_GROUP_NR magnitude groups of PLAT_VAL
 * buckets each, keeping PLAT_BITS bits of precision per sample.
 */
#define PLAT_BITS		6
#define PLAT_VAL		(1 << PLAT_BITS)
#define PLAT_GROUP_NR		29
#define PLAT_NR			(PLAT_GROUP_NR * PLAT_VAL)
/*
 * Per-thread state: one io_uring (or libaio context), its mmap'ed rings,
 * IO accounting, latency stats buffers and the set of files it drives.
 * The trailing flexible array holds 'depth' IO buffers.
 */
struct submitter {
	pthread_t thread;
	int ring_fd;
	int index;
	struct io_sq_ring sq_ring;
	struct io_uring_sqe *sqes;
	struct io_cq_ring cq_ring;
	int inflight;			/* IOs submitted but not yet reaped */
	int tid;
	unsigned long reaps;
	unsigned long done;
	unsigned long calls;		/* submit/reap syscalls issued */
	volatile int finish;		/* set to ask this thread to stop */

	__s32 *fds;			/* fd table for IORING_REGISTER_FILES */

	unsigned long *clock_batch;	/* per-batch submit timestamps */
	int clock_index;		/* next slot; 0 means "no stats" */
	unsigned long *plat;		/* latency histogram, PLAT_NR buckets */

#ifdef CONFIG_LIBAIO
	io_context_t aio_ctx;
#endif

	struct file files[MAX_FDS];
	unsigned nr_files;
	unsigned cur_file;		/* round-robin position in files[] */
	struct iovec iovecs[];
};
102
/* Contiguous array of per-thread submitters (see get_submitter()). */
static struct submitter *submitter;
static volatile int finish;		/* global stop flag */
static int stats_running;		/* set once stats collection starts */

static int depth = DEPTH;
static int batch_submit = BATCH_SUBMIT;
static int batch_complete = BATCH_COMPLETE;
static int bs = BS;
static int polled = 1;		/* use IO polling */
static int fixedbufs = 1;	/* use fixed user buffers */
static int register_files = 1;	/* use fixed files */
static int buffered = 0;	/* use buffered IO, not O_DIRECT */
static int sq_thread_poll = 0;	/* use kernel submission/poller thread */
static int sq_thread_cpu = -1;	/* pin above thread to this CPU */
static int do_nop = 0;		/* no-op SQ ring commands */
static int nthreads = 1;	/* number of submitter threads */
static int stats = 0;		/* generate IO stats */
static int aio = 0;		/* use libaio */
static int runtime = 0;	/* runtime */

/* TSC ticks per second; 0 means report raw ticks instead of nsec. */
static unsigned long tsc_rate;

#define TSC_RATE_FILE	"tsc-rate"

/* use IORING_OP_READV; cleared by io_uring_probe() if READ is supported */
static int vectored = 1;

/* percentiles reported by show_clat_percentiles() */
static float plist[] = { 1.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0,
			80.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.95, 99.99 };
static int plist_len = 17;
132
133 static unsigned long cycles_to_nsec(unsigned long cycles)
134 {
135         uint64_t val;
136
137         if (!tsc_rate)
138                 return cycles;
139
140         val = cycles * 1000000000ULL;
141         return val / tsc_rate;
142 }
143
144 static unsigned long plat_idx_to_val(unsigned int idx)
145 {
146         unsigned int error_bits;
147         unsigned long k, base;
148
149         assert(idx < PLAT_NR);
150
151         /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
152          * all bits of the sample as index */
153         if (idx < (PLAT_VAL << 1))
154                 return cycles_to_nsec(idx);
155
156         /* Find the group and compute the minimum value of that group */
157         error_bits = (idx >> PLAT_BITS) - 1;
158         base = ((unsigned long) 1) << (error_bits + PLAT_BITS);
159
160         /* Find its bucket number of the group */
161         k = idx % PLAT_VAL;
162
163         /* Return the mean of the range of the bucket */
164         return cycles_to_nsec(base + ((k + 0.5) * (1 << error_bits)));
165 }
166
/*
 * Walk the latency histogram and compute the value at each percentile in
 * plist[]. On success *output points to a malloc'ed array of plist_len
 * entries (caller frees) and *minv/*maxv hold the extreme percentile
 * values; returns plist_len, or 0 if the output array can't be allocated
 * (in which case *output is left untouched).
 */
unsigned int calc_clat_percentiles(unsigned long *io_u_plat, unsigned long nr,
				   unsigned long **output,
				   unsigned long *maxv, unsigned long *minv)
{
	unsigned long sum = 0;
	unsigned int len = plist_len, i, j = 0;
	unsigned long *ovals = NULL;
	bool is_last;

	*minv = -1ULL;
	*maxv = 0;

	ovals = malloc(len * sizeof(*ovals));
	if (!ovals)
		return 0;

	/*
	 * Calculate bucket values, note down max and min values
	 */
	is_last = false;
	for (i = 0; i < PLAT_NR && !is_last; i++) {
		sum += io_u_plat[i];
		/* one bucket can satisfy several consecutive percentiles */
		while (sum >= ((long double) plist[j] / 100.0 * nr)) {
			assert(plist[j] <= 100.0);

			ovals[j] = plat_idx_to_val(i);
			if (ovals[j] < *minv)
				*minv = ovals[j];
			if (ovals[j] > *maxv)
				*maxv = ovals[j];

			is_last = (j == len - 1) != 0;
			if (is_last)
				break;

			j++;
		}
	}

	/* ran out of buckets before covering every requested percentile */
	if (!is_last)
		fprintf(stderr, "error calculating latency percentiles\n");

	*output = ovals;
	return len;
}
212
213 static void show_clat_percentiles(unsigned long *io_u_plat, unsigned long nr,
214                                   unsigned int precision)
215 {
216         unsigned int divisor, len, i, j = 0;
217         unsigned long minv, maxv;
218         unsigned long *ovals;
219         int per_line, scale_down, time_width;
220         bool is_last;
221         char fmt[32];
222
223         len = calc_clat_percentiles(io_u_plat, nr, &ovals, &maxv, &minv);
224         if (!len || !ovals)
225                 goto out;
226
227         if (!tsc_rate) {
228                 scale_down = 0;
229                 divisor = 1;
230                 printf("    percentiles (tsc ticks):\n     |");
231         } else if (minv > 2000 && maxv > 99999) {
232                 scale_down = 1;
233                 divisor = 1000;
234                 printf("    percentiles (usec):\n     |");
235         } else {
236                 scale_down = 0;
237                 divisor = 1;
238                 printf("    percentiles (nsec):\n     |");
239         }
240
241         time_width = max(5, (int) (log10(maxv / divisor) + 1));
242         snprintf(fmt, sizeof(fmt), " %%%u.%ufth=[%%%dllu]%%c", precision + 3,
243                         precision, time_width);
244         /* fmt will be something like " %5.2fth=[%4llu]%c" */
245         per_line = (80 - 7) / (precision + 10 + time_width);
246
247         for (j = 0; j < len; j++) {
248                 /* for formatting */
249                 if (j != 0 && (j % per_line) == 0)
250                         printf("     |");
251
252                 /* end of the list */
253                 is_last = (j == len - 1) != 0;
254
255                 for (i = 0; i < scale_down; i++)
256                         ovals[j] = (ovals[j] + 999) / 1000;
257
258                 printf(fmt, plist[j], ovals[j], is_last ? '\n' : ',');
259
260                 if (is_last)
261                         break;
262
263                 if ((j % per_line) == per_line - 1)     /* for formatting */
264                         printf("\n");
265         }
266
267 out:
268         free(ovals);
269 }
270
271 static unsigned int plat_val_to_idx(unsigned long val)
272 {
273         unsigned int msb, error_bits, base, offset, idx;
274
275         /* Find MSB starting from bit 0 */
276         if (val == 0)
277                 msb = 0;
278         else
279                 msb = (sizeof(val)*8) - __builtin_clzll(val) - 1;
280
281         /*
282          * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
283          * all bits of the sample as index
284          */
285         if (msb <= PLAT_BITS)
286                 return val;
287
288         /* Compute the number of error bits to discard*/
289         error_bits = msb - PLAT_BITS;
290
291         /* Compute the number of buckets before the group */
292         base = (error_bits + 1) << PLAT_BITS;
293
294         /*
295          * Discard the error bits and apply the mask to find the
296          * index for the buckets in the group
297          */
298         offset = (PLAT_VAL - 1) & (val >> error_bits);
299
300         /* Make sure the index does not exceed (array size - 1) */
301         idx = (base + offset) < (PLAT_NR - 1) ?
302                 (base + offset) : (PLAT_NR - 1);
303
304         return idx;
305 }
306
/*
 * Account 'nr' completions against the submit timestamp stored at
 * clock_batch[clock_index]. Index 0 is reserved as "no stats recorded",
 * so it is skipped here. A no-op on architectures without a CPU clock.
 */
static void add_stat(struct submitter *s, int clock_index, int nr)
{
#ifdef ARCH_HAVE_CPU_CLOCK
	unsigned long cycles;
	unsigned int pidx;

	/* skip when shutting down or when no timestamp was recorded */
	if (!s->finish && clock_index) {
		cycles = get_cpu_clock();
		cycles -= s->clock_batch[clock_index];
		pidx = plat_val_to_idx(cycles);
		s->plat[pidx] += nr;
	}
#endif
}
321
322 static int io_uring_register_buffers(struct submitter *s)
323 {
324         if (do_nop)
325                 return 0;
326
327         return syscall(__NR_io_uring_register, s->ring_fd,
328                         IORING_REGISTER_BUFFERS, s->iovecs, depth);
329 }
330
331 static int io_uring_register_files(struct submitter *s)
332 {
333         int i;
334
335         if (do_nop)
336                 return 0;
337
338         s->fds = calloc(s->nr_files, sizeof(__s32));
339         for (i = 0; i < s->nr_files; i++) {
340                 s->fds[i] = s->files[i].real_fd;
341                 s->files[i].fixed_fd = i;
342         }
343
344         return syscall(__NR_io_uring_register, s->ring_fd,
345                         IORING_REGISTER_FILES, s->fds, s->nr_files);
346 }
347
/*
 * Thin wrapper around the io_uring_setup(2) syscall: create a ring with
 * the given entry count and parameters, returning the ring fd or -1.
 */
static int io_uring_setup(unsigned entries, struct io_uring_params *p)
{
	int fd;

	fd = syscall(__NR_io_uring_setup, entries, p);
	return fd;
}
352
353 static void io_uring_probe(int fd)
354 {
355         struct io_uring_probe *p;
356         int ret;
357
358         p = malloc(sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
359         if (!p)
360                 return;
361
362         memset(p, 0, sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
363         ret = syscall(__NR_io_uring_register, fd, IORING_REGISTER_PROBE, p, 256);
364         if (ret < 0)
365                 goto out;
366
367         if (IORING_OP_READ > p->ops_len)
368                 goto out;
369
370         if ((p->ops[IORING_OP_READ].flags & IO_URING_OP_SUPPORTED))
371                 vectored = 0;
372 out:
373         free(p);
374 }
375
376 static int io_uring_enter(struct submitter *s, unsigned int to_submit,
377                           unsigned int min_complete, unsigned int flags)
378 {
379         return syscall(__NR_io_uring_enter, s->ring_fd, to_submit, min_complete,
380                         flags, NULL, 0);
381 }
382
#ifndef CONFIG_HAVE_GETTID
/* Fallback for libcs that don't expose gettid(): raw syscall wrapper. */
static int gettid(void)
{
	return syscall(__NR_gettid);
}
#endif
389
390 static unsigned file_depth(struct submitter *s)
391 {
392         return (depth + s->nr_files - 1) / s->nr_files;
393 }
394
/*
 * Fill the SQE at 'index' with a random read (or a NOP for nop runs).
 * Round-robins across files once the current one has its full depth
 * share pending, and encodes the file number (and, when stats are
 * running, the clock batch index) into user_data for the reap side.
 */
static void init_io(struct submitter *s, unsigned index)
{
	struct io_uring_sqe *sqe = &s->sqes[index];
	unsigned long offset;
	struct file *f;
	long r;

	if (do_nop) {
		sqe->opcode = IORING_OP_NOP;
		return;
	}

	if (s->nr_files == 1) {
		f = &s->files[0];
	} else {
		f = &s->files[s->cur_file];
		/* current file is saturated, move to the next one */
		if (f->pending_ios >= file_depth(s)) {
			s->cur_file++;
			if (s->cur_file == s->nr_files)
				s->cur_file = 0;
			f = &s->files[s->cur_file];
		}
	}
	f->pending_ios++;

	/* pick a random block-aligned offset within the file */
	r = lrand48();
	offset = (r % (f->max_blocks - 1)) * bs;

	if (register_files) {
		sqe->flags = IOSQE_FIXED_FILE;
		sqe->fd = f->fixed_fd;
	} else {
		sqe->flags = 0;
		sqe->fd = f->real_fd;
	}
	if (fixedbufs) {
		sqe->opcode = IORING_OP_READ_FIXED;
		sqe->addr = (unsigned long) s->iovecs[index].iov_base;
		sqe->len = bs;
		sqe->buf_index = index;
	} else if (!vectored) {
		sqe->opcode = IORING_OP_READ;
		sqe->addr = (unsigned long) s->iovecs[index].iov_base;
		sqe->len = bs;
		sqe->buf_index = 0;
	} else {
		sqe->opcode = IORING_OP_READV;
		sqe->addr = (unsigned long) &s->iovecs[index];
		sqe->len = 1;
		sqe->buf_index = 0;
	}
	sqe->ioprio = 0;
	sqe->off = offset;
	/* low 32 bits: file number; high 32 bits: clock batch index */
	sqe->user_data = (unsigned long) f->fileno;
	if (stats && stats_running)
		sqe->user_data |= ((unsigned long)s->clock_index << 32);
}
452
/*
 * Prepare up to max_ios new SQEs at the SQ ring tail, stopping early if
 * the ring fills up. The new tail is published with release semantics so
 * the kernel only sees fully-initialized SQEs. Returns the count prepped.
 */
static int prep_more_ios_uring(struct submitter *s, int max_ios)
{
	struct io_sq_ring *ring = &s->sq_ring;
	unsigned index, tail, next_tail, prepped = 0;

	next_tail = tail = *ring->tail;
	do {
		next_tail++;
		/* ring full: tail would catch up with the kernel's head */
		if (next_tail == atomic_load_acquire(ring->head))
			break;

		index = tail & sq_ring_mask;
		init_io(s, index);
		ring->array[index] = index;
		prepped++;
		tail = next_tail;
	} while (prepped < max_ios);

	if (prepped)
		atomic_store_release(ring->tail, tail);
	return prepped;
}
475
476 static int get_file_size(struct file *f)
477 {
478         struct stat st;
479
480         if (fstat(f->real_fd, &st) < 0)
481                 return -1;
482         if (S_ISBLK(st.st_mode)) {
483                 unsigned long long bytes;
484
485                 if (ioctl(f->real_fd, BLKGETSIZE64, &bytes) != 0)
486                         return -1;
487
488                 f->max_blocks = bytes / bs;
489                 return 0;
490         } else if (S_ISREG(st.st_mode)) {
491                 f->max_blocks = st.st_size / bs;
492                 return 0;
493         }
494
495         return -1;
496 }
497
/*
 * Drain all currently-available CQEs. Decodes the file number (low 32
 * bits of user_data) to drop pending_ios, validates the completion size,
 * and batches latency accounting per clock index so add_stat() is called
 * once per submit batch. Returns the number reaped, or -1 on IO error.
 */
static int reap_events_uring(struct submitter *s)
{
	struct io_cq_ring *ring = &s->cq_ring;
	struct io_uring_cqe *cqe;
	unsigned head, reaped = 0;
	int last_idx = -1, stat_nr = 0;

	head = *ring->head;
	do {
		struct file *f;

		read_barrier();
		/* no more completions available */
		if (head == atomic_load_acquire(ring->tail))
			break;
		cqe = &ring->cqes[head & cq_ring_mask];
		if (!do_nop) {
			int fileno = cqe->user_data & 0xffffffff;

			f = &s->files[fileno];
			f->pending_ios--;
			if (cqe->res != bs) {
				printf("io: unexpected ret=%d\n", cqe->res);
				if (polled && cqe->res == -EOPNOTSUPP)
					printf("Your filesystem/driver/kernel doesn't support polled IO\n");
				return -1;
			}
		}
		if (stats) {
			int clock_index = cqe->user_data >> 32;

			/* flush the accumulated count when the batch changes */
			if (last_idx != clock_index) {
				if (last_idx != -1) {
					add_stat(s, last_idx, stat_nr);
					stat_nr = 0;
				}
				last_idx = clock_index;
			}
			stat_nr++;
		}
		reaped++;
		head++;
	} while (1);

	if (stat_nr)
		add_stat(s, last_idx, stat_nr);

	if (reaped) {
		s->inflight -= reaped;
		/* publish the new head so the kernel can reuse the slots */
		atomic_store_release(ring->head, head);
	}
	return reaped;
}
550
/*
 * Per-thread setup: seed the RNG, number the files and, if stats are
 * enabled, allocate the clock batch array (power-of-2 sized so the index
 * can wrap with a mask) and the latency histogram. Returns the batch
 * array size, or 0 when stats are disabled.
 */
static int submitter_init(struct submitter *s)
{
	int i, nr_batch;

	s->tid = gettid();
	printf("submitter=%d, tid=%d\n", s->index, s->tid);

	srand48(pthread_self());

	for (i = 0; i < MAX_FDS; i++)
		s->files[i].fileno = i;

	if (stats) {
		nr_batch = roundup_pow2(depth / batch_submit);
		/*
		 * With a single slot the wrap mask (nr_batch - 1) would be 0
		 * and clock_index would stay pinned at 0 — the reserved "no
		 * stats" value — so no latencies would ever be recorded
		 * (e.g. at depth == 1). Require at least two slots.
		 */
		if (nr_batch < 2)
			nr_batch = 2;
		s->clock_batch = calloc(nr_batch, sizeof(unsigned long));
		/* index 0 is reserved to mean "no timestamp recorded" */
		s->clock_index = 1;

		s->plat = calloc(PLAT_NR, sizeof(unsigned long));
	} else {
		s->clock_batch = NULL;
		s->plat = NULL;
		nr_batch = 0;
	}

	return nr_batch;
}
579
580 #ifdef CONFIG_LIBAIO
/*
 * libaio counterpart of init_io()/prep_more_ios_uring(): fill max_ios
 * iocbs with random reads, rotating across files the same way, and
 * encode file number / clock batch index in iocb->data. Returns the
 * number of iocbs prepared.
 */
static int prep_more_ios_aio(struct submitter *s, int max_ios, struct iocb *iocbs)
{
	unsigned long offset, data;
	struct file *f;
	unsigned index;
	long r;

	index = 0;
	while (index < max_ios) {
		struct iocb *iocb = &iocbs[index];

		if (s->nr_files == 1) {
			f = &s->files[0];
		} else {
			f = &s->files[s->cur_file];
			/* current file is saturated, move to the next one */
			if (f->pending_ios >= file_depth(s)) {
				s->cur_file++;
				if (s->cur_file == s->nr_files)
					s->cur_file = 0;
				f = &s->files[s->cur_file];
			}
		}
		f->pending_ios++;

		/* pick a random block-aligned offset within the file */
		r = lrand48();
		offset = (r % (f->max_blocks - 1)) * bs;
		io_prep_pread(iocb, f->real_fd, s->iovecs[index].iov_base,
				s->iovecs[index].iov_len, offset);

		/* low 32 bits: file number; high 32 bits: clock batch index */
		data = f->fileno;
		if (stats && stats_running)
			data |= ((unsigned long) s->clock_index << 32);
		iocb->data = (void *) (uintptr_t) data;
		index++;
	}
	return index;
}
618
/*
 * Process 'evs' completed libaio events: drop pending_ios on the owning
 * file, validate the completion size, and batch latency accounting per
 * clock index (mirrors reap_events_uring). Returns the number of events
 * reaped, or -1 on IO error.
 */
static int reap_events_aio(struct submitter *s, struct io_event *events, int evs)
{
	int last_idx = -1, stat_nr = 0;
	int reaped = 0;

	while (evs) {
		unsigned long data = (uintptr_t) events[reaped].data;
		struct file *f = &s->files[data & 0xffffffff];

		f->pending_ios--;
		if (events[reaped].res != bs) {
			printf("io: unexpected ret=%ld\n", events[reaped].res);
			return -1;
		}
		if (stats) {
			int clock_index = data >> 32;

			/* flush the accumulated count when the batch changes */
			if (last_idx != clock_index) {
				if (last_idx != -1) {
					add_stat(s, last_idx, stat_nr);
					stat_nr = 0;
				}
				last_idx = clock_index;
			}
			stat_nr++;
		}
		reaped++;
		evs--;
	}

	if (stat_nr)
		add_stat(s, last_idx, stat_nr);

	s->inflight -= reaped;
	s->done += reaped;
	return reaped;
}
656
/*
 * Main loop for a libaio submitter thread: prepare a batch of iocbs,
 * timestamp the batch for latency stats, submit with io_submit() and
 * reap with io_getevents() until asked to finish.
 */
static void *submitter_aio_fn(void *data)
{
	struct submitter *s = data;
	int i, ret, prepped, nr_batch;
	struct iocb **iocbsptr;
	struct iocb *iocbs;
	struct io_event *events;

	nr_batch = submitter_init(s);

	iocbsptr = calloc(depth, sizeof(struct iocb *));
	iocbs = calloc(depth, sizeof(struct iocb));
	events = calloc(depth, sizeof(struct io_event));

	/* io_submit() takes an array of iocb pointers */
	for (i = 0; i < depth; i++)
		iocbsptr[i] = &iocbs[i];

	prepped = 0;
	do {
		int to_wait, to_submit, to_prep;

		if (!prepped && s->inflight < depth) {
			to_prep = min(depth - s->inflight, batch_submit);
			prepped = prep_more_ios_aio(s, to_prep, iocbs);
#ifdef ARCH_HAVE_CPU_CLOCK
			/* record the submit timestamp for this clock batch */
			if (prepped && stats) {
				s->clock_batch[s->clock_index] = get_cpu_clock();
				s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
			}
#endif
		}
		s->inflight += prepped;
		to_submit = prepped;

		/* only wait for completions when the queue would overflow */
		if (to_submit && (s->inflight + to_submit <= depth))
			to_wait = 0;
		else
			to_wait = min(s->inflight + to_submit, batch_complete);

		ret = io_submit(s->aio_ctx, to_submit, iocbsptr);
		s->calls++;
		if (ret < 0) {
			perror("io_submit");
			break;
		} else if (ret != to_submit) {
			printf("submitted %d, wanted %d\n", ret, to_submit);
			break;
		}
		prepped = 0;

		while (to_wait) {
			int r;

			s->calls++;
			r = io_getevents(s->aio_ctx, to_wait, to_wait, events, NULL);
			if (r < 0) {
				perror("io_getevents");
				break;
			} else if (r != to_wait) {
				printf("r=%d, wait=%d\n", r, to_wait);
				break;
			}
			r = reap_events_aio(s, events, r);
			s->reaps += r;
			to_wait -= r;
		}
	} while (!s->finish);

	free(iocbsptr);
	free(iocbs);
	free(events);
	finish = 1;
	return NULL;
}
731 #endif
732
/*
 * Main loop for an io_uring submitter thread: prepare a batch of SQEs,
 * timestamp the batch for latency stats, submit/wait via
 * io_uring_enter() (or let the SQPOLL thread pick them up), then reap
 * completions. Partial submissions are retried via the submit labels.
 */
static void *submitter_uring_fn(void *data)
{
	struct submitter *s = data;
	struct io_sq_ring *ring = &s->sq_ring;
	int ret, prepped, nr_batch;

	nr_batch = submitter_init(s);

	prepped = 0;
	do {
		int to_wait, to_submit, this_reap, to_prep;
		unsigned ring_flags = 0;

		if (!prepped && s->inflight < depth) {
			to_prep = min(depth - s->inflight, batch_submit);
			prepped = prep_more_ios_uring(s, to_prep);
#ifdef ARCH_HAVE_CPU_CLOCK
			/* record the submit timestamp for this clock batch */
			if (prepped && stats) {
				s->clock_batch[s->clock_index] = get_cpu_clock();
				s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
			}
#endif
		}
		s->inflight += prepped;
submit_more:
		to_submit = prepped;
submit:
		/* only wait for completions when the ring would overflow */
		if (to_submit && (s->inflight + to_submit <= depth))
			to_wait = 0;
		else
			to_wait = min(s->inflight + to_submit, batch_complete);

		/*
		 * Only need to call io_uring_enter if we're not using SQ thread
		 * poll, or if IORING_SQ_NEED_WAKEUP is set.
		 */
		if (sq_thread_poll)
			ring_flags = atomic_load_acquire(ring->flags);
		if (!sq_thread_poll || ring_flags & IORING_SQ_NEED_WAKEUP) {
			unsigned flags = 0;

			if (to_wait)
				flags = IORING_ENTER_GETEVENTS;
			if (ring_flags & IORING_SQ_NEED_WAKEUP)
				flags |= IORING_ENTER_SQ_WAKEUP;
			ret = io_uring_enter(s, to_submit, to_wait, flags);
			s->calls++;
		} else {
			/* for SQPOLL, we submitted it all effectively */
			ret = to_submit;
		}

		/*
		 * For non SQ thread poll, we already got the events we needed
		 * through the io_uring_enter() above. For SQ thread poll, we
		 * need to loop here until we find enough events.
		 */
		this_reap = 0;
		do {
			int r;

			r = reap_events_uring(s);
			if (r == -1) {
				/* fatal IO error, stop this thread */
				s->finish = 1;
				break;
			} else if (r > 0)
				this_reap += r;
		} while (sq_thread_poll && this_reap < to_wait);
		s->reaps += this_reap;

		if (ret >= 0) {
			if (!ret) {
				/* nothing submitted; wait if IO is in flight */
				to_submit = 0;
				if (s->inflight)
					goto submit;
				continue;
			} else if (ret < to_submit) {
				/* partial submit: retry the remainder */
				int diff = to_submit - ret;

				s->done += ret;
				prepped -= diff;
				goto submit_more;
			}
			s->done += ret;
			prepped = 0;
			continue;
		} else if (ret < 0) {
			if (errno == EAGAIN) {
				if (s->finish)
					break;
				/* ring busy: reap (if we haven't) and retry */
				if (this_reap)
					goto submit;
				to_submit = 0;
				goto submit;
			}
			printf("io_submit: %s\n", strerror(errno));
			break;
		}
	} while (!s->finish);

	finish = 1;
	return NULL;
}
836
837 static struct submitter *get_submitter(int offset)
838 {
839         void *ret;
840
841         ret = submitter;
842         if (offset)
843                 ret += offset * (sizeof(*submitter) + depth * sizeof(struct iovec));
844         return ret;
845 }
846
847 static void do_finish(const char *reason)
848 {
849         int j;
850         printf("Exiting on %s\n", reason);
851         for (j = 0; j < nthreads; j++) {
852                 struct submitter *s = get_submitter(j);
853                 s->finish = 1;
854         }
855         finish = 1;
856 }
857
/* SIGINT handler: request a clean shutdown of all submitters. */
static void sig_int(int sig)
{
	(void) sig;
	do_finish("signal");
}
862
863 static void arm_sig_int(void)
864 {
865         struct sigaction act;
866
867         memset(&act, 0, sizeof(act));
868         act.sa_handler = sig_int;
869         act.sa_flags = SA_RESTART;
870         sigaction(SIGINT, &act, NULL);
871
872         /* Windows uses SIGBREAK as a quit signal from other applications */
873 #ifdef WIN32
874         sigaction(SIGBREAK, &act, NULL);
875 #endif
876 }
877
/*
 * Initialize the libaio context for this submitter, downgrading any
 * options libaio cannot honor (polling, SQPOLL, nop commands, registered
 * files/buffers) with a warning. Returns io_queue_init()'s result, or
 * -1 with errno = EINVAL when built without libaio.
 *
 * Fix: the do_nop warning was a copy-paste of the polled-IO message;
 * it now names the option actually being disabled.
 */
static int setup_aio(struct submitter *s)
{
#ifdef CONFIG_LIBAIO
	if (polled) {
		fprintf(stderr, "aio does not support polled IO\n");
		polled = 0;
	}
	if (sq_thread_poll) {
		fprintf(stderr, "aio does not support SQPOLL IO\n");
		sq_thread_poll = 0;
	}
	if (do_nop) {
		fprintf(stderr, "aio does not support nop IO\n");
		do_nop = 0;
	}
	if (fixedbufs || register_files) {
		fprintf(stderr, "aio does not support registered files or buffers\n");
		fixedbufs = register_files = 0;
	}

	return io_queue_init(depth, &s->aio_ctx);
#else
	fprintf(stderr, "Legacy AIO not available on this system/build\n");
	errno = EINVAL;
	return -1;
#endif
}
905
906 static int setup_ring(struct submitter *s)
907 {
908         struct io_sq_ring *sring = &s->sq_ring;
909         struct io_cq_ring *cring = &s->cq_ring;
910         struct io_uring_params p;
911         int ret, fd;
912         void *ptr;
913
914         memset(&p, 0, sizeof(p));
915
916         if (polled && !do_nop)
917                 p.flags |= IORING_SETUP_IOPOLL;
918         if (sq_thread_poll) {
919                 p.flags |= IORING_SETUP_SQPOLL;
920                 if (sq_thread_cpu != -1) {
921                         p.flags |= IORING_SETUP_SQ_AFF;
922                         p.sq_thread_cpu = sq_thread_cpu;
923                 }
924         }
925
926         fd = io_uring_setup(depth, &p);
927         if (fd < 0) {
928                 perror("io_uring_setup");
929                 return 1;
930         }
931         s->ring_fd = fd;
932
933         io_uring_probe(fd);
934
935         if (fixedbufs) {
936                 struct rlimit rlim;
937
938                 rlim.rlim_cur = RLIM_INFINITY;
939                 rlim.rlim_max = RLIM_INFINITY;
940                 /* ignore potential error, not needed on newer kernels */
941                 setrlimit(RLIMIT_MEMLOCK, &rlim);
942
943                 ret = io_uring_register_buffers(s);
944                 if (ret < 0) {
945                         perror("io_uring_register_buffers");
946                         return 1;
947                 }
948         }
949
950         if (register_files) {
951                 ret = io_uring_register_files(s);
952                 if (ret < 0) {
953                         perror("io_uring_register_files");
954                         return 1;
955                 }
956         }
957
958         ptr = mmap(0, p.sq_off.array + p.sq_entries * sizeof(__u32),
959                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
960                         IORING_OFF_SQ_RING);
961         sring->head = ptr + p.sq_off.head;
962         sring->tail = ptr + p.sq_off.tail;
963         sring->ring_mask = ptr + p.sq_off.ring_mask;
964         sring->ring_entries = ptr + p.sq_off.ring_entries;
965         sring->flags = ptr + p.sq_off.flags;
966         sring->array = ptr + p.sq_off.array;
967         sq_ring_mask = *sring->ring_mask;
968
969         s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
970                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
971                         IORING_OFF_SQES);
972
973         ptr = mmap(0, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe),
974                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
975                         IORING_OFF_CQ_RING);
976         cring->head = ptr + p.cq_off.head;
977         cring->tail = ptr + p.cq_off.tail;
978         cring->ring_mask = ptr + p.cq_off.ring_mask;
979         cring->ring_entries = ptr + p.cq_off.ring_entries;
980         cring->cqes = ptr + p.cq_off.cqes;
981         cq_ring_mask = *cring->ring_mask;
982         return 0;
983 }
984
985 static void file_depths(char *buf)
986 {
987         bool prev = false;
988         char *p;
989         int i, j;
990
991         buf[0] = '\0';
992         p = buf;
993         for (j = 0; j < nthreads; j++) {
994                 struct submitter *s = get_submitter(j);
995
996                 for (i = 0; i < s->nr_files; i++) {
997                         struct file *f = &s->files[i];
998
999                         if (prev)
1000                                 p += sprintf(p, " %d", f->pending_ios);
1001                         else
1002                                 p += sprintf(p, "%d", f->pending_ios);
1003                         prev = true;
1004                 }
1005         }
1006 }
1007
1008 static void usage(char *argv, int status)
1009 {
1010         char runtime_str[16];
1011         snprintf(runtime_str, sizeof(runtime_str), "%d", runtime);
1012         printf("%s [options] -- [filenames]\n"
1013                 " -d <int>  : IO Depth, default %d\n"
1014                 " -s <int>  : Batch submit, default %d\n"
1015                 " -c <int>  : Batch complete, default %d\n"
1016                 " -b <int>  : Block size, default %d\n"
1017                 " -p <bool> : Polled IO, default %d\n"
1018                 " -B <bool> : Fixed buffers, default %d\n"
1019                 " -F <bool> : Register files, default %d\n"
1020                 " -n <int>  : Number of threads, default %d\n"
1021                 " -O <bool> : Use O_DIRECT, default %d\n"
1022                 " -N <bool> : Perform just no-op requests, default %d\n"
1023                 " -t <bool> : Track IO latencies, default %d\n"
1024                 " -T <int>  : TSC rate in HZ\n"
1025                 " -a <bool> : Use legacy aio, default %d\n"
1026                 " -r <int>  : Runtime in seconds, default %s\n",
1027                 argv, DEPTH, BATCH_SUBMIT, BATCH_COMPLETE, BS, polled,
1028                 fixedbufs, register_files, nthreads, !buffered, do_nop, stats, aio,
1029                 runtime == 0 ? "unlimited" : runtime_str);
1030         exit(status);
1031 }
1032
1033 static void read_tsc_rate(void)
1034 {
1035         char buffer[32];
1036         int fd, ret;
1037
1038         if (tsc_rate)
1039                 return;
1040
1041         fd = open(TSC_RATE_FILE, O_RDONLY);
1042         if (fd < 0)
1043                 return;
1044
1045         ret = read(fd, buffer, sizeof(buffer));
1046         if (ret < 0) {
1047                 close(fd);
1048                 return;
1049         }
1050
1051         tsc_rate = strtoul(buffer, NULL, 10);
1052         printf("Using TSC rate %luHz\n", tsc_rate);
1053         close(fd);
1054 }
1055
1056 static void write_tsc_rate(void)
1057 {
1058         char buffer[32];
1059         struct stat sb;
1060         int fd, ret;
1061
1062         if (!stat(TSC_RATE_FILE, &sb))
1063                 return;
1064
1065         fd = open(TSC_RATE_FILE, O_WRONLY | O_CREAT, 0644);
1066         if (fd < 0)
1067                 return;
1068
1069         memset(buffer, 0, sizeof(buffer));
1070         sprintf(buffer, "%lu", tsc_rate);
1071         ret = write(fd, buffer, strlen(buffer));
1072         if (ret < 0)
1073                 perror("write");
1074         close(fd);
1075 }
1076
1077 int main(int argc, char *argv[])
1078 {
1079         struct submitter *s;
1080         unsigned long done, calls, reap;
1081         int err, i, j, flags, fd, opt, threads_per_f, threads_rem = 0, nfiles;
1082         struct file f;
1083         char *fdepths;
1084         void *ret;
1085
1086         if (!do_nop && argc < 2)
1087                 usage(argv[0], 1);
1088
1089         while ((opt = getopt(argc, argv, "d:s:c:b:p:B:F:n:N:O:t:T:a:r:h?")) != -1) {
1090                 switch (opt) {
1091                 case 'a':
1092                         aio = !!atoi(optarg);
1093                         break;
1094                 case 'd':
1095                         depth = atoi(optarg);
1096                         break;
1097                 case 's':
1098                         batch_submit = atoi(optarg);
1099                         if (!batch_submit)
1100                                 batch_submit = 1;
1101                         break;
1102                 case 'c':
1103                         batch_complete = atoi(optarg);
1104                         if (!batch_complete)
1105                                 batch_complete = 1;
1106                         break;
1107                 case 'b':
1108                         bs = atoi(optarg);
1109                         break;
1110                 case 'p':
1111                         polled = !!atoi(optarg);
1112                         break;
1113                 case 'B':
1114                         fixedbufs = !!atoi(optarg);
1115                         break;
1116                 case 'F':
1117                         register_files = !!atoi(optarg);
1118                         break;
1119                 case 'n':
1120                         nthreads = atoi(optarg);
1121                         if (!nthreads) {
1122                                 printf("Threads must be non-zero\n");
1123                                 usage(argv[0], 1);
1124                         }
1125                         break;
1126                 case 'N':
1127                         do_nop = !!atoi(optarg);
1128                         break;
1129                 case 'O':
1130                         buffered = !atoi(optarg);
1131                         break;
1132                 case 't':
1133 #ifndef ARCH_HAVE_CPU_CLOCK
1134                         fprintf(stderr, "Stats not supported on this CPU\n");
1135                         return 1;
1136 #endif
1137                         stats = !!atoi(optarg);
1138                         break;
1139                 case 'T':
1140 #ifndef ARCH_HAVE_CPU_CLOCK
1141                         fprintf(stderr, "Stats not supported on this CPU\n");
1142                         return 1;
1143 #endif
1144                         tsc_rate = strtoul(optarg, NULL, 10);
1145                         write_tsc_rate();
1146                         break;
1147                 case 'r':
1148                         runtime = atoi(optarg);
1149                         break;
1150                 case 'h':
1151                 case '?':
1152                 default:
1153                         usage(argv[0], 0);
1154                         break;
1155                 }
1156         }
1157
1158         if (stats)
1159                 read_tsc_rate();
1160
1161         if (batch_complete > depth)
1162                 batch_complete = depth;
1163         if (batch_submit > depth)
1164                 batch_submit = depth;
1165
1166         submitter = calloc(nthreads, sizeof(*submitter) +
1167                                 depth * sizeof(struct iovec));
1168         for (j = 0; j < nthreads; j++) {
1169                 s = get_submitter(j);
1170                 s->index = j;
1171                 s->done = s->calls = s->reaps = 0;
1172         }
1173
1174         flags = O_RDONLY | O_NOATIME;
1175         if (!buffered)
1176                 flags |= O_DIRECT;
1177
1178         j = 0;
1179         i = optind;
1180         nfiles = argc - i;
1181         if (!do_nop) {
1182                 if (!nfiles) {
1183                         printf("No files specified\n");
1184                         usage(argv[0], 1);
1185                 }
1186                 threads_per_f = nthreads / nfiles;
1187                 /* make sure each thread gets assigned files */
1188                 if (threads_per_f == 0) {
1189                         threads_per_f = 1;
1190                 } else {
1191                         threads_rem = nthreads - threads_per_f * nfiles;
1192                 }
1193         }
1194         while (!do_nop && i < argc) {
1195                 int k, limit;
1196
1197                 memset(&f, 0, sizeof(f));
1198
1199                 fd = open(argv[i], flags);
1200                 if (fd < 0) {
1201                         perror("open");
1202                         return 1;
1203                 }
1204                 f.real_fd = fd;
1205                 if (get_file_size(&f)) {
1206                         printf("failed getting size of device/file\n");
1207                         return 1;
1208                 }
1209                 if (f.max_blocks <= 1) {
1210                         printf("Zero file/device size?\n");
1211                         return 1;
1212                 }
1213                 f.max_blocks--;
1214
1215                 limit = threads_per_f;
1216                 limit += threads_rem > 0 ? 1 : 0;
1217                 for (k = 0; k < limit; k++) {
1218                         s = get_submitter((j + k) % nthreads);
1219
1220                         if (s->nr_files == MAX_FDS) {
1221                                 printf("Max number of files (%d) reached\n", MAX_FDS);
1222                                 break;
1223                         }
1224
1225                         memcpy(&s->files[s->nr_files], &f, sizeof(f));
1226
1227                         printf("Added file %s (submitter %d)\n", argv[i], s->index);
1228                         s->nr_files++;
1229                 }
1230                 threads_rem--;
1231                 i++;
1232                 j += limit;
1233         }
1234
1235         arm_sig_int();
1236
1237         for (j = 0; j < nthreads; j++) {
1238                 s = get_submitter(j);
1239                 for (i = 0; i < depth; i++) {
1240                         void *buf;
1241
1242                         if (posix_memalign(&buf, bs, bs)) {
1243                                 printf("failed alloc\n");
1244                                 return 1;
1245                         }
1246                         s->iovecs[i].iov_base = buf;
1247                         s->iovecs[i].iov_len = bs;
1248                 }
1249         }
1250
1251         for (j = 0; j < nthreads; j++) {
1252                 s = get_submitter(j);
1253
1254                 if (!aio)
1255                         err = setup_ring(s);
1256                 else
1257                         err = setup_aio(s);
1258                 if (err) {
1259                         printf("ring setup failed: %s, %d\n", strerror(errno), err);
1260                         return 1;
1261                 }
1262         }
1263         s = get_submitter(0);
1264         printf("polled=%d, fixedbufs=%d, register_files=%d, buffered=%d, QD=%d\n", polled, fixedbufs, register_files, buffered, depth);
1265         if (!aio)
1266                 printf("Engine=io_uring, sq_ring=%d, cq_ring=%d\n", *s->sq_ring.ring_entries, *s->cq_ring.ring_entries);
1267         else
1268                 printf("Engine=aio\n");
1269
1270         for (j = 0; j < nthreads; j++) {
1271                 s = get_submitter(j);
1272                 if (!aio)
1273                         pthread_create(&s->thread, NULL, submitter_uring_fn, s);
1274 #ifdef CONFIG_LIBAIO
1275                 else
1276                         pthread_create(&s->thread, NULL, submitter_aio_fn, s);
1277 #endif
1278         }
1279
1280         fdepths = malloc(8 * s->nr_files * nthreads);
1281         reap = calls = done = 0;
1282         do {
1283                 unsigned long this_done = 0;
1284                 unsigned long this_reap = 0;
1285                 unsigned long this_call = 0;
1286                 unsigned long rpc = 0, ipc = 0;
1287                 unsigned long iops, bw;
1288
1289                 sleep(1);
1290                 if (runtime && !--runtime)
1291                         do_finish("timeout");
1292
1293                 /* don't print partial run, if interrupted by signal */
1294                 if (finish)
1295                         break;
1296
1297                 /* one second in to the run, enable stats */
1298                 if (stats)
1299                         stats_running = 1;
1300
1301                 for (j = 0; j < nthreads; j++) {
1302                         this_done += s->done;
1303                         this_call += s->calls;
1304                         this_reap += s->reaps;
1305                 }
1306                 if (this_call - calls) {
1307                         rpc = (this_done - done) / (this_call - calls);
1308                         ipc = (this_reap - reap) / (this_call - calls);
1309                 } else
1310                         rpc = ipc = -1;
1311                 file_depths(fdepths);
1312                 iops = this_done - done;
1313                 if (bs > 1048576)
1314                         bw = iops * (bs / 1048576);
1315                 else
1316                         bw = iops / (1048576 / bs);
1317                 printf("IOPS=%lu, ", iops);
1318                 if (!do_nop)
1319                         printf("BW=%luMiB/s, ", bw);
1320                 printf("IOS/call=%ld/%ld, inflight=(%s)\n", rpc, ipc, fdepths);
1321                 done = this_done;
1322                 calls = this_call;
1323                 reap = this_reap;
1324         } while (!finish);
1325
1326         for (j = 0; j < nthreads; j++) {
1327                 s = get_submitter(j);
1328                 pthread_join(s->thread, &ret);
1329                 close(s->ring_fd);
1330
1331                 if (stats) {
1332                         unsigned long nr;
1333
1334                         printf("%d: Latency percentiles:\n", s->tid);
1335                         for (i = 0, nr = 0; i < PLAT_NR; i++)
1336                                 nr += s->plat[i];
1337                         show_clat_percentiles(s->plat, nr, 4);
1338                         free(s->clock_batch);
1339                         free(s->plat);
1340                 }
1341         }
1342
1343         free(fdepths);
1344         free(submitter);
1345         return 0;
1346 }