t/io_uring: support NUMA placement
#include <stdio.h>
#include <errno.h>
#include <assert.h>
#include <stdlib.h>
#include <stddef.h>
#include <signal.h>
#include <inttypes.h>
#include <math.h>

#ifdef CONFIG_LIBAIO
#include <libaio.h>
#endif

#ifdef CONFIG_LIBNUMA
#include <numa.h>
#endif

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <sys/resource.h>
#include <sys/mman.h>
#include <sys/uio.h>
#include <linux/fs.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <sched.h>

#include "../arch/arch.h"
#include "../lib/types.h"
#include "../lib/roundup.h"
#include "../lib/rand.h"
#include "../minmax.h"
#include "../os/linux/io_uring.h"

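/*
 * Local views of the kernel's SQ and CQ rings. Each member points into
 * the regions mmap'ed from the ring fd in setup_ring(), so loads and
 * stores through these pointers are shared with the kernel.
 */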
struct io_sq_ring {
        unsigned *head;
        unsigned *tail;
        unsigned *ring_mask;
        unsigned *ring_entries;
        unsigned *flags;
        unsigned *array;
};

struct io_cq_ring {
        unsigned *head;
        unsigned *tail;
        unsigned *ring_mask;
        unsigned *ring_entries;
        struct io_uring_cqe *cqes;
};

#define DEPTH                   128
#define BATCH_SUBMIT            32
#define BATCH_COMPLETE          32
#define BS                      4096

#define MAX_FDS                 16

static unsigned sq_ring_mask, cq_ring_mask;

struct file {
        unsigned long max_blocks;
        unsigned long max_size;
        unsigned long cur_off;
        unsigned pending_ios;
        int real_fd;
        int fixed_fd;
        int fileno;
};

#define PLAT_BITS               6
#define PLAT_VAL                (1 << PLAT_BITS)
#define PLAT_GROUP_NR           29
#define PLAT_NR                 (PLAT_GROUP_NR * PLAT_VAL)

struct submitter {
        pthread_t thread;
        int ring_fd;
        int enter_ring_fd;
        int index;
        struct io_sq_ring sq_ring;
        struct io_uring_sqe *sqes;
        struct io_cq_ring cq_ring;
        int inflight;
        int tid;
        unsigned long reaps;
        unsigned long done;
        unsigned long calls;
        volatile int finish;

        __s32 *fds;

        struct taus258_state rand_state;

        unsigned long *clock_batch;
        int clock_index;
        unsigned long *plat;

#ifdef CONFIG_LIBAIO
        io_context_t aio_ctx;
#endif

        int numa_node;
        const char *filename;

        struct file files[MAX_FDS];
        unsigned nr_files;
        unsigned cur_file;
        struct iovec iovecs[];
};
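/*
 * iovecs[] is a flexible array member: each submitter is allocated with
 * roundup_pow2(depth) iovec slots trailing the struct (see the calloc()
 * in main() and the matching stride in get_submitter()).
 */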

static struct submitter *submitter;
static volatile int finish;
static int stats_running;
static unsigned long max_iops;
static long page_size;

static int depth = DEPTH;
static int batch_submit = BATCH_SUBMIT;
static int batch_complete = BATCH_COMPLETE;
static int bs = BS;
static int polled = 1;          /* use IO polling */
static int fixedbufs = 1;       /* use fixed user buffers */
static int dma_map;             /* pre-map DMA buffers */
static int register_files = 1;  /* use fixed files */
static int buffered = 0;        /* use buffered IO, not O_DIRECT */
static int sq_thread_poll = 0;  /* use kernel submission/poller thread */
static int sq_thread_cpu = -1;  /* pin above thread to this CPU */
static int do_nop = 0;          /* no-op SQ ring commands */
static int nthreads = 1;
static int stats = 0;           /* generate IO stats */
static int aio = 0;             /* use libaio */
static int runtime = 0;         /* runtime in seconds, 0 == run until interrupted */
static int random_io = 1;       /* random or sequential IO */
static int register_ring = 1;   /* register ring */
static int use_sync = 0;        /* use preadv2 */
static int numa_placement = 0;  /* set to node of device */

static unsigned long tsc_rate;

#define TSC_RATE_FILE   "tsc-rate"

static int vectored = 1;

static float plist[] = { 1.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0,
                        80.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.95, 99.99 };
static int plist_len = 17;

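/*
 * IORING_REGISTER_MAP_BUFFERS may be missing from the installed kernel
 * headers, so provide the opcode and argument struct locally; if the
 * running kernel lacks support, the registration call just fails.
 */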
#ifndef IORING_REGISTER_MAP_BUFFERS
#define IORING_REGISTER_MAP_BUFFERS     22
struct io_uring_map_buffers {
        __s32   fd;
        __u32   buf_start;
        __u32   buf_end;
        __u32   flags;
        __u64   rsvd[2];
};
#endif

static unsigned long cycles_to_nsec(unsigned long cycles)
{
        uint64_t val;

        if (!tsc_rate)
                return cycles;

        val = cycles * 1000000000ULL;
        return val / tsc_rate;
}

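/*
 * Latency samples are kept in a log-linear histogram: small values index
 * directly, while a larger value discards its (msb - PLAT_BITS) low
 * "error" bits and lands in one of PLAT_VAL (64) linear buckets within
 * its power-of-two range. Bucket width doubles per range, bounding the
 * relative error at roughly 1/PLAT_VAL. For example, with PLAT_BITS=6, a
 * sample of 200 (msb 7) discards one bit and falls into one of 64 buckets
 * of width 2 covering [128, 256). plat_idx_to_val() is the inverse
 * mapping, returning the midpoint of a bucket.
 */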
static unsigned long plat_idx_to_val(unsigned int idx)
{
        unsigned int error_bits;
        unsigned long k, base;

        assert(idx < PLAT_NR);

        /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
         * all bits of the sample as index */
        if (idx < (PLAT_VAL << 1))
                return cycles_to_nsec(idx);

        /* Find the group and compute the minimum value of that group */
        error_bits = (idx >> PLAT_BITS) - 1;
        base = ((unsigned long) 1) << (error_bits + PLAT_BITS);

        /* Find its bucket number of the group */
        k = idx % PLAT_VAL;

        /* Return the mean of the range of the bucket */
        return cycles_to_nsec(base + ((k + 0.5) * (1 << error_bits)));
}

unsigned int calc_clat_percentiles(unsigned long *io_u_plat, unsigned long nr,
                                   unsigned long **output,
                                   unsigned long *maxv, unsigned long *minv)
{
        unsigned long sum = 0;
        unsigned int len = plist_len, i, j = 0;
        unsigned long *ovals = NULL;
        bool is_last;

        *minv = -1UL;
        *maxv = 0;

        ovals = malloc(len * sizeof(*ovals));
        if (!ovals)
                return 0;

        /*
         * Calculate bucket values, note down max and min values
         */
        is_last = false;
        for (i = 0; i < PLAT_NR && !is_last; i++) {
                sum += io_u_plat[i];
                while (sum >= ((long double) plist[j] / 100.0 * nr)) {
                        assert(plist[j] <= 100.0);

                        ovals[j] = plat_idx_to_val(i);
                        if (ovals[j] < *minv)
                                *minv = ovals[j];
                        if (ovals[j] > *maxv)
                                *maxv = ovals[j];

                        is_last = (j == len - 1) != 0;
                        if (is_last)
                                break;

                        j++;
                }
        }

        if (!is_last)
                fprintf(stderr, "error calculating latency percentiles\n");

        *output = ovals;
        return len;
}

static void show_clat_percentiles(unsigned long *io_u_plat, unsigned long nr,
                                  unsigned int precision)
{
        unsigned int divisor, len, i, j = 0;
        unsigned long minv, maxv;
        unsigned long *ovals;
        int per_line, scale_down, time_width;
        bool is_last;
        char fmt[32];

        len = calc_clat_percentiles(io_u_plat, nr, &ovals, &maxv, &minv);
        if (!len || !ovals)
                goto out;

        if (!tsc_rate) {
                scale_down = 0;
                divisor = 1;
                printf("    percentiles (tsc ticks):\n     |");
        } else if (minv > 2000 && maxv > 99999) {
                scale_down = 1;
                divisor = 1000;
                printf("    percentiles (usec):\n     |");
        } else {
                scale_down = 0;
                divisor = 1;
                printf("    percentiles (nsec):\n     |");
        }

        time_width = max(5, (int) (log10(maxv / divisor) + 1));
        snprintf(fmt, sizeof(fmt), " %%%u.%ufth=[%%%dllu]%%c", precision + 3,
                        precision, time_width);
        /* fmt will be something like " %5.2fth=[%4llu]%c" */
        per_line = (80 - 7) / (precision + 10 + time_width);

        for (j = 0; j < len; j++) {
                /* for formatting */
                if (j != 0 && (j % per_line) == 0)
                        printf("     |");

                /* end of the list */
                is_last = (j == len - 1) != 0;

                for (i = 0; i < scale_down; i++)
                        ovals[j] = (ovals[j] + 999) / 1000;

                printf(fmt, plist[j], ovals[j], is_last ? '\n' : ',');

                if (is_last)
                        break;

                if ((j % per_line) == per_line - 1)     /* for formatting */
                        printf("\n");
        }

out:
        free(ovals);
}

#ifdef ARCH_HAVE_CPU_CLOCK
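/*
 * Inverse of plat_idx_to_val(): map a raw cycle count to its histogram
 * bucket. Continuing the example above, val=200 yields msb 7, one error
 * bit, and index 128 + ((200 >> 1) & 63) = 164.
 */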
static unsigned int plat_val_to_idx(unsigned long val)
{
        unsigned int msb, error_bits, base, offset, idx;

        /* Find MSB starting from bit 0 */
        if (val == 0)
                msb = 0;
        else
                msb = (sizeof(val)*8) - __builtin_clzll(val) - 1;

        /*
         * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
         * all bits of the sample as index
         */
        if (msb <= PLAT_BITS)
                return val;

        /* Compute the number of error bits to discard */
        error_bits = msb - PLAT_BITS;

        /* Compute the number of buckets before the group */
        base = (error_bits + 1) << PLAT_BITS;

        /*
         * Discard the error bits and apply the mask to find the
         * index for the buckets in the group
         */
        offset = (PLAT_VAL - 1) & (val >> error_bits);

        /* Make sure the index does not exceed (array size - 1) */
        idx = (base + offset) < (PLAT_NR - 1) ?
                (base + offset) : (PLAT_NR - 1);

        return idx;
}
#endif

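/*
 * Completion latency accounting: each CQE carries the submit batch's
 * clock index in the top 32 bits of user_data (see init_io()), so the
 * latency for nr completions is the current CPU clock minus the batch
 * timestamp stored in s->clock_batch[], binned into s->plat[].
 */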
static void add_stat(struct submitter *s, int clock_index, int nr)
{
#ifdef ARCH_HAVE_CPU_CLOCK
        unsigned long cycles;
        unsigned int pidx;

        if (!s->finish && clock_index) {
                cycles = get_cpu_clock();
                cycles -= s->clock_batch[clock_index];
                pidx = plat_val_to_idx(cycles);
                s->plat[pidx] += nr;
        }
#endif
}

static int io_uring_map_buffers(struct submitter *s)
{
        struct io_uring_map_buffers map = {
                .fd             = s->files[0].real_fd,
                .buf_end        = depth,
        };

        if (do_nop)
                return 0;
        if (s->nr_files > 1)
                fprintf(stdout, "Mapping buffers may not work with multiple files\n");

        return syscall(__NR_io_uring_register, s->ring_fd,
                        IORING_REGISTER_MAP_BUFFERS, &map, 1);
}

static int io_uring_register_buffers(struct submitter *s)
{
        if (do_nop)
                return 0;

        return syscall(__NR_io_uring_register, s->ring_fd,
                        IORING_REGISTER_BUFFERS, s->iovecs, roundup_pow2(depth));
}

static int io_uring_register_files(struct submitter *s)
{
        int i;

        if (do_nop)
                return 0;

        s->fds = calloc(s->nr_files, sizeof(__s32));
        for (i = 0; i < s->nr_files; i++) {
                s->fds[i] = s->files[i].real_fd;
                s->files[i].fixed_fd = i;
        }

        return syscall(__NR_io_uring_register, s->ring_fd,
                        IORING_REGISTER_FILES, s->fds, s->nr_files);
}

static int io_uring_setup(unsigned entries, struct io_uring_params *p)
{
        /*
         * Clamp CQ ring size at our SQ ring size, we don't need more entries
         * than that.
         */
        p->flags |= IORING_SETUP_CQSIZE;
        p->cq_entries = entries;

        return syscall(__NR_io_uring_setup, entries, p);
}

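/*
 * Probe the kernel for supported opcodes: if the non-vectored
 * IORING_OP_READ is available, prefer it over IORING_OP_READV and skip
 * the iovec indirection on the submission side.
 */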
static void io_uring_probe(int fd)
{
        struct io_uring_probe *p;
        int ret;

        p = malloc(sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
        if (!p)
                return;

        memset(p, 0, sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
        ret = syscall(__NR_io_uring_register, fd, IORING_REGISTER_PROBE, p, 256);
        if (ret < 0)
                goto out;

        if (IORING_OP_READ >= p->ops_len)
                goto out;

        if ((p->ops[IORING_OP_READ].flags & IO_URING_OP_SUPPORTED))
                vectored = 0;
out:
        free(p);
}

static int io_uring_enter(struct submitter *s, unsigned int to_submit,
                          unsigned int min_complete, unsigned int flags)
{
        if (register_ring)
                flags |= IORING_ENTER_REGISTERED_RING;
#ifdef FIO_ARCH_HAS_SYSCALL
        return __do_syscall6(__NR_io_uring_enter, s->enter_ring_fd, to_submit,
                                min_complete, flags, NULL, 0);
#else
        return syscall(__NR_io_uring_enter, s->enter_ring_fd, to_submit,
                        min_complete, flags, NULL, 0);
#endif
}

#ifndef CONFIG_HAVE_GETTID
static int gettid(void)
{
        return syscall(__NR_gettid);
}
#endif

static unsigned file_depth(struct submitter *s)
{
        return (depth + s->nr_files - 1) / s->nr_files;
}

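/*
 * Prepare one SQE at the given ring index: rotate to the next file once
 * the current one has file_depth() IOs pending, pick a block-aligned
 * random or sequential-with-wrap offset, and use fixed files/buffers if
 * registered. user_data packs the fileno in the low 32 bits and, when
 * stats are enabled, the clock batch index in the high 32 bits.
 */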
static void init_io(struct submitter *s, unsigned index)
{
        struct io_uring_sqe *sqe = &s->sqes[index];
        unsigned long offset;
        struct file *f;
        long r;

        if (do_nop) {
                sqe->opcode = IORING_OP_NOP;
                return;
        }

        if (s->nr_files == 1) {
                f = &s->files[0];
        } else {
                f = &s->files[s->cur_file];
                if (f->pending_ios >= file_depth(s)) {
                        s->cur_file++;
                        if (s->cur_file == s->nr_files)
                                s->cur_file = 0;
                        f = &s->files[s->cur_file];
                }
        }
        f->pending_ios++;

        if (random_io) {
                r = __rand64(&s->rand_state);
                offset = (r % (f->max_blocks - 1)) * bs;
        } else {
                offset = f->cur_off;
                f->cur_off += bs;
                if (f->cur_off + bs > f->max_size)
                        f->cur_off = 0;
        }

        if (register_files) {
                sqe->flags = IOSQE_FIXED_FILE;
                sqe->fd = f->fixed_fd;
        } else {
                sqe->flags = 0;
                sqe->fd = f->real_fd;
        }
        if (fixedbufs) {
                sqe->opcode = IORING_OP_READ_FIXED;
                sqe->addr = (unsigned long) s->iovecs[index].iov_base;
                sqe->len = bs;
                sqe->buf_index = index;
        } else if (!vectored) {
                sqe->opcode = IORING_OP_READ;
                sqe->addr = (unsigned long) s->iovecs[index].iov_base;
                sqe->len = bs;
                sqe->buf_index = 0;
        } else {
                sqe->opcode = IORING_OP_READV;
                sqe->addr = (unsigned long) &s->iovecs[index];
                sqe->len = 1;
                sqe->buf_index = 0;
        }
        sqe->ioprio = 0;
        sqe->off = offset;
        sqe->user_data = (unsigned long) f->fileno;
        if (stats && stats_running)
                sqe->user_data |= ((uint64_t)s->clock_index << 32);
}

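/*
 * Fill up to max_ios SQEs, then publish them with a single release store
 * to the SQ tail; that store is what makes the new entries visible to the
 * kernel. The acquire load of the head keeps us from overrunning a full
 * ring.
 */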
static int prep_more_ios_uring(struct submitter *s, int max_ios)
{
        struct io_sq_ring *ring = &s->sq_ring;
        unsigned index, tail, next_tail, prepped = 0;

        next_tail = tail = *ring->tail;
        do {
                next_tail++;
                if (next_tail == atomic_load_acquire(ring->head))
                        break;

                index = tail & sq_ring_mask;
                init_io(s, index);
                ring->array[index] = index;
                prepped++;
                tail = next_tail;
        } while (prepped < max_ios);

        if (prepped)
                atomic_store_release(ring->tail, tail);
        return prepped;
}

static int get_file_size(struct file *f)
{
        struct stat st;

        if (fstat(f->real_fd, &st) < 0)
                return -1;
        if (S_ISBLK(st.st_mode)) {
                unsigned long long bytes;

                if (ioctl(f->real_fd, BLKGETSIZE64, &bytes) != 0)
                        return -1;

                f->max_blocks = bytes / bs;
                f->max_size = bytes;
                return 0;
        } else if (S_ISREG(st.st_mode)) {
                f->max_blocks = st.st_size / bs;
                f->max_size = st.st_size;
                return 0;
        }

        return -1;
}

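/*
 * Drain the CQ ring: walk from our cached head to the kernel's tail,
 * validate each completion, batch latency accounting per clock index, and
 * finally retire everything with one release store to the CQ head.
 */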
static int reap_events_uring(struct submitter *s)
{
        struct io_cq_ring *ring = &s->cq_ring;
        struct io_uring_cqe *cqe;
        unsigned head, reaped = 0;
        int last_idx = -1, stat_nr = 0;

        head = *ring->head;
        do {
                struct file *f;

                read_barrier();
                if (head == atomic_load_acquire(ring->tail))
                        break;
                cqe = &ring->cqes[head & cq_ring_mask];
                if (!do_nop) {
                        int fileno = cqe->user_data & 0xffffffff;

                        f = &s->files[fileno];
                        f->pending_ios--;
                        if (cqe->res != bs) {
                                printf("io: unexpected ret=%d\n", cqe->res);
                                if (polled && cqe->res == -EOPNOTSUPP)
                                        printf("Your filesystem/driver/kernel doesn't support polled IO\n");
                                return -1;
                        }
                }
                if (stats) {
                        int clock_index = cqe->user_data >> 32;

                        if (last_idx != clock_index) {
                                if (last_idx != -1) {
                                        add_stat(s, last_idx, stat_nr);
                                        stat_nr = 0;
                                }
                                last_idx = clock_index;
                        }
                        stat_nr++;
                }
                reaped++;
                head++;
        } while (1);

        if (stat_nr)
                add_stat(s, last_idx, stat_nr);

        if (reaped) {
                s->inflight -= reaped;
                atomic_store_release(ring->head, head);
        }
        return reaped;
}

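/*
 * NUMA placement (the -P option): prefer memory allocations from the
 * device's home node and restrict the submitter thread to that node's
 * CPUs, keeping buffers and submissions local to the device.
 */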
static void set_affinity(struct submitter *s)
{
#ifdef CONFIG_LIBNUMA
        struct bitmask *mask;

        if (s->numa_node == -1)
                return;

        numa_set_preferred(s->numa_node);

        mask = numa_allocate_cpumask();
        numa_node_to_cpus(s->numa_node, mask);
        numa_sched_setaffinity(s->tid, mask);
#endif
}

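/*
 * Find the device's home node via sysfs: for e.g. /dev/nvme0n1 this reads
 * /sys/block/nvme0n1/device/numa_node. Devices without NUMA affinity
 * report -1 there, which matches the "no placement" value used here.
 */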
static int detect_node(struct submitter *s, const char *name)
{
#ifdef CONFIG_LIBNUMA
        const char *base = basename(name);
        char str[128];
        int ret, fd, node;

        sprintf(str, "/sys/block/%s/device/numa_node", base);
        fd = open(str, O_RDONLY);
        if (fd < 0)
                return -1;

        ret = read(fd, str, sizeof(str) - 1);
        if (ret < 0) {
                close(fd);
                return -1;
        }
        str[ret] = '\0';
        node = atoi(str);
        s->numa_node = node;
        close(fd);
#else
        s->numa_node = -1;
#endif
        return 0;
}

static int setup_aio(struct submitter *s)
{
#ifdef CONFIG_LIBAIO
        if (polled) {
                fprintf(stderr, "aio does not support polled IO\n");
                polled = 0;
        }
        if (sq_thread_poll) {
                fprintf(stderr, "aio does not support SQPOLL IO\n");
                sq_thread_poll = 0;
        }
        if (do_nop) {
                fprintf(stderr, "aio does not support nop IO\n");
                do_nop = 0;
        }
        if (fixedbufs || register_files) {
                fprintf(stderr, "aio does not support registered files or buffers\n");
                fixedbufs = register_files = 0;
        }

        return io_queue_init(roundup_pow2(depth), &s->aio_ctx);
#else
        fprintf(stderr, "Legacy AIO not available on this system/build\n");
        errno = EINVAL;
        return -1;
#endif
}

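/*
 * Create the ring and mmap its three shared regions: the SQ ring metadata
 * and index array at IORING_OFF_SQ_RING, the SQE array at IORING_OFF_SQES,
 * and the CQ ring with its CQEs at IORING_OFF_CQ_RING. Buffer and file
 * registration also happens here, since both need the ring fd.
 */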
static int setup_ring(struct submitter *s)
{
        struct io_sq_ring *sring = &s->sq_ring;
        struct io_cq_ring *cring = &s->cq_ring;
        struct io_uring_params p;
        int ret, fd;
        void *ptr;

        memset(&p, 0, sizeof(p));

        if (polled && !do_nop)
                p.flags |= IORING_SETUP_IOPOLL;
        if (sq_thread_poll) {
                p.flags |= IORING_SETUP_SQPOLL;
                if (sq_thread_cpu != -1) {
                        p.flags |= IORING_SETUP_SQ_AFF;
                        p.sq_thread_cpu = sq_thread_cpu;
                }
        }

        fd = io_uring_setup(depth, &p);
        if (fd < 0) {
                perror("io_uring_setup");
                return 1;
        }
        s->ring_fd = s->enter_ring_fd = fd;

        io_uring_probe(fd);

        if (fixedbufs) {
                struct rlimit rlim;

                rlim.rlim_cur = RLIM_INFINITY;
                rlim.rlim_max = RLIM_INFINITY;
                /* ignore potential error, not needed on newer kernels */
                setrlimit(RLIMIT_MEMLOCK, &rlim);

                ret = io_uring_register_buffers(s);
                if (ret < 0) {
                        perror("io_uring_register_buffers");
                        return 1;
                }

                if (dma_map) {
                        ret = io_uring_map_buffers(s);
                        if (ret < 0) {
                                perror("io_uring_map_buffers");
                                return 1;
                        }
                }
        }

        if (register_files) {
                ret = io_uring_register_files(s);
                if (ret < 0) {
                        perror("io_uring_register_files");
                        return 1;
                }
        }

        ptr = mmap(0, p.sq_off.array + p.sq_entries * sizeof(__u32),
                        PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
                        IORING_OFF_SQ_RING);
        sring->head = ptr + p.sq_off.head;
        sring->tail = ptr + p.sq_off.tail;
        sring->ring_mask = ptr + p.sq_off.ring_mask;
        sring->ring_entries = ptr + p.sq_off.ring_entries;
        sring->flags = ptr + p.sq_off.flags;
        sring->array = ptr + p.sq_off.array;
        sq_ring_mask = *sring->ring_mask;

        s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
                        PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
                        IORING_OFF_SQES);

        ptr = mmap(0, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe),
                        PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
                        IORING_OFF_CQ_RING);
        cring->head = ptr + p.cq_off.head;
        cring->tail = ptr + p.cq_off.tail;
        cring->ring_mask = ptr + p.cq_off.ring_mask;
        cring->ring_entries = ptr + p.cq_off.ring_entries;
        cring->cqes = ptr + p.cq_off.cqes;
        cq_ring_mask = *cring->ring_mask;
        return 0;
}

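/*
 * IO buffer allocator: with libnuma and a detected home node, allocate
 * node-local via numa_alloc_onnode(); otherwise fall back to page-aligned
 * memory, which O_DIRECT requires.
 */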
static void *allocate_mem(struct submitter *s, int size)
{
        void *buf;

#ifdef CONFIG_LIBNUMA
        if (s->numa_node != -1)
                return numa_alloc_onnode(size, s->numa_node);
#endif

        if (posix_memalign(&buf, page_size, size)) {
                printf("failed alloc\n");
                return NULL;
        }

        return buf;
}

static int submitter_init(struct submitter *s)
{
        int i, nr_batch, err;
        static int init_printed;
        char buf[80];

        s->tid = gettid();
        printf("submitter=%d, tid=%d, file=%s, node=%d\n", s->index, s->tid,
                                                        s->filename, s->numa_node);

        set_affinity(s);

        __init_rand64(&s->rand_state, pthread_self());
        srand48(pthread_self());

        for (i = 0; i < MAX_FDS; i++)
                s->files[i].fileno = i;

        for (i = 0; i < roundup_pow2(depth); i++) {
                void *buf;

                buf = allocate_mem(s, bs);
                if (!buf)
                        return 1;
                s->iovecs[i].iov_base = buf;
                s->iovecs[i].iov_len = bs;
        }

        if (use_sync) {
                sprintf(buf, "Engine=preadv2\n");
                err = 0;
        } else if (!aio) {
                err = setup_ring(s);
                sprintf(buf, "Engine=io_uring, sq_ring=%d, cq_ring=%d\n", *s->sq_ring.ring_entries, *s->cq_ring.ring_entries);
        } else {
                sprintf(buf, "Engine=aio\n");
                err = setup_aio(s);
        }
        if (err) {
                printf("queue setup failed: %s, %d\n", strerror(errno), err);
                return 1;
        }

        if (!init_printed) {
                printf("polled=%d, fixedbufs=%d/%d, register_files=%d, buffered=%d, QD=%d\n", polled, fixedbufs, dma_map, register_files, buffered, depth);
                printf("%s", buf);
                init_printed = 1;
        }

        if (stats) {
                nr_batch = roundup_pow2(depth / batch_submit);
                if (nr_batch < 2)
                        nr_batch = 2;
                s->clock_batch = calloc(nr_batch, sizeof(unsigned long));
                s->clock_index = 1;

                s->plat = calloc(PLAT_NR, sizeof(unsigned long));
        } else {
                s->clock_batch = NULL;
                s->plat = NULL;
                nr_batch = 0;
        }

        return nr_batch;
}

#ifdef CONFIG_LIBAIO
static int prep_more_ios_aio(struct submitter *s, int max_ios, struct iocb *iocbs)
{
        uint64_t data;
        long long offset;
        struct file *f;
        unsigned index;
        long r;

        index = 0;
        while (index < max_ios) {
                struct iocb *iocb = &iocbs[index];

                if (s->nr_files == 1) {
                        f = &s->files[0];
                } else {
                        f = &s->files[s->cur_file];
                        if (f->pending_ios >= file_depth(s)) {
                                s->cur_file++;
                                if (s->cur_file == s->nr_files)
                                        s->cur_file = 0;
                                f = &s->files[s->cur_file];
                        }
                }
                f->pending_ios++;

                r = lrand48();
                offset = (r % (f->max_blocks - 1)) * bs;
                io_prep_pread(iocb, f->real_fd, s->iovecs[index].iov_base,
                                s->iovecs[index].iov_len, offset);

                data = f->fileno;
                if (stats && stats_running)
                        data |= (((uint64_t) s->clock_index) << 32);
                iocb->data = (void *) (uintptr_t) data;
                index++;
        }
        return index;
}

static int reap_events_aio(struct submitter *s, struct io_event *events, int evs)
{
        int last_idx = -1, stat_nr = 0;
        int reaped = 0;

        while (evs) {
                uint64_t data = (uintptr_t) events[reaped].data;
                struct file *f = &s->files[data & 0xffffffff];

                f->pending_ios--;
                if (events[reaped].res != bs) {
                        printf("io: unexpected ret=%ld\n", events[reaped].res);
                        return -1;
                }
                if (stats) {
                        int clock_index = data >> 32;

                        if (last_idx != clock_index) {
                                if (last_idx != -1) {
                                        add_stat(s, last_idx, stat_nr);
                                        stat_nr = 0;
                                }
                                last_idx = clock_index;
                        }
                        stat_nr++;
                }
                reaped++;
                evs--;
        }

        if (stat_nr)
                add_stat(s, last_idx, stat_nr);

        s->inflight -= reaped;
        s->done += reaped;
        return reaped;
}

static void *submitter_aio_fn(void *data)
{
        struct submitter *s = data;
        int i, ret, prepped;
        struct iocb **iocbsptr;
        struct iocb *iocbs;
        struct io_event *events;
#ifdef ARCH_HAVE_CPU_CLOCK
        int nr_batch = submitter_init(s);
#else
        submitter_init(s);
#endif

        iocbsptr = calloc(depth, sizeof(struct iocb *));
        iocbs = calloc(depth, sizeof(struct iocb));
        events = calloc(depth, sizeof(struct io_event));

        for (i = 0; i < depth; i++)
                iocbsptr[i] = &iocbs[i];

        prepped = 0;
        do {
                int to_wait, to_submit, to_prep;

                if (!prepped && s->inflight < depth) {
                        to_prep = min(depth - s->inflight, batch_submit);
                        prepped = prep_more_ios_aio(s, to_prep, iocbs);
#ifdef ARCH_HAVE_CPU_CLOCK
                        if (prepped && stats) {
                                s->clock_batch[s->clock_index] = get_cpu_clock();
                                s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
                        }
#endif
                }
                s->inflight += prepped;
                to_submit = prepped;

                if (to_submit && (s->inflight + to_submit <= depth))
                        to_wait = 0;
                else
                        to_wait = min(s->inflight + to_submit, batch_complete);

                ret = io_submit(s->aio_ctx, to_submit, iocbsptr);
                s->calls++;
                if (ret < 0) {
                        perror("io_submit");
                        break;
                } else if (ret != to_submit) {
                        printf("submitted %d, wanted %d\n", ret, to_submit);
                        break;
                }
                prepped = 0;

                while (to_wait) {
                        int r;

                        s->calls++;
                        r = io_getevents(s->aio_ctx, to_wait, to_wait, events, NULL);
                        if (r < 0) {
                                perror("io_getevents");
                                break;
                        } else if (r != to_wait) {
                                printf("r=%d, wait=%d\n", r, to_wait);
                                break;
                        }
                        r = reap_events_aio(s, events, r);
                        s->reaps += r;
                        to_wait -= r;
                }
        } while (!s->finish);

        free(iocbsptr);
        free(iocbs);
        free(events);
        finish = 1;
        return NULL;
}
#endif

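/*
 * Registered ring fds: IORING_REGISTER_RING_FDS installs the ring fd in a
 * per-task table and returns an offset, which io_uring_enter() can then
 * pass with IORING_ENTER_REGISTERED_RING to skip the fd lookup on every
 * syscall.
 */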
static void io_uring_unregister_ring(struct submitter *s)
{
        struct io_uring_rsrc_update up = {
                .offset = s->enter_ring_fd,
        };

        syscall(__NR_io_uring_register, s->ring_fd, IORING_UNREGISTER_RING_FDS,
                &up, 1);
}

static int io_uring_register_ring(struct submitter *s)
{
        struct io_uring_rsrc_update up = {
                .data   = s->ring_fd,
                .offset = -1U,
        };
        int ret;

        ret = syscall(__NR_io_uring_register, s->ring_fd,
                        IORING_REGISTER_RING_FDS, &up, 1);
        if (ret == 1) {
                s->enter_ring_fd = up.offset;
                return 0;
        }
        register_ring = 0;
        return -1;
}

static void *submitter_uring_fn(void *data)
{
        struct submitter *s = data;
        struct io_sq_ring *ring = &s->sq_ring;
        int ret, prepped;
#ifdef ARCH_HAVE_CPU_CLOCK
        int nr_batch = submitter_init(s);
#else
        submitter_init(s);
#endif

        if (register_ring)
                io_uring_register_ring(s);

        prepped = 0;
        do {
                int to_wait, to_submit, this_reap, to_prep;
                unsigned ring_flags = 0;

                if (!prepped && s->inflight < depth) {
                        to_prep = min(depth - s->inflight, batch_submit);
                        prepped = prep_more_ios_uring(s, to_prep);
#ifdef ARCH_HAVE_CPU_CLOCK
                        if (prepped && stats) {
                                s->clock_batch[s->clock_index] = get_cpu_clock();
                                s->clock_index = (s->clock_index + 1) & (nr_batch - 1);
                        }
#endif
                }
                s->inflight += prepped;
submit_more:
                to_submit = prepped;
submit:
                if (to_submit && (s->inflight + to_submit <= depth))
                        to_wait = 0;
                else
                        to_wait = min(s->inflight + to_submit, batch_complete);

                /*
                 * Only need to call io_uring_enter if we're not using SQ thread
                 * poll, or if IORING_SQ_NEED_WAKEUP is set.
                 */
                if (sq_thread_poll)
                        ring_flags = atomic_load_acquire(ring->flags);
                if (!sq_thread_poll || ring_flags & IORING_SQ_NEED_WAKEUP) {
                        unsigned flags = 0;

                        if (to_wait)
                                flags = IORING_ENTER_GETEVENTS;
                        if (ring_flags & IORING_SQ_NEED_WAKEUP)
                                flags |= IORING_ENTER_SQ_WAKEUP;
                        ret = io_uring_enter(s, to_submit, to_wait, flags);
                        s->calls++;
                } else {
                        /* for SQPOLL, we submitted it all effectively */
                        ret = to_submit;
                }

                /*
                 * For non SQ thread poll, we already got the events we needed
                 * through the io_uring_enter() above. For SQ thread poll, we
                 * need to loop here until we find enough events.
                 */
                this_reap = 0;
                do {
                        int r;

                        r = reap_events_uring(s);
                        if (r == -1) {
                                s->finish = 1;
                                break;
                        } else if (r > 0)
                                this_reap += r;
                } while (sq_thread_poll && this_reap < to_wait);
                s->reaps += this_reap;

                if (ret >= 0) {
                        if (!ret) {
                                to_submit = 0;
                                if (s->inflight)
                                        goto submit;
                                continue;
                        } else if (ret < to_submit) {
                                int diff = to_submit - ret;

                                s->done += ret;
                                prepped -= diff;
                                goto submit_more;
                        }
                        s->done += ret;
                        prepped = 0;
                        continue;
                } else if (ret < 0) {
                        if (errno == EAGAIN) {
                                if (s->finish)
                                        break;
                                if (this_reap)
                                        goto submit;
                                to_submit = 0;
                                goto submit;
                        }
                        printf("io_submit: %s\n", strerror(errno));
                        break;
                }
        } while (!s->finish);

        if (register_ring)
                io_uring_unregister_ring(s);

        finish = 1;
        return NULL;
}

#ifdef CONFIG_PWRITEV2
static void *submitter_sync_fn(void *data)
{
        struct submitter *s = data;
        int ret;

        submitter_init(s);

        do {
                uint64_t offset;
                struct file *f;
                long r;

                if (s->nr_files == 1) {
                        f = &s->files[0];
                } else {
                        f = &s->files[s->cur_file];
                        if (f->pending_ios >= file_depth(s)) {
                                s->cur_file++;
                                if (s->cur_file == s->nr_files)
                                        s->cur_file = 0;
                                f = &s->files[s->cur_file];
                        }
                }
                f->pending_ios++;

                if (random_io) {
                        r = __rand64(&s->rand_state);
                        offset = (r % (f->max_blocks - 1)) * bs;
                } else {
                        offset = f->cur_off;
                        f->cur_off += bs;
                        if (f->cur_off + bs > f->max_size)
                                f->cur_off = 0;
                }

#ifdef ARCH_HAVE_CPU_CLOCK
                if (stats)
                        s->clock_batch[s->clock_index] = get_cpu_clock();
#endif

                s->inflight++;
                s->calls++;

                if (polled)
                        ret = preadv2(f->real_fd, &s->iovecs[0], 1, offset, RWF_HIPRI);
                else
                        ret = preadv2(f->real_fd, &s->iovecs[0], 1, offset, 0);

                if (ret < 0) {
                        perror("preadv2");
                        break;
                } else if (ret != bs) {
                        break;
                }

                s->done++;
                s->inflight--;
                f->pending_ios--;
                if (stats)
                        add_stat(s, s->clock_index, 1);
        } while (!s->finish);

        finish = 1;
        return NULL;
}
#else
static void *submitter_sync_fn(void *data)
{
        finish = 1;
        return NULL;
}
#endif

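/*
 * Submitters live back to back in one calloc()'ed block, each followed by
 * its roundup_pow2(depth) iovecs, so indexing uses the same stride as the
 * allocation in main().
 */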
static struct submitter *get_submitter(int offset)
{
        void *ret;

        ret = submitter;
        if (offset)
                ret += offset * (sizeof(*submitter) + roundup_pow2(depth) * sizeof(struct iovec));
        return ret;
}

static void do_finish(const char *reason)
{
        int j;

        printf("Exiting on %s\n", reason);
        for (j = 0; j < nthreads; j++) {
                struct submitter *s = get_submitter(j);
                s->finish = 1;
        }
        if (max_iops > 1000000) {
                double miops = (double) max_iops / 1000000.0;
                printf("Maximum IOPS=%.2fM\n", miops);
        } else if (max_iops > 100000) {
                double kiops = (double) max_iops / 1000.0;
                printf("Maximum IOPS=%.2fK\n", kiops);
        } else {
                printf("Maximum IOPS=%lu\n", max_iops);
        }
        finish = 1;
}

static void sig_int(int sig)
{
        do_finish("signal");
}

static void arm_sig_int(void)
{
        struct sigaction act;

        memset(&act, 0, sizeof(act));
        act.sa_handler = sig_int;
        act.sa_flags = SA_RESTART;
        sigaction(SIGINT, &act, NULL);

        /* Windows uses SIGBREAK as a quit signal from other applications */
#ifdef WIN32
        sigaction(SIGBREAK, &act, NULL);
#endif
}

static void usage(char *argv, int status)
{
        char runtime_str[16];
        snprintf(runtime_str, sizeof(runtime_str), "%d", runtime);
        printf("%s [options] -- [filenames]\n"
                " -d <int>  : IO Depth, default %d\n"
                " -s <int>  : Batch submit, default %d\n"
                " -c <int>  : Batch complete, default %d\n"
                " -b <int>  : Block size, default %d\n"
                " -p <bool> : Polled IO, default %d\n"
                " -B <bool> : Fixed buffers, default %d\n"
                " -D <bool> : DMA map fixed buffers, default %d\n"
                " -F <bool> : Register files, default %d\n"
                " -n <int>  : Number of threads, default %d\n"
                " -O <bool> : Use O_DIRECT, default %d\n"
                " -N <bool> : Perform just no-op requests, default %d\n"
                " -t <bool> : Track IO latencies, default %d\n"
                " -T <int>  : TSC rate in HZ\n"
                " -r <int>  : Runtime in seconds, default %s\n"
                " -R <bool> : Use random IO, default %d\n"
                " -a <bool> : Use legacy aio, default %d\n"
                " -S <bool> : Use sync IO (preadv2), default %d\n"
                " -X <bool> : Use registered ring, default %d\n"
                " -P <bool> : Automatically place on device home node, default %d\n",
                argv, DEPTH, BATCH_SUBMIT, BATCH_COMPLETE, BS, polled,
                fixedbufs, dma_map, register_files, nthreads, !buffered, do_nop,
                stats, runtime == 0 ? "unlimited" : runtime_str, random_io, aio,
                use_sync, register_ring, numa_placement);
        exit(status);
}

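/*
 * The TSC rate is cached in a local "tsc-rate" file: -T writes it once,
 * and later runs with stats enabled read it back so cycle counts can be
 * reported in nanoseconds instead of ticks.
 */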
static void read_tsc_rate(void)
{
        char buffer[32];
        int fd, ret;

        if (tsc_rate)
                return;

        fd = open(TSC_RATE_FILE, O_RDONLY);
        if (fd < 0)
                return;

        ret = read(fd, buffer, sizeof(buffer));
        if (ret < 0) {
                close(fd);
                return;
        }

        tsc_rate = strtoul(buffer, NULL, 10);
        printf("Using TSC rate %luHz\n", tsc_rate);
        close(fd);
}

static void write_tsc_rate(void)
{
        char buffer[32];
        struct stat sb;
        int fd, ret;

        if (!stat(TSC_RATE_FILE, &sb))
                return;

        fd = open(TSC_RATE_FILE, O_WRONLY | O_CREAT, 0644);
        if (fd < 0)
                return;

        memset(buffer, 0, sizeof(buffer));
        sprintf(buffer, "%lu", tsc_rate);
        ret = write(fd, buffer, strlen(buffer));
        if (ret < 0)
                perror("write");
        close(fd);
}

int main(int argc, char *argv[])
{
        struct submitter *s;
        unsigned long done, calls, reap;
        int i, j, flags, fd, opt, threads_per_f, threads_rem = 0, nfiles;
        struct file f;
        void *ret;

        if (!do_nop && argc < 2)
                usage(argv[0], 1);

        while ((opt = getopt(argc, argv, "d:s:c:b:p:B:F:n:N:O:t:T:a:r:D:R:X:S:P:h?")) != -1) {
                switch (opt) {
                case 'a':
                        aio = !!atoi(optarg);
                        break;
                case 'd':
                        depth = atoi(optarg);
                        break;
                case 's':
                        batch_submit = atoi(optarg);
                        if (!batch_submit)
                                batch_submit = 1;
                        break;
                case 'c':
                        batch_complete = atoi(optarg);
                        if (!batch_complete)
                                batch_complete = 1;
                        break;
                case 'b':
                        bs = atoi(optarg);
                        break;
                case 'p':
                        polled = !!atoi(optarg);
                        break;
                case 'B':
                        fixedbufs = !!atoi(optarg);
                        break;
                case 'F':
                        register_files = !!atoi(optarg);
                        break;
                case 'n':
                        nthreads = atoi(optarg);
                        if (!nthreads) {
                                printf("Threads must be non-zero\n");
                                usage(argv[0], 1);
                        }
                        break;
                case 'N':
                        do_nop = !!atoi(optarg);
                        break;
                case 'O':
                        buffered = !atoi(optarg);
                        break;
                case 't':
#ifndef ARCH_HAVE_CPU_CLOCK
                        fprintf(stderr, "Stats not supported on this CPU\n");
                        return 1;
#endif
                        stats = !!atoi(optarg);
                        break;
                case 'T':
#ifndef ARCH_HAVE_CPU_CLOCK
                        fprintf(stderr, "Stats not supported on this CPU\n");
                        return 1;
#endif
                        tsc_rate = strtoul(optarg, NULL, 10);
                        write_tsc_rate();
                        break;
                case 'r':
                        runtime = atoi(optarg);
                        break;
                case 'D':
                        dma_map = !!atoi(optarg);
                        break;
                case 'R':
                        random_io = !!atoi(optarg);
                        break;
                case 'X':
                        register_ring = !!atoi(optarg);
                        break;
                case 'S':
#ifdef CONFIG_PWRITEV2
                        use_sync = !!atoi(optarg);
#else
                        fprintf(stderr, "preadv2 not supported\n");
                        exit(1);
#endif
                        break;
                case 'P':
                        numa_placement = !!atoi(optarg);
                        break;
                case 'h':
                case '?':
                default:
                        usage(argv[0], 0);
                        break;
                }
        }

        if (stats)
                read_tsc_rate();

        if (batch_complete > depth)
                batch_complete = depth;
        if (batch_submit > depth)
                batch_submit = depth;
        if (!fixedbufs && dma_map)
                dma_map = 0;

        submitter = calloc(nthreads, sizeof(*submitter) +
                                roundup_pow2(depth) * sizeof(struct iovec));
        for (j = 0; j < nthreads; j++) {
                s = get_submitter(j);
                s->numa_node = -1;
                s->index = j;
                s->done = s->calls = s->reaps = 0;
        }

        flags = O_RDONLY | O_NOATIME;
        if (!buffered)
                flags |= O_DIRECT;

        j = 0;
        i = optind;
        nfiles = argc - i;
        if (!do_nop) {
                if (!nfiles) {
                        printf("No files specified\n");
                        usage(argv[0], 1);
                }
                threads_per_f = nthreads / nfiles;
                /* make sure each thread gets assigned files */
                if (threads_per_f == 0) {
                        threads_per_f = 1;
                } else {
                        threads_rem = nthreads - threads_per_f * nfiles;
                }
        }
        while (!do_nop && i < argc) {
                int k, limit;

                memset(&f, 0, sizeof(f));

                fd = open(argv[i], flags);
                if (fd < 0) {
                        perror("open");
                        return 1;
                }
                f.real_fd = fd;
                if (get_file_size(&f)) {
                        printf("failed getting size of device/file\n");
                        return 1;
                }
                if (f.max_blocks <= 1) {
                        printf("Zero file/device size?\n");
                        return 1;
                }
                f.max_blocks--;

                limit = threads_per_f;
                limit += threads_rem > 0 ? 1 : 0;
                for (k = 0; k < limit; k++) {
                        s = get_submitter((j + k) % nthreads);

                        if (s->nr_files == MAX_FDS) {
                                printf("Max number of files (%d) reached\n", MAX_FDS);
                                break;
                        }

                        memcpy(&s->files[s->nr_files], &f, sizeof(f));

                        if (numa_placement)
                                detect_node(s, argv[i]);

                        s->filename = argv[i];
                        s->nr_files++;
                }
                threads_rem--;
                i++;
                j += limit;
        }

        arm_sig_int();

        page_size = sysconf(_SC_PAGESIZE);
        if (page_size < 0)
                page_size = 4096;

        for (j = 0; j < nthreads; j++) {
                s = get_submitter(j);
                if (use_sync)
                        pthread_create(&s->thread, NULL, submitter_sync_fn, s);
                else if (!aio)
                        pthread_create(&s->thread, NULL, submitter_uring_fn, s);
#ifdef CONFIG_LIBAIO
                else
                        pthread_create(&s->thread, NULL, submitter_aio_fn, s);
#endif
        }

        reap = calls = done = 0;
        do {
                unsigned long this_done = 0;
                unsigned long this_reap = 0;
                unsigned long this_call = 0;
                unsigned long rpc = 0, ipc = 0;
                unsigned long iops, bw;

                sleep(1);
                if (runtime && !--runtime)
                        do_finish("timeout");

                /* don't print partial run, if interrupted by signal */
                if (finish)
                        break;

                /* one second in to the run, enable stats */
                if (stats)
                        stats_running = 1;

                for (j = 0; j < nthreads; j++) {
                        s = get_submitter(j);
                        this_done += s->done;
                        this_call += s->calls;
                        this_reap += s->reaps;
                }
                if (this_call - calls) {
                        rpc = (this_done - done) / (this_call - calls);
                        ipc = (this_reap - reap) / (this_call - calls);
                } else
                        rpc = ipc = -1;
                iops = this_done - done;
                if (bs > 1048576)
                        bw = iops * (bs / 1048576);
                else
                        bw = iops / (1048576 / bs);
                if (iops > 1000000) {
                        double miops = (double) iops / 1000000.0;
                        printf("IOPS=%.2fM, ", miops);
                } else if (iops > 100000) {
                        double kiops = (double) iops / 1000.0;
                        printf("IOPS=%.2fK, ", kiops);
                } else {
                        printf("IOPS=%lu, ", iops);
                }
                max_iops = max(max_iops, iops);
                if (!do_nop) {
                        if (bw > 2000) {
                                double bw_g = (double) bw / 1000.0;

                                printf("BW=%.2fGiB/s, ", bw_g);
                        } else {
                                printf("BW=%luMiB/s, ", bw);
                        }
                }
                printf("IOS/call=%ld/%ld\n", rpc, ipc);
                done = this_done;
                calls = this_call;
                reap = this_reap;
        } while (!finish);

        for (j = 0; j < nthreads; j++) {
                s = get_submitter(j);
                pthread_join(s->thread, &ret);
                close(s->ring_fd);

                if (stats) {
                        unsigned long nr;

                        printf("%d: Latency percentiles:\n", s->tid);
                        for (i = 0, nr = 0; i < PLAT_NR; i++)
                                nr += s->plat[i];
                        show_clat_percentiles(s->plat, nr, 4);
                        free(s->clock_batch);
                        free(s->plat);
                }
        }

        free(submitter);
        return 0;
}