58a1c458b866537c0302a7f15bb854474aebac81
[fio.git] / t / io_uring.c
1 #include <stdio.h>
2 #include <errno.h>
3 #include <assert.h>
4 #include <stdlib.h>
5 #include <stddef.h>
6 #include <signal.h>
7 #include <inttypes.h>
8
9 #include <sys/types.h>
10 #include <sys/stat.h>
11 #include <sys/ioctl.h>
12 #include <sys/syscall.h>
13 #include <sys/resource.h>
14 #include <sys/mman.h>
15 #include <sys/uio.h>
16 #include <linux/fs.h>
17 #include <fcntl.h>
18 #include <unistd.h>
19 #include <string.h>
20 #include <pthread.h>
21 #include <sched.h>
22
23 #include "../arch/arch.h"
24 #include "../lib/types.h"
25 #include "../os/io_uring.h"
26
/*
 * Compiler barrier: stops the compiler from reordering memory accesses
 * across this point. No CPU fence is emitted; the ring code pairs this
 * with the kernel's ordering of the shared ring indices.
 */
#define barrier()       __asm__ __volatile__("": : :"memory")

/*
 * Fix: fully parenthesize both arguments inside the comparison. The old
 * form ((a < b) ? ...) let lower-precedence operators leak in, e.g.
 * min(5, 1 | 2) expanded to ((5 < 1 | 2) ? ...) == (0 | 2) ? 5 : 3 == 5.
 * Note: arguments are still evaluated twice — avoid side effects.
 */
#define min(a, b)               (((a) < (b)) ? (a) : (b))
30
/*
 * Userspace view of the submission queue ring: pointers into the
 * kernel-shared SQ ring mapping, resolved from p.sq_off offsets in
 * setup_ring().
 */
struct io_sq_ring {
        unsigned *head;         /* consumer index; we compare against it for "full" */
        unsigned *tail;         /* producer index; advanced by prep_more_ios() */
        unsigned *ring_mask;    /* entries - 1, for wrapping indices */
        unsigned *ring_entries; /* total number of ring slots */
        unsigned *flags;        /* e.g. IORING_SQ_NEED_WAKEUP for SQPOLL mode */
        unsigned *array;        /* indirection array holding sqe indices */
};
39
/*
 * Userspace view of the completion queue ring: pointers into the
 * kernel-shared CQ ring mapping, resolved from p.cq_off offsets in
 * setup_ring().
 */
struct io_cq_ring {
        unsigned *head;         /* consumer index; advanced by reap_events() */
        unsigned *tail;         /* producer index; we only read it */
        unsigned *ring_mask;    /* entries - 1, for wrapping indices */
        unsigned *ring_entries; /* total number of ring slots */
        struct io_uring_cqe *cqes;      /* the completion entries themselves */
};
47
#define DEPTH                   128     /* queue depth; also the number of buffers */

#define BATCH_SUBMIT            64      /* max sqes prepped/submitted per loop */
#define BATCH_COMPLETE          64      /* max completions to wait for at once */

#define BS                      4096    /* block size of every IO */

#define MAX_FDS                 16      /* max files accepted on the command line */

/* cached copies of *ring_mask, filled in once by setup_ring() */
static unsigned sq_ring_mask, cq_ring_mask;
58
/* Per-file state: size and how many IOs are currently outstanding against it. */
struct file {
        unsigned long max_blocks;       /* size in BS units; decremented once in main() */
        unsigned pending_ios;           /* submitted but not yet reaped */
        int fd;
};
64
/*
 * All state for one submission thread: its ring mappings, buffers, files
 * and statistics. The counters are read by main() while the thread runs.
 */
