Merge branch 'evelu-ocp' of https://github.com/ErwanAliasr1/fio
[fio.git] / t / io_uring.c
1 #include <stdio.h>
2 #include <errno.h>
3 #include <assert.h>
4 #include <stdlib.h>
5 #include <stddef.h>
6 #include <signal.h>
7 #include <inttypes.h>
8 #include <math.h>
9
10 #ifdef CONFIG_LIBAIO
11 #include <libaio.h>
12 #endif
13
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <sys/ioctl.h>
17 #include <sys/syscall.h>
18 #include <sys/resource.h>
19 #include <sys/mman.h>
20 #include <sys/uio.h>
21 #include <linux/fs.h>
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <sched.h>
27
28 #include "../arch/arch.h"
29 #include "../lib/types.h"
30 #include "../lib/roundup.h"
31 #include "../minmax.h"
32 #include "../os/linux/io_uring.h"
33
/*
 * Mapped view of the kernel's submission queue ring; every pointer
 * references a field inside the mmap'ed SQ ring region.
 */
struct io_sq_ring {
	unsigned *head;
	unsigned *tail;
	unsigned *ring_mask;
	unsigned *ring_entries;
	unsigned *flags;
	unsigned *array;	/* indirection array of sqe indices */
};
42
/*
 * Mapped view of the kernel's completion queue ring.
 */
struct io_cq_ring {
	unsigned *head;
	unsigned *tail;
	unsigned *ring_mask;
	unsigned *ring_entries;
	struct io_uring_cqe *cqes;	/* cqe array lives inside the mapped ring */
};
50
/* Default queue depth, submit/complete batch sizes and IO block size */
#define DEPTH			128
#define BATCH_SUBMIT		32
#define BATCH_COMPLETE		32
#define BS			4096

/* Upper bound on files a single submitter can drive */
#define MAX_FDS			16

/* Cached ring masks; all rings share a size so one copy suffices */
static unsigned sq_ring_mask, cq_ring_mask;
59
/* Per-target state for one file or block device */
struct file {
	unsigned long max_blocks;	/* size in units of the IO block size */
	unsigned pending_ios;		/* IOs in flight against this file */
	int real_fd;			/* actual open fd */
	int fixed_fd;			/* index into the registered-files table */
	int fileno;			/* position in the submitter's files[] */
};
67
/*
 * Latency histogram layout: PLAT_GROUP_NR groups of PLAT_VAL buckets,
 * with resolution halving for each successive group (same scheme as
 * fio's stat.c histogram).
 */
#define PLAT_BITS		6
#define PLAT_VAL		(1 << PLAT_BITS)
#define PLAT_GROUP_NR		29
#define PLAT_NR			(PLAT_GROUP_NR * PLAT_VAL)
72
/*
 * Per-thread state: one io_uring (or libaio context), its mapped
 * rings, the files it drives, and its IO buffers.
 */
struct submitter {
	pthread_t thread;
	int ring_fd;
	int index;			/* submitter number, 0..nthreads-1 */
	struct io_sq_ring sq_ring;
	struct io_uring_sqe *sqes;	/* mmap'ed sqe array */
	struct io_cq_ring cq_ring;
	int inflight;			/* submitted but not yet reaped */
	int tid;
	unsigned long reaps;
	unsigned long done;		/* total completed IOs */
	unsigned long calls;		/* syscalls issued */
	volatile int finish;		/* set to make the thread exit */

	__s32 *fds;			/* table for IORING_REGISTER_FILES */

	unsigned long *clock_batch;	/* cpu clock stamp per submit batch */
	int clock_index;		/* next stamp slot; 0 means "no stamp" */
	unsigned long *plat;		/* latency histogram, PLAT_NR buckets */

#ifdef CONFIG_LIBAIO
	io_context_t aio_ctx;
#endif

	struct file files[MAX_FDS];
	unsigned nr_files;
	unsigned cur_file;		/* round-robin cursor into files[] */
	struct iovec iovecs[];		/* flexible array, depth entries */
};
102
103 static struct submitter *submitter;
104 static volatile int finish;
105 static int stats_running;
106
107 static int depth = DEPTH;
108 static int batch_submit = BATCH_SUBMIT;
109 static int batch_complete = BATCH_COMPLETE;
110 static int bs = BS;
111 static int polled = 1;          /* use IO polling */
112 static int fixedbufs = 1;       /* use fixed user buffers */
113 static int register_files = 1;  /* use fixed files */
114 static int buffered = 0;        /* use buffered IO, not O_DIRECT */
115 static int sq_thread_poll = 0;  /* use kernel submission/poller thread */
116 static int sq_thread_cpu = -1;  /* pin above thread to this CPU */
117 static int do_nop = 0;          /* no-op SQ ring commands */
118 static int nthreads = 1;
119 static int stats = 0;           /* generate IO stats */
120 static int aio = 0;             /* use libaio */
121 static int runtime = 0; /* runtime */
122
123 static unsigned long tsc_rate;
124
125 #define TSC_RATE_FILE   "tsc-rate"
126
127 static int vectored = 1;
128
129 static float plist[] = { 1.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0,
130                         80.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.95, 99.99 };
131 static int plist_len = 17;
132
133 static unsigned long cycles_to_nsec(unsigned long cycles)
134 {
135         uint64_t val;
136
137         if (!tsc_rate)
138                 return cycles;
139
140         val = cycles * 1000000000ULL;
141         return val / tsc_rate;
142 }
143
144 static unsigned long plat_idx_to_val(unsigned int idx)
145 {
146         unsigned int error_bits;
147         unsigned long k, base;
148
149         assert(idx < PLAT_NR);
150
151         /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
152          * all bits of the sample as index */
153         if (idx < (PLAT_VAL << 1))
154                 return cycles_to_nsec(idx);
155
156         /* Find the group and compute the minimum value of that group */
157         error_bits = (idx >> PLAT_BITS) - 1;
158         base = ((unsigned long) 1) << (error_bits + PLAT_BITS);
159
160         /* Find its bucket number of the group */
161         k = idx % PLAT_VAL;
162
163         /* Return the mean of the range of the bucket */
164         return cycles_to_nsec(base + ((k + 0.5) * (1 << error_bits)));
165 }
166
/*
 * Walk the latency histogram and compute the value at each percentile
 * requested in plist[]. On success, *output points to a malloc'ed
 * array of plist_len values (caller frees) and *maxv/*minv hold the
 * extreme values seen; returns plist_len. Returns 0, without setting
 * *output, if the result array cannot be allocated.
 */
unsigned int calc_clat_percentiles(unsigned long *io_u_plat, unsigned long nr,
                                   unsigned long **output,
                                   unsigned long *maxv, unsigned long *minv)
{
	unsigned long sum = 0;
	unsigned int len = plist_len, i, j = 0;
	unsigned long *ovals = NULL;
	bool is_last;

	*minv = -1ULL;
	*maxv = 0;

	ovals = malloc(len * sizeof(*ovals));
	if (!ovals)
		return 0;

	/*
	 * Calculate bucket values, note down max and min values
	 */
	is_last = false;
	for (i = 0; i < PLAT_NR && !is_last; i++) {
		sum += io_u_plat[i];
		/* one bucket may satisfy several consecutive percentiles */
		while (sum >= ((long double) plist[j] / 100.0 * nr)) {
			assert(plist[j] <= 100.0);

			ovals[j] = plat_idx_to_val(i);
			if (ovals[j] < *minv)
				*minv = ovals[j];
			if (ovals[j] > *maxv)
				*maxv = ovals[j];

			is_last = (j == len - 1) != 0;
			if (is_last)
				break;

			j++;
		}
	}

	if (!is_last)
		fprintf(stderr, "error calculating latency percentiles\n");

	*output = ovals;
	return len;
}
212
213 static void show_clat_percentiles(unsigned long *io_u_plat, unsigned long nr,
214                                   unsigned int precision)
215 {
216         unsigned int divisor, len, i, j = 0;
217         unsigned long minv, maxv;
218         unsigned long *ovals;
219         int per_line, scale_down, time_width;
220         bool is_last;
221         char fmt[32];
222
223         len = calc_clat_percentiles(io_u_plat, nr, &ovals, &maxv, &minv);
224         if (!len || !ovals)
225                 goto out;
226
227         if (!tsc_rate) {
228                 scale_down = 0;
229                 divisor = 1;
230                 printf("    percentiles (tsc ticks):\n     |");
231         } else if (minv > 2000 && maxv > 99999) {
232                 scale_down = 1;
233                 divisor = 1000;
234                 printf("    percentiles (usec):\n     |");
235         } else {
236                 scale_down = 0;
237                 divisor = 1;
238                 printf("    percentiles (nsec):\n     |");
239         }
240
241         time_width = max(5, (int) (log10(maxv / divisor) + 1));
242         snprintf(fmt, sizeof(fmt), " %%%u.%ufth=[%%%dllu]%%c", precision + 3,
243                         precision, time_width);
244         /* fmt will be something like " %5.2fth=[%4llu]%c" */
245         per_line = (80 - 7) / (precision + 10 + time_width);
246
247         for (j = 0; j < len; j++) {
248                 /* for formatting */
249                 if (j != 0 && (j % per_line) == 0)
250                         printf("     |");
251
252                 /* end of the list */
253                 is_last = (j == len - 1) != 0;
254
255                 for (i = 0; i < scale_down; i++)
256                         ovals[j] = (ovals[j] + 999) / 1000;
257
258                 printf(fmt, plist[j], ovals[j], is_last ? '\n' : ',');
259
260                 if (is_last)
261                         break;
262
263                 if ((j % per_line) == per_line - 1)     /* for formatting */
264                         printf("\n");
265         }
266
267 out:
268         free(ovals);
269 }
270
271 static unsigned int plat_val_to_idx(unsigned long val)
272 {
273         unsigned int msb, error_bits, base, offset, idx;
274
275         /* Find MSB starting from bit 0 */
276         if (val == 0)
277                 msb = 0;
278         else
279                 msb = (sizeof(val)*8) - __builtin_clzll(val) - 1;
280
281         /*
282          * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
283          * all bits of the sample as index
284          */
285         if (msb <= PLAT_BITS)
286                 return val;
287
288         /* Compute the number of error bits to discard*/
289         error_bits = msb - PLAT_BITS;
290
291         /* Compute the number of buckets before the group */
292         base = (error_bits + 1) << PLAT_BITS;
293
294         /*
295          * Discard the error bits and apply the mask to find the
296          * index for the buckets in the group
297          */
298         offset = (PLAT_VAL - 1) & (val >> error_bits);
299
300         /* Make sure the index does not exceed (array size - 1) */
301         idx = (base + offset) < (PLAT_NR - 1) ?
302                 (base + offset) : (PLAT_NR - 1);
303
304         return idx;
305 }
306
/*
 * Account nr completions against the latency histogram. The latency
 * is measured from the cpu clock stamp taken at submit time
 * (clock_batch[clock_index]) to now. Compiled to a no-op when the
 * arch provides no cpu clock.
 */