struct submitter {
        pthread_t thread;
        int ring_fd;                    /* fd from io_uring_setup() */
        struct drand48_data rand;       /* PRNG state for random offsets */
        struct io_sq_ring sq_ring;
        struct io_uring_sqe *sqes;      /* mmap'ed sqe array */
        struct iovec iovecs[DEPTH];     /* one BS-aligned buffer per queue slot */
        struct io_cq_ring cq_ring;
        int inflight;                   /* IOs submitted, not yet reaped */
        unsigned long reaps;            /* total completions reaped */
        unsigned long done;             /* total sqes accepted by the kernel */
        unsigned long calls;            /* io_uring_enter() invocations */
        unsigned long cachehit, cachemiss;      /* from IOCQE_FLAG_CACHEHIT */
        volatile int finish;            /* set by sig_int() to stop the loop */

        struct file files[MAX_FDS];
        unsigned nr_files;
        unsigned cur_file;              /* round-robin cursor into files[] */
};
84
/* single submitter instance; main() and sig_int() reference slot 0 */
static struct submitter submitters[1];
/* tells the stats loop in main() to exit; set by the thread or SIGINT */
static volatile int finish;

static int polled = 1;          /* use IO polling */
static int fixedbufs = 1;       /* use fixed user buffers */
static int buffered = 0;        /* use buffered IO, not O_DIRECT */
static int sq_thread_poll = 0;  /* use kernel submission/poller thread */
static int sq_thread_cpu = -1;  /* pin above thread to this CPU */
93
94 static int io_uring_register_buffers(struct submitter *s)
95 {
96         struct io_uring_register_buffers reg = {
97                 .iovecs = s->iovecs,
98                 .nr_iovecs = DEPTH
99         };
100
101         return syscall(__NR_sys_io_uring_register, s->ring_fd,
102                         IORING_REGISTER_BUFFERS, &reg);
103 }
104
105 static int io_uring_setup(unsigned entries, struct io_uring_params *p)
106 {
107         return syscall(__NR_sys_io_uring_setup, entries, p);
108 }
109
110 static int io_uring_enter(struct submitter *s, unsigned int to_submit,
111                           unsigned int min_complete, unsigned int flags)
112 {
113         return syscall(__NR_sys_io_uring_enter, s->ring_fd, to_submit,
114                         min_complete, flags);
115 }
116
/* Return the caller's kernel thread id (no libc wrapper existed here). */
static int gettid(void)
{
        long tid;

        tid = syscall(__NR_gettid);
        return (int) tid;
}
121
122 static unsigned file_depth(struct submitter *s)
123 {
124         return (DEPTH + s->nr_files - 1) / s->nr_files;
125 }
126
127 static void init_io(struct submitter *s, unsigned index)
128 {
129         struct io_uring_sqe *sqe = &s->sqes[index];
130         unsigned long offset;
131         struct file *f;
132         long r;
133
134         if (s->nr_files == 1) {
135                 f = &s->files[0];
136         } else {
137                 f = &s->files[s->cur_file];
138                 if (f->pending_ios >= file_depth(s)) {
139                         s->cur_file++;
140                         if (s->cur_file == s->nr_files)
141                                 s->cur_file = 0;
142                 }
143         }
144         f->pending_ios++;
145
146         lrand48_r(&s->rand, &r);
147         offset = (r % (f->max_blocks - 1)) * BS;
148
149         sqe->flags = 0;
150         sqe->opcode = IORING_OP_READV;
151         if (fixedbufs) {
152                 sqe->addr = s->iovecs[index].iov_base;
153                 sqe->len = BS;
154                 sqe->buf_index = index;
155                 sqe->flags |= IOSQE_FIXED_BUFFER;
156         } else {
157                 sqe->addr = &s->iovecs[index];
158                 sqe->len = 1;
159                 sqe->buf_index = 0;
160         }
161         sqe->ioprio = 0;
162         sqe->fd = f->fd;
163         sqe->off = offset;
164         sqe->data = (unsigned long) f;
165 }
166
/*
 * Prepare up to max_ios new sqes, stopping early if the SQ ring fills.
 * Returns the number prepped. The tail is published last so the kernel
 * never observes a tail covering partially-written sqes.
 */