static void add_stat(struct submitter *s, int clock_index, int nr)
{
#ifdef ARCH_HAVE_CPU_CLOCK
	unsigned long cycles;
	unsigned int pidx;

	/* clock_index 0 means the completion carried no stamp */
	if (!s->finish && clock_index) {
		cycles = get_cpu_clock();
		cycles -= s->clock_batch[clock_index];
		pidx = plat_val_to_idx(cycles);
		s->plat[pidx] += nr;
	}
#endif
}
321
322 static int io_uring_register_buffers(struct submitter *s)
323 {
324         if (do_nop)
325                 return 0;
326
327         return syscall(__NR_io_uring_register, s->ring_fd,
328                         IORING_REGISTER_BUFFERS, s->iovecs, depth);
329 }
330
331 static int io_uring_register_files(struct submitter *s)
332 {
333         int i;
334
335         if (do_nop)
336                 return 0;
337
338         s->fds = calloc(s->nr_files, sizeof(__s32));
339         for (i = 0; i < s->nr_files; i++) {
340                 s->fds[i] = s->files[i].real_fd;
341                 s->files[i].fixed_fd = i;
342         }
343
344         return syscall(__NR_io_uring_register, s->ring_fd,
345                         IORING_REGISTER_FILES, s->fds, s->nr_files);
346 }
347
/*
 * io_uring_setup(2) wrapper: create a ring of the given depth; the
 * kernel fills in *p. Returns the ring fd, or -1 with errno set.
 */
static int io_uring_setup(unsigned entries, struct io_uring_params *p)
{
	return syscall(__NR_io_uring_setup, entries, p);
}
352
353 static void io_uring_probe(int fd)
354 {
355         struct io_uring_probe *p;
356         int ret;
357
358         p = malloc(sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
359         if (!p)
360                 return;
361
362         memset(p, 0, sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
363         ret = syscall(__NR_io_uring_register, fd, IORING_REGISTER_PROBE, p, 256);
364         if (ret < 0)
365                 goto out;
366
367         if (IORING_OP_READ > p->ops_len)
368                 goto out;
369
370         if ((p->ops[IORING_OP_READ].flags & IO_URING_OP_SUPPORTED))
371                 vectored = 0;
372 out:
373         free(p);
374 }
375
/*
 * io_uring_enter(2) wrapper: submit to_submit sqes and optionally
 * wait for min_complete completions on this submitter's ring.
 */
static int io_uring_enter(struct submitter *s, unsigned int to_submit,
			  unsigned int min_complete, unsigned int flags)
{
	return syscall(__NR_io_uring_enter, s->ring_fd, to_submit, min_complete,
			flags, NULL, 0);
}
382
#ifndef CONFIG_HAVE_GETTID
/* Fallback for libcs that don't expose gettid() */
static int gettid(void)
{
	return syscall(__NR_gettid);
}
#endif
389
390 static unsigned file_depth(struct submitter *s)
391 {
392         return (depth + s->nr_files - 1) / s->nr_files;
393 }
394
395 static void init_io(struct submitter *s, unsigned index)
396 {
397         struct io_uring_sqe *sqe = &s->sqes[index];
398         unsigned long offset;
399         struct file *f;
400         long r;
401
402         if (do_nop) {
403                 sqe->opcode = IORING_OP_NOP;
404                 return;
405         }
406
407         if (s->nr_files == 1) {
408                 f = &s->files[0];
409         } else {
410                 f = &s->files[s->cur_file];
411                 if (f->pending_ios >= file_depth(s)) {
412                         s->cur_file++;
413                         if (s->cur_file == s->nr_files)
414                                 s->cur_file = 0;
415                         f = &s->files[s->cur_file];
416                 }
417         }
418         f->pending_ios++;
419
420         r = lrand48();
421         offset = (r % (f->max_blocks - 1)) * bs;
422
423         if (register_files) {
424                 sqe->flags = IOSQE_FIXED_FILE;
425                 sqe->fd = f->fixed_fd;
426         } else {
427                 sqe->flags = 0;
428                 sqe->fd = f->real_fd;
429         }
430         if (fixedbufs) {
431                 sqe->opcode = IORING_OP_READ_FIXED;
432                 sqe->addr = (unsigned long) s->iovecs[index].iov_base;
433                 sqe->len = bs;
434                 sqe->buf_index = index;
435         } else if (!vectored) {
436                 sqe->opcode = IORING_OP_READ;
437                 sqe->addr = (unsigned long) s->iovecs[index].iov_base;
438                 sqe->len = bs;
439                 sqe->buf_index = 0;
440         } else {
441                 sqe->opcode = IORING_OP_READV;
442                 sqe->addr = (unsigned long) &s->iovecs[index];
443                 sqe->len = 1;
444                 sqe->buf_index = 0;
445         }
446         sqe->ioprio = 0;
447         sqe->off = offset;
448         sqe->user_data = (unsigned long) f->fileno;
449         if (stats && stats_running)
450                 sqe->user_data |= ((unsigned long)s->clock_index << 32);
451 }
452
/*
 * Prepare up to max_ios new sqes at the SQ tail, then publish them to
 * the kernel with a single release store of the new tail. Stops early
 * if the ring fills. Returns the number prepared.
 */
static int prep_more_ios_uring(struct submitter *s, int max_ios)
{
	struct io_sq_ring *ring = &s->sq_ring;
	unsigned index, tail, next_tail, prepped = 0;

	next_tail = tail = *ring->tail;
	do {
		next_tail++;
		/* ring full: tail would catch up with the head */
		if (next_tail == atomic_load_acquire(ring->head))
			break;

		index = tail & sq_ring_mask;
		init_io(s, index);
		ring->array[index] = index;
		prepped++;
		tail = next_tail;
	} while (prepped < max_ios);

	if (prepped)
		atomic_store_release(ring->tail, tail);
	return prepped;
}
475
476 static int get_file_size(struct file *f)
477 {
478         struct stat st;
479
480         if (fstat(f->real_fd, &st) < 0)
481                 return -1;
482         if (S_ISBLK(st.st_mode)) {
483                 unsigned long long bytes;
484
485                 if (ioctl(f->real_fd, BLKGETSIZE64, &bytes) != 0)
486                         return -1;
487
488                 f->max_blocks = bytes / bs;
489                 return 0;
490         } else if (S_ISREG(st.st_mode)) {
491                 f->max_blocks = st.st_size / bs;
492                 return 0;
493         }
494
495         return -1;
496 }
497
/*
 * Drain the CQ ring: verify each completion, drop the per-file
 * pending count, and batch latency samples that share a clock index
 * into a single add_stat() call. Returns the number reaped, or -1 on
 * an unexpected completion result.
 */
static int reap_events_uring(struct submitter *s)
{
	struct io_cq_ring *ring = &s->cq_ring;
	struct io_uring_cqe *cqe;
	unsigned head, reaped = 0;
	int last_idx = -1, stat_nr = 0;

	head = *ring->head;
	do {
		struct file *f;

		/* NOTE(review): looks redundant with the acquire load
		 * below - confirm before removing */
		read_barrier();
		if (head == atomic_load_acquire(ring->tail))
			break;
		cqe = &ring->cqes[head & cq_ring_mask];
		if (!do_nop) {
			/* low 32 bits of user_data hold the file number */
			int fileno = cqe->user_data & 0xffffffff;

			f = &s->files[fileno];
			f->pending_ios--;
			if (cqe->res != bs) {
				printf("io: unexpected ret=%d\n", cqe->res);
				if (polled && cqe->res == -EOPNOTSUPP)
					printf("Your filesystem/driver/kernel doesn't support polled IO\n");
				return -1;
			}
		}
		if (stats) {
			/* high 32 bits hold the submit-time clock index */
			int clock_index = cqe->user_data >> 32;

			if (last_idx != clock_index) {
				if (last_idx != -1) {
					add_stat(s, last_idx, stat_nr);
					stat_nr = 0;
				}
				last_idx = clock_index;
			} else if (clock_index)
				stat_nr++;
		}
		reaped++;
		head++;
	} while (1);

	if (stat_nr)
		add_stat(s, last_idx, stat_nr);

	if (reaped) {
		s->inflight -= reaped;
		atomic_store_release(ring->head, head);
	}
	return reaped;
}
550
551 static int submitter_init(struct submitter *s)
552 {
553         int i, nr_batch;
554
555         s->tid = gettid();
556         printf("submitter=%d, tid=%d\n", s->index, s->tid);
557
558         srand48(pthread_self());
559
560         for (i = 0; i < MAX_FDS; i++)
561                 s->files[i].fileno = i;
562
563         if (stats) {
564                 nr_batch = roundup_pow2(depth / batch_submit);
565                 s->clock_batch = calloc(nr_batch, sizeof(unsigned long));
566                 s->clock_index = 1;
567
568                 s->plat = calloc(PLAT_NR, sizeof(unsigned long));
569         } else {
570                 s->clock_batch = NULL;
571                 s->plat = NULL;
572                 nr_batch = 0;
573         }
574
575         return nr_batch;
576 }
577
#ifdef CONFIG_LIBAIO
/*
 * libaio counterpart of init_io()/prep_more_ios_uring(): prepare up to
 * max_ios pread iocbs at random block-aligned offsets, choosing files
 * round-robin. The file number goes in the low 32 bits of iocb->data,
 * the stats clock index in the high 32 bits.
 * NOTE(review): assumes f->max_blocks > 1; a target smaller than two
 * blocks would make the modulo below divide by zero - confirm callers.
 */
static int prep_more_ios_aio(struct submitter *s, int max_ios, struct iocb *iocbs)
{
	unsigned long offset, data;
	struct file *f;
	unsigned index;
	long r;

	index = 0;
	while (index < max_ios) {
		struct iocb *iocb = &iocbs[index];

		if (s->nr_files == 1) {
			f = &s->files[0];
		} else {
			f = &s->files[s->cur_file];
			if (f->pending_ios >= file_depth(s)) {
				s->cur_file++;
				if (s->cur_file == s->nr_files)
					s->cur_file = 0;
				f = &s->files[s->cur_file];
			}
		}
		f->pending_ios++;

		r = lrand48();
		offset = (r % (f->max_blocks - 1)) * bs;
		io_prep_pread(iocb, f->real_fd, s->iovecs[index].iov_base,
				s->iovecs[index].iov_len, offset);

		data = f->fileno;
		if (stats && stats_running)
			data |= ((unsigned long) s->clock_index << 32);
		iocb->data = (void *) (uintptr_t) data;
		index++;
	}
	return index;
}
616
/*
 * Process evs completed libaio events: drop per-file pending counts,
 * verify each read returned a full block, and batch latency samples
 * sharing a clock index into one add_stat() call. Returns the number
 * reaped, or -1 on an unexpected completion size.
 */
static int reap_events_aio(struct submitter *s, struct io_event *events, int evs)
{
	int last_idx = -1, stat_nr = 0;
	int reaped = 0;

	while (evs) {
		/* low 32 bits: file number; high 32: clock stamp index */
		unsigned long data = (uintptr_t) events[reaped].data;
		struct file *f = &s->files[data & 0xffffffff];

		f->pending_ios--;
		if (events[reaped].res != bs) {
			printf("io: unexpected ret=%ld\n", events[reaped].res);
			return -1;
		}
		if (stats) {
			int clock_index = data >> 32;

			if (last_idx != clock_index) {
				if (last_idx != -1) {
					add_stat(s, last_idx, stat_nr);
					stat_nr = 0;
				}
				last_idx = clock_index;
			} else if (clock_index)
				stat_nr++;
		}
		reaped++;
		evs--;
	}

	if (stat_nr)
		add_stat(s, last_idx, stat_nr);

	s->inflight -= reaped;
	s->done += reaped;
	return reaped;
}
654
655 static void *submitter_aio_fn(void *data)
656 {
657         struct submitter *s = data;
658         int i, ret, prepped, nr_batch;
659         struct iocb **iocbsptr;
660         struct iocb *iocbs;
661         struct io_event *events;
662
663         nr_batch = submitter_init(s);
664
665         iocbsptr = calloc(depth, sizeof(struct iocb *));
666         iocbs = calloc(depth, sizeof(struct iocb));
667         events = calloc(depth, sizeof(struct io_event));
668
669         for (i = 0; i < depth; i++)
670                 iocbsptr[i] = &iocbs[i];
671
672         prepped = 0;
673         do {
674                 int to_wait, to_submit, to_prep;
675
676                 if (!prepped && s->inflight < depth) {
677                         to_prep = min(depth - s->inflight, batch_submit);
678                         prepped = prep_more_ios_aio(s, to_prep, iocbs);
679 #ifdef ARCH_HAVE_CPU_CLOCK
680                         if (prepped && stats) {
681                                 s->clock_batch[s->clock_index] = get_cpu_clock();
682                                 s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
683                         }
684 #endif
685                 }
686                 s->inflight += prepped;
687                 to_submit = prepped;
688
689                 if (to_submit && (s->inflight + to_submit <= depth))
690                         to_wait = 0;
691                 else
692                         to_wait = min(s->inflight + to_submit, batch_complete);
693
694                 ret = io_submit(s->aio_ctx, to_submit, iocbsptr);
695                 s->calls++;
696                 if (ret < 0) {
697                         perror("io_submit");
698                         break;
699                 } else if (ret != to_submit) {
700                         printf("submitted %d, wanted %d\n", ret, to_submit);
701                         break;
702                 }
703                 prepped = 0;
704
705                 while (to_wait) {
706                         int r;
707
708                         s->calls++;
709                         r = io_getevents(s->aio_ctx, to_wait, to_wait, events, NULL);
710                         if (r < 0) {
711                                 perror("io_getevents");
712                                 break;
713                         } else if (r != to_wait) {
714                                 printf("r=%d, wait=%d\n", r, to_wait);
715                                 break;
716                         }
717                         r = reap_events_aio(s, events, r);
718                         s->reaps += r;
719                         to_wait -= r;
720                 }
721         } while (!s->finish);
722
723         free(iocbsptr);
724         free(iocbs);
725         free(events);
726         finish = 1;
727         return NULL;
728 }
729 #endif
730
/*
 * Main loop for an io_uring submitter thread: prepare a batch of
 * sqes, submit via io_uring_enter() (or rely on the SQPOLL thread),
 * then reap completions, repeating until told to finish. Handles
 * short submits by re-entering with the remainder.
 */
static void *submitter_uring_fn(void *data)
{
	struct submitter *s = data;
	struct io_sq_ring *ring = &s->sq_ring;
	int ret, prepped, nr_batch;

	nr_batch = submitter_init(s);

	prepped = 0;
	do {
		int to_wait, to_submit, this_reap, to_prep;
		unsigned ring_flags = 0;

		/* top up with a fresh batch, stamping the batch with the
		 * cpu clock when stats are enabled */
		if (!prepped && s->inflight < depth) {
			to_prep = min(depth - s->inflight, batch_submit);
			prepped = prep_more_ios_uring(s, to_prep);
#ifdef ARCH_HAVE_CPU_CLOCK
			if (prepped && stats) {
				s->clock_batch[s->clock_index] = get_cpu_clock();
				s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
			}
#endif
		}
		s->inflight += prepped;
submit_more:
		to_submit = prepped;
submit:
		/* only wait for completions if the ring would overfill */
		if (to_submit && (s->inflight + to_submit <= depth))
			to_wait = 0;
		else
			to_wait = min(s->inflight + to_submit, batch_complete);

		/*
		 * Only need to call io_uring_enter if we're not using SQ thread
		 * poll, or if IORING_SQ_NEED_WAKEUP is set.
		 */
		if (sq_thread_poll)
			ring_flags = atomic_load_acquire(ring->flags);
		if (!sq_thread_poll || ring_flags & IORING_SQ_NEED_WAKEUP) {
			unsigned flags = 0;

			if (to_wait)
				flags = IORING_ENTER_GETEVENTS;
			if (ring_flags & IORING_SQ_NEED_WAKEUP)
				flags |= IORING_ENTER_SQ_WAKEUP;
			ret = io_uring_enter(s, to_submit, to_wait, flags);
			s->calls++;
		} else {
			/* for SQPOLL, we submitted it all effectively */
			ret = to_submit;
		}

		/*
		 * For non SQ thread poll, we already got the events we needed
		 * through the io_uring_enter() above. For SQ thread poll, we
		 * need to loop here until we find enough events.
		 */
		this_reap = 0;
		do {
			int r;

			r = reap_events_uring(s);
			if (r == -1) {
				s->finish = 1;
				break;
			} else if (r > 0)
				this_reap += r;
		} while (sq_thread_poll && this_reap < to_wait);
		s->reaps += this_reap;

		if (ret >= 0) {
			if (!ret) {
				/* nothing accepted: wait on what's in flight */
				to_submit = 0;
				if (s->inflight)
					goto submit;
				continue;
			} else if (ret < to_submit) {
				/* short submit: retry the remainder */
				int diff = to_submit - ret;

				s->done += ret;
				prepped -= diff;
				goto submit_more;
			}
			s->done += ret;
			prepped = 0;
			continue;
		} else if (ret < 0) {
			if (errno == EAGAIN) {
				if (s->finish)
					break;
				if (this_reap)
					goto submit;
				to_submit = 0;
				goto submit;
			}
			printf("io_submit: %s\n", strerror(errno));
			break;
		}
	} while (!s->finish);

	finish = 1;
	return NULL;
}
834
835 static struct submitter *get_submitter(int offset)
836 {
837         void *ret;
838
839         ret = submitter;
840         if (offset)
841                 ret += offset * (sizeof(*submitter) + depth * sizeof(struct iovec));
842         return ret;
843 }
844
845 static void do_finish(const char *reason)
846 {
847         int j;
848         printf("Exiting on %s\n", reason);
849         for (j = 0; j < nthreads; j++) {
850                 struct submitter *s = get_submitter(j);
851                 s->finish = 1;
852         }
853         finish = 1;
854 }
855
/* SIGINT (and SIGBREAK on Windows) handler: ask all threads to stop */
static void sig_int(int sig)
{
	do_finish("signal");
}
860
/*
 * Install the SIGINT handler, with SA_RESTART so interrupted syscalls
 * resume instead of failing with EINTR.
 */
static void arm_sig_int(void)
{
	struct sigaction act;

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_int;
	act.sa_flags = SA_RESTART;
	sigaction(SIGINT, &act, NULL);

	/* Windows uses SIGBREAK as a quit signal from other applications */
#ifdef WIN32
	sigaction(SIGBREAK, &act, NULL);
#endif
}
875
/*
 * Configure the submitter for the libaio engine. Options libaio can't
 * honor (polling, SQPOLL, nop, registered files/buffers) are reported
 * and turned off rather than failing. Returns the io_queue_init()
 * result, or -1 with errno=EINVAL when built without libaio.
 */
static int setup_aio(struct submitter *s)
{
#ifdef CONFIG_LIBAIO
	if (polled) {
		fprintf(stderr, "aio does not support polled IO\n");
		polled = 0;
	}
	if (sq_thread_poll) {
		fprintf(stderr, "aio does not support SQPOLL IO\n");
		sq_thread_poll = 0;
	}
	if (do_nop) {
		/* message was a copy-paste of the polled one */
		fprintf(stderr, "aio does not support nop IO\n");
		do_nop = 0;
	}
	if (fixedbufs || register_files) {
		fprintf(stderr, "aio does not support registered files or buffers\n");
		fixedbufs = register_files = 0;
	}

	return io_queue_init(depth, &s->aio_ctx);
#else
	fprintf(stderr, "Legacy AIO not available on this system/build\n");
	errno = EINVAL;
	return -1;
#endif
}
903
904 static int setup_ring(struct submitter *s)
905 {
906         struct io_sq_ring *sring = &s->sq_ring;
907         struct io_cq_ring *cring = &s->cq_ring;
908         struct io_uring_params p;
909         int ret, fd;
910         void *ptr;
911
912         memset(&p, 0, sizeof(p));
913
914         if (polled && !do_nop)
915                 p.flags |= IORING_SETUP_IOPOLL;
916         if (sq_thread_poll) {
917                 p.flags |= IORING_SETUP_SQPOLL;
918                 if (sq_thread_cpu != -1) {
919                         p.flags |= IORING_SETUP_SQ_AFF;
920                         p.sq_thread_cpu = sq_thread_cpu;
921                 }
922         }
923
924         fd = io_uring_setup(depth, &p);
925         if (fd < 0) {
926                 perror("io_uring_setup");
927                 return 1;
928         }
929         s->ring_fd = fd;
930
931         io_uring_probe(fd);
932
933         if (fixedbufs) {
934                 struct rlimit rlim;
935
936                 rlim.rlim_cur = RLIM_INFINITY;
937                 rlim.rlim_max = RLIM_INFINITY;
938                 /* ignore potential error, not needed on newer kernels */
939                 setrlimit(RLIMIT_MEMLOCK, &rlim);
940
941                 ret = io_uring_register_buffers(s);
942                 if (ret < 0) {
943                         perror("io_uring_register_buffers");
944                         return 1;
945                 }
946         }
947
948         if (register_files) {
949                 ret = io_uring_register_files(s);
950                 if (ret < 0) {
951                         perror("io_uring_register_files");
952                         return 1;
953                 }
954         }
955
956         ptr = mmap(0, p.sq_off.array + p.sq_entries * sizeof(__u32),
957                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
958                         IORING_OFF_SQ_RING);
959         sring->head = ptr + p.sq_off.head;
960         sring->tail = ptr + p.sq_off.tail;
961         sring->ring_mask = ptr + p.sq_off.ring_mask;
962         sring->ring_entries = ptr + p.sq_off.ring_entries;
963         sring->flags = ptr + p.sq_off.flags;
964         sring->array = ptr + p.sq_off.array;
965         sq_ring_mask = *sring->ring_mask;
966
967         s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
968                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
969                         IORING_OFF_SQES);
970
971         ptr = mmap(0, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe),
972                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
973                         IORING_OFF_CQ_RING);
974         cring->head = ptr + p.cq_off.head;
975         cring->tail = ptr + p.cq_off.tail;
976         cring->ring_mask = ptr + p.cq_off.ring_mask;
977         cring->ring_entries = ptr + p.cq_off.ring_entries;
978         cring->cqes = ptr + p.cq_off.cqes;
979         cq_ring_mask = *cring->ring_mask;
980         return 0;
981 }
982
983 static void file_depths(char *buf)
984 {
985         bool prev = false;
986         char *p;
987         int i, j;
988
989         buf[0] = '\0';
990         p = buf;
991         for (j = 0; j < nthreads; j++) {
992                 struct submitter *s = get_submitter(j);
993
994                 for (i = 0; i < s->nr_files; i++) {
995                         struct file *f = &s->files[i];
996
997                         if (prev)
998                                 p += sprintf(p, " %d", f->pending_ios);
999                         else
1000                                 p += sprintf(p, "%d", f->pending_ios);
1001                         prev = true;
1002                 }
1003         }
1004 }
1005
1006 static void usage(char *argv, int status)
1007 {
1008         char runtime_str[16];
1009         snprintf(runtime_str, sizeof(runtime_str), "%d", runtime);
1010         printf("%s [options] -- [filenames]\n"
1011                 " -d <int>  : IO Depth, default %d\n"
1012                 " -s <int>  : Batch submit, default %d\n"
1013                 " -c <int>  : Batch complete, default %d\n"
1014                 " -b <int>  : Block size, default %d\n"
1015                 " -p <bool> : Polled IO, default %d\n"
1016                 " -B <bool> : Fixed buffers, default %d\n"
1017                 " -F <bool> : Register files, default %d\n"
1018                 " -n <int>  : Number of threads, default %d\n"
1019                 " -O <bool> : Use O_DIRECT, default %d\n"
1020                 " -N <bool> : Perform just no-op requests, default %d\n"
1021                 " -t <bool> : Track IO latencies, default %d\n"
1022                 " -T <int>  : TSC rate in HZ\n"
1023                 " -a <bool> : Use legacy aio, default %d\n"
1024                 " -r <int>  : Runtime in seconds, default %s\n",
1025                 argv, DEPTH, BATCH_SUBMIT, BATCH_COMPLETE, BS, polled,
1026                 fixedbufs, register_files, nthreads, !buffered, do_nop, stats, aio,
1027                 runtime == 0 ? "unlimited" : runtime_str);
1028         exit(status);
1029 }
1030
1031 static void read_tsc_rate(void)
1032 {
1033         char buffer[32];
1034         int fd, ret;
1035
1036         if (tsc_rate)
1037                 return;
1038
1039         fd = open(TSC_RATE_FILE, O_RDONLY);
1040         if (fd < 0)
1041                 return;
1042
1043         ret = read(fd, buffer, sizeof(buffer));
1044         if (ret < 0) {
1045                 close(fd);
1046                 return;
1047         }
1048
1049         tsc_rate = strtoul(buffer, NULL, 10);
1050         printf("Using TSC rate %luHz\n", tsc_rate);
1051         close(fd);
1052 }
1053
1054 static void write_tsc_rate(void)
1055 {
1056         char buffer[32];
1057         struct stat sb;
1058         int fd, ret;
1059
1060         if (!stat(TSC_RATE_FILE, &sb))
1061                 return;
1062
1063         fd = open(TSC_RATE_FILE, O_WRONLY | O_CREAT, 0644);
1064         if (fd < 0)
1065                 return;
1066
1067         memset(buffer, 0, sizeof(buffer));
1068         sprintf(buffer, "%lu", tsc_rate);
1069         ret = write(fd, buffer, strlen(buffer));
1070         if (ret < 0)
1071                 perror("write");
1072         close(fd);
1073 }
1074
1075 int main(int argc, char *argv[])
1076 {
1077         struct submitter *s;
1078         unsigned long done, calls, reap;
1079         int err, i, j, flags, fd, opt, threads_per_f, threads_rem = 0, nfiles;
1080         struct file f;
1081         char *fdepths;
1082         void *ret;
1083
1084         if (!do_nop && argc < 2)
1085                 usage(argv[0], 1);
1086
1087         while ((opt = getopt(argc, argv, "d:s:c:b:p:B:F:n:N:O:t:T:a:r:h?")) != -1) {
1088                 switch (opt) {
1089                 case 'a':
1090                         aio = !!atoi(optarg);
1091                         break;
1092                 case 'd':
1093                         depth = atoi(optarg);
1094                         break;
1095                 case 's':
1096                         batch_submit = atoi(optarg);
1097                         if (!batch_submit)
1098                                 batch_submit = 1;
1099                         break;
1100                 case 'c':
1101                         batch_complete = atoi(optarg);
1102                         if (!batch_complete)
1103                                 batch_complete = 1;
1104                         break;
1105                 case 'b':
1106                         bs = atoi(optarg);
1107                         break;
1108                 case 'p':
1109                         polled = !!atoi(optarg);
1110                         break;
1111                 case 'B':
1112                         fixedbufs = !!atoi(optarg);
1113                         break;
1114                 case 'F':
1115                         register_files = !!atoi(optarg);
1116                         break;
1117                 case 'n':
1118                         nthreads = atoi(optarg);
1119                         if (!nthreads) {
1120                                 printf("Threads must be non-zero\n");
1121                                 usage(argv[0], 1);
1122                         }
1123                         break;
1124                 case 'N':
1125                         do_nop = !!atoi(optarg);
1126                         break;
1127                 case 'O':
1128                         buffered = !atoi(optarg);
1129                         break;
1130                 case 't':
1131 #ifndef ARCH_HAVE_CPU_CLOCK
1132                         fprintf(stderr, "Stats not supported on this CPU\n");
1133                         return 1;
1134 #endif
1135                         stats = !!atoi(optarg);
1136                         break;
1137                 case 'T':
1138 #ifndef ARCH_HAVE_CPU_CLOCK
1139                         fprintf(stderr, "Stats not supported on this CPU\n");
1140                         return 1;
1141 #endif
1142                         tsc_rate = strtoul(optarg, NULL, 10);
1143                         write_tsc_rate();
1144                         break;
1145                 case 'r':
1146                         runtime = atoi(optarg);
1147                         break;
1148                 case 'h':
1149                 case '?':
1150                 default:
1151                         usage(argv[0], 0);
1152                         break;
1153                 }
1154         }
1155
1156         if (stats)
1157                 read_tsc_rate();
1158
1159         if (batch_complete > depth)
1160                 batch_complete = depth;
1161         if (batch_submit > depth)
1162                 batch_submit = depth;
1163
1164         submitter = calloc(nthreads, sizeof(*submitter) +
1165                                 depth * sizeof(struct iovec));
1166         for (j = 0; j < nthreads; j++) {
1167                 s = get_submitter(j);
1168                 s->index = j;
1169                 s->done = s->calls = s->reaps = 0;
1170         }
1171
1172         flags = O_RDONLY | O_NOATIME;
1173         if (!buffered)
1174                 flags |= O_DIRECT;
1175
1176         j = 0;
1177         i = optind;
1178         nfiles = argc - i;
1179         if (!do_nop) {
1180                 if (!nfiles) {
1181                         printf("No files specified\n");
1182                         usage(argv[0], 1);
1183                 }
1184                 threads_per_f = nthreads / nfiles;
1185                 /* make sure each thread gets assigned files */
1186                 if (threads_per_f == 0) {
1187                         threads_per_f = 1;
1188                 } else {
1189                         threads_rem = nthreads - threads_per_f * nfiles;
1190                 }
1191         }
1192         while (!do_nop && i < argc) {
1193                 int k, limit;
1194
1195                 memset(&f, 0, sizeof(f));
1196
1197                 fd = open(argv[i], flags);
1198                 if (fd < 0) {
1199                         perror("open");
1200                         return 1;
1201                 }
1202                 f.real_fd = fd;
1203                 if (get_file_size(&f)) {
1204                         printf("failed getting size of device/file\n");
1205                         return 1;
1206                 }
1207                 if (f.max_blocks <= 1) {
1208                         printf("Zero file/device size?\n");
1209                         return 1;
1210                 }
1211                 f.max_blocks--;
1212
1213                 limit = threads_per_f;
1214                 limit += threads_rem > 0 ? 1 : 0;
1215                 for (k = 0; k < limit; k++) {
1216                         s = get_submitter((j + k) % nthreads);
1217
1218                         if (s->nr_files == MAX_FDS) {
1219                                 printf("Max number of files (%d) reached\n", MAX_FDS);
1220                                 break;
1221                         }
1222
1223                         memcpy(&s->files[s->nr_files], &f, sizeof(f));
1224
1225                         printf("Added file %s (submitter %d)\n", argv[i], s->index);
1226                         s->nr_files++;
1227                 }
1228                 threads_rem--;
1229                 i++;
1230                 j += limit;
1231         }
1232
1233         arm_sig_int();
1234
1235         for (j = 0; j < nthreads; j++) {
1236                 s = get_submitter(j);
1237                 for (i = 0; i < depth; i++) {
1238                         void *buf;
1239
1240                         if (posix_memalign(&buf, bs, bs)) {
1241                                 printf("failed alloc\n");
1242                                 return 1;
1243                         }
1244                         s->iovecs[i].iov_base = buf;
1245                         s->iovecs[i].iov_len = bs;
1246                 }
1247         }
1248
1249         for (j = 0; j < nthreads; j++) {
1250                 s = get_submitter(j);
1251
1252                 if (!aio)
1253                         err = setup_ring(s);
1254                 else
1255                         err = setup_aio(s);
1256                 if (err) {
1257                         printf("ring setup failed: %s, %d\n", strerror(errno), err);
1258                         return 1;
1259                 }
1260         }
1261         s = get_submitter(0);
1262         printf("polled=%d, fixedbufs=%d, register_files=%d, buffered=%d, QD=%d\n", polled, fixedbufs, register_files, buffered, depth);
1263         if (!aio)
1264                 printf("Engine=io_uring, sq_ring=%d, cq_ring=%d\n", *s->sq_ring.ring_entries, *s->cq_ring.ring_entries);
1265         else
1266                 printf("Engine=aio\n");
1267
1268         for (j = 0; j < nthreads; j++) {
1269                 s = get_submitter(j);
1270                 if (!aio)
1271                         pthread_create(&s->thread, NULL, submitter_uring_fn, s);
1272 #ifdef CONFIG_LIBAIO
1273                 else
1274                         pthread_create(&s->thread, NULL, submitter_aio_fn, s);
1275 #endif
1276         }
1277
1278         fdepths = malloc(8 * s->nr_files * nthreads);
1279         reap = calls = done = 0;
1280         do {
1281                 unsigned long this_done = 0;
1282                 unsigned long this_reap = 0;
1283                 unsigned long this_call = 0;
1284                 unsigned long rpc = 0, ipc = 0;
1285                 unsigned long iops, bw;
1286
1287                 sleep(1);
1288                 if (runtime && !--runtime)
1289                         do_finish("timeout");
1290
1291                 /* don't print partial run, if interrupted by signal */
1292                 if (finish)
1293                         break;
1294
1295                 /* one second in to the run, enable stats */
1296                 if (stats)
1297                         stats_running = 1;
1298
1299                 for (j = 0; j < nthreads; j++) {
1300                         this_done += s->done;
1301                         this_call += s->calls;
1302                         this_reap += s->reaps;
1303                 }
1304                 if (this_call - calls) {
1305                         rpc = (this_done - done) / (this_call - calls);
1306                         ipc = (this_reap - reap) / (this_call - calls);
1307                 } else
1308                         rpc = ipc = -1;
1309                 file_depths(fdepths);
1310                 iops = this_done - done;
1311                 if (bs > 1048576)
1312                         bw = iops * (bs / 1048576);
1313                 else
1314                         bw = iops / (1048576 / bs);
1315                 printf("IOPS=%lu, ", iops);
1316                 if (!do_nop)
1317                         printf("BW=%luMiB/s, ", bw);
1318                 printf("IOS/call=%ld/%ld, inflight=(%s)\n", rpc, ipc, fdepths);
1319                 done = this_done;
1320                 calls = this_call;
1321                 reap = this_reap;
1322         } while (!finish);
1323
1324         for (j = 0; j < nthreads; j++) {
1325                 s = get_submitter(j);
1326                 pthread_join(s->thread, &ret);
1327                 close(s->ring_fd);
1328
1329                 if (stats) {
1330                         unsigned long nr;
1331
1332                         printf("%d: Latency percentiles:\n", s->tid);
1333                         for (i = 0, nr = 0; i < PLAT_NR; i++)
1334                                 nr += s->plat[i];
1335                         show_clat_percentiles(s->plat, nr, 4);
1336                         free(s->clock_batch);
1337                         free(s->plat);
1338                 }
1339         }
1340
1341         free(fdepths);
1342         free(submitter);
1343         return 0;
1344 }