static int prep_more_ios(struct submitter *s, int max_ios)
{
        struct io_sq_ring *ring = &s->sq_ring;
        unsigned index, tail, next_tail, prepped = 0;

        next_tail = tail = *ring->tail;
        do {
                next_tail++;
                barrier();
                /* ring is full if advancing tail would meet the head */
                if (next_tail == *ring->head)
                        break;

                index = tail & sq_ring_mask;
                init_io(s, index);
                ring->array[index] = index;     /* 1:1 slot-to-sqe mapping */
                prepped++;
                tail = next_tail;
        } while (prepped < max_ios);

        if (*ring->tail != tail) {
                /* order tail store with writes to sqes above */
                barrier();
                *ring->tail = tail;
                barrier();
        }
        return prepped;
}
194
195 static int get_file_size(struct file *f)
196 {
197         struct stat st;
198
199         if (fstat(f->fd, &st) < 0)
200                 return -1;
201         if (S_ISBLK(st.st_mode)) {
202                 unsigned long long bytes;
203
204                 if (ioctl(f->fd, BLKGETSIZE64, &bytes) != 0)
205                         return -1;
206
207                 f->max_blocks = bytes / BS;
208                 return 0;
209         } else if (S_ISREG(st.st_mode)) {
210                 f->max_blocks = st.st_size / BS;
211                 return 0;
212         }
213
214         return -1;
215 }
216
/*
 * Drain every completion currently available in the CQ ring. Returns the
 * number reaped, or -1 if a cqe reports anything other than a full BS
 * read. The head is published back to the kernel only after the loop.
 * NOTE(review): on the -1 path, head and s->inflight are not updated —
 * the caller treats this as fatal, so the stale state is never reused.
 */
static int reap_events(struct submitter *s)
{
        struct io_cq_ring *ring = &s->cq_ring;
        struct io_uring_cqe *cqe;
        unsigned head, reaped = 0;

        head = *ring->head;
        do {
                struct file *f;

                barrier();
                /* ring is empty once head catches the tail */
                if (head == *ring->tail)
                        break;
                cqe = &ring->cqes[head & cq_ring_mask];
                f = (struct file *) cqe->data;  /* stashed by init_io() */
                f->pending_ios--;
                if (cqe->res != BS) {
                        printf("io: unexpected ret=%d\n", cqe->res);
                        return -1;
                }
                if (cqe->flags & IOCQE_FLAG_CACHEHIT)
                        s->cachehit++;
                else
                        s->cachemiss++;
                reaped++;
                head++;
        } while (1);

        s->inflight -= reaped;
        *ring->head = head;
        barrier();
        return reaped;
}
250
251 static void *submitter_fn(void *data)
252 {
253         struct submitter *s = data;
254         struct io_sq_ring *ring = &s->sq_ring;
255         int ret, prepped;
256
257         printf("submitter=%d\n", gettid());
258
259         srand48_r(pthread_self(), &s->rand);
260
261         prepped = 0;
262         do {
263                 int to_wait, to_submit, this_reap, to_prep;
264
265                 if (!prepped && s->inflight < DEPTH) {
266                         to_prep = min(DEPTH - s->inflight, BATCH_SUBMIT);
267                         prepped = prep_more_ios(s, to_prep);
268                 }
269                 s->inflight += prepped;
270 submit_more:
271                 to_submit = prepped;
272 submit:
273                 if (s->inflight + BATCH_SUBMIT < DEPTH)
274                         to_wait = 0;
275                 else
276                         to_wait = min(s->inflight + to_submit, BATCH_COMPLETE);
277
278                 /*
279                  * Only need to call io_uring_enter if we're not using SQ thread
280                  * poll, or if IORING_SQ_NEED_WAKEUP is set.
281                  */
282                 if (!sq_thread_poll || (*ring->flags & IORING_SQ_NEED_WAKEUP)) {
283                         ret = io_uring_enter(s, to_submit, to_wait,
284                                                 IORING_ENTER_GETEVENTS);
285                         s->calls++;
286                 }
287
288                 /*
289                  * For non SQ thread poll, we already got the events we needed
290                  * through the io_uring_enter() above. For SQ thread poll, we
291                  * need to loop here until we find enough events.
292                  */
293                 this_reap = 0;
294                 do {
295                         int r;
296                         r = reap_events(s);
297                         if (r == -1)
298                                 break;
299                         else if (r > 0)
300                                 this_reap += r;
301                 } while (sq_thread_poll && this_reap < to_wait);
302                 s->reaps += this_reap;
303
304                 if (ret >= 0) {
305                         if (!ret) {
306                                 to_submit = 0;
307                                 if (s->inflight)
308                                         goto submit;
309                                 continue;
310                         } else if (ret < to_submit) {
311                                 int diff = to_submit - ret;
312
313                                 s->done += ret;
314                                 prepped -= diff;
315                                 goto submit_more;
316                         }
317                         s->done += ret;
318                         prepped = 0;
319                         continue;
320                 } else if (ret < 0) {
321                         if (errno == EAGAIN) {
322                                 if (s->finish)
323                                         break;
324                                 if (this_reap)
325                                         goto submit;
326                                 to_submit = 0;
327                                 goto submit;
328                         }
329                         printf("io_submit: %s\n", strerror(errno));
330                         break;
331                 }
332         } while (!s->finish);
333
334         finish = 1;
335         return NULL;
336 }
337
338 static void sig_int(int sig)
339 {
340         printf("Exiting on signal %d\n", sig);
341         submitters[0].finish = 1;
342         finish = 1;
343 }
344
345 static void arm_sig_int(void)
346 {
347         struct sigaction act;
348
349         memset(&act, 0, sizeof(act));
350         act.sa_handler = sig_int;
351         act.sa_flags = SA_RESTART;
352         sigaction(SIGINT, &act, NULL);
353 }
354
355 static int setup_ring(struct submitter *s)
356 {
357         struct io_sq_ring *sring = &s->sq_ring;
358         struct io_cq_ring *cring = &s->cq_ring;
359         struct io_uring_params p;
360         int ret, fd;
361         void *ptr;
362
363         memset(&p, 0, sizeof(p));
364
365         if (polled)
366                 p.flags |= IORING_SETUP_IOPOLL;
367         if (sq_thread_poll) {
368                 p.flags |= IORING_SETUP_SQPOLL;
369                 if (sq_thread_cpu != -1)
370                         p.flags |= IORING_SETUP_SQ_AFF;
371         }
372
373         fd = io_uring_setup(DEPTH, &p);
374         if (fd < 0) {
375                 perror("io_uring_setup");
376                 return 1;
377         }
378         s->ring_fd = fd;
379
380         if (fixedbufs) {
381                 ret = io_uring_register_buffers(s);
382                 if (ret < 0) {
383                         perror("io_uring_register");
384                         return 1;
385                 }
386         }
387
388         ptr = mmap(0, p.sq_off.array + p.sq_entries * sizeof(__u32),
389                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
390                         IORING_OFF_SQ_RING);
391         printf("sq_ring ptr = 0x%p\n", ptr);
392         sring->head = ptr + p.sq_off.head;
393         sring->tail = ptr + p.sq_off.tail;
394         sring->ring_mask = ptr + p.sq_off.ring_mask;
395         sring->ring_entries = ptr + p.sq_off.ring_entries;
396         sring->flags = ptr + p.sq_off.flags;
397         sring->array = ptr + p.sq_off.array;
398         sq_ring_mask = *sring->ring_mask;
399
400         s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
401                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
402                         IORING_OFF_SQES);
403         printf("sqes ptr    = 0x%p\n", s->sqes);
404
405         ptr = mmap(0, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe),
406                         PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
407                         IORING_OFF_CQ_RING);
408         printf("cq_ring ptr = 0x%p\n", ptr);
409         cring->head = ptr + p.cq_off.head;
410         cring->tail = ptr + p.cq_off.tail;
411         cring->ring_mask = ptr + p.cq_off.ring_mask;
412         cring->ring_entries = ptr + p.cq_off.ring_entries;
413         cring->cqes = ptr + p.cq_off.cqes;
414         cq_ring_mask = *cring->ring_mask;
415         return 0;
416 }
417
418 int main(int argc, char *argv[])
419 {
420         struct submitter *s = &submitters[0];
421         unsigned long done, calls, reap, cache_hit, cache_miss;
422         int err, i, flags, fd;
423         struct rlimit rlim;
424         void *ret;
425
426         if (argc < 2) {
427                 printf("%s: filename\n", argv[0]);
428                 return 1;
429         }
430
431         flags = O_RDONLY | O_NOATIME;
432         if (!buffered)
433                 flags |= O_DIRECT;
434
435         i = 1;
436         while (i < argc) {
437                 struct file *f = &s->files[s->nr_files];
438
439                 fd = open(argv[i], flags);
440                 if (fd < 0) {
441                         perror("open");
442                         return 1;
443                 }
444                 f->fd = fd;
445                 if (get_file_size(f)) {
446                         printf("failed getting size of device/file\n");
447                         return 1;
448                 }
449                 if (f->max_blocks <= 1) {
450                         printf("Zero file/device size?\n");
451                         return 1;
452                 }
453                 f->max_blocks--;
454
455                 printf("Added file %s\n", argv[i]);
456                 s->nr_files++;
457                 i++;
458         }
459
460         rlim.rlim_cur = RLIM_INFINITY;
461         rlim.rlim_max = RLIM_INFINITY;
462         if (setrlimit(RLIMIT_MEMLOCK, &rlim) < 0) {
463                 perror("setrlimit");
464                 return 1;
465         }
466
467         arm_sig_int();
468
469         for (i = 0; i < DEPTH; i++) {
470                 void *buf;
471
472                 if (posix_memalign(&buf, BS, BS)) {
473                         printf("failed alloc\n");
474                         return 1;
475                 }
476                 s->iovecs[i].iov_base = buf;
477                 s->iovecs[i].iov_len = BS;
478         }
479
480         err = setup_ring(s);
481         if (err) {
482                 printf("ring setup failed: %s, %d\n", strerror(errno), err);
483                 return 1;
484         }
485         printf("polled=%d, fixedbufs=%d, buffered=%d", polled, fixedbufs, buffered);
486         printf(" QD=%d, sq_ring=%d, cq_ring=%d\n", DEPTH, *s->sq_ring.ring_entries, *s->cq_ring.ring_entries);
487
488         pthread_create(&s->thread, NULL, submitter_fn, s);
489
490         cache_hit = cache_miss = reap = calls = done = 0;
491         do {
492                 unsigned long this_done = 0;
493                 unsigned long this_reap = 0;
494                 unsigned long this_call = 0;
495                 unsigned long this_cache_hit = 0;
496                 unsigned long this_cache_miss = 0;
497                 unsigned long rpc = 0, ipc = 0;
498                 double hit = 0.0;
499
500                 sleep(1);
501                 this_done += s->done;
502                 this_call += s->calls;
503                 this_reap += s->reaps;
504                 this_cache_hit += s->cachehit;
505                 this_cache_miss += s->cachemiss;
506                 if (this_cache_hit && this_cache_miss) {
507                         unsigned long hits, total;
508
509                         hits = this_cache_hit - cache_hit;
510                         total = hits + this_cache_miss - cache_miss;
511                         hit = (double) hits / (double) total;
512                         hit *= 100.0;
513                 }
514                 if (this_call - calls) {
515                         rpc = (this_done - done) / (this_call - calls);
516                         ipc = (this_reap - reap) / (this_call - calls);
517                 }
518                 printf("IOPS=%lu, IOS/call=%lu/%lu, inflight=%u (head=%u tail=%u), Cachehit=%0.2f%%\n",
519                                 this_done - done, rpc, ipc, s->inflight,
520                                 *s->cq_ring.head, *s->cq_ring.tail, hit);
521                 done = this_done;
522                 calls = this_call;
523                 reap = this_reap;
524                 cache_hit = s->cachehit;
525                 cache_miss = s->cachemiss;
526         } while (!finish);
527
528         pthread_join(s->thread, &ret);
529         close(s->ring_fd);
530         return 0;
531 }