Merge branch 'ras-core-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git...
[linux-2.6-block.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/kthread.h>
60 #include <linux/blkdev.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
63 #include <net/sock.h>
64 #include <net/af_unix.h>
65 #include <net/scm.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72
73 #define CREATE_TRACE_POINTS
74 #include <trace/events/io_uring.h>
75
76 #include <uapi/linux/io_uring.h>
77
78 #include "internal.h"
79 #include "io-wq.h"
80
81 #define IORING_MAX_ENTRIES      32768
82 #define IORING_MAX_CQ_ENTRIES   (2 * IORING_MAX_ENTRIES)
83
84 /*
85  * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
86  */
87 #define IORING_FILE_TABLE_SHIFT 9
88 #define IORING_MAX_FILES_TABLE  (1U << IORING_FILE_TABLE_SHIFT)
89 #define IORING_FILE_TABLE_MASK  (IORING_MAX_FILES_TABLE - 1)
90 #define IORING_MAX_FIXED_FILES  (64 * IORING_MAX_FILES_TABLE)
91
92 struct io_uring {
93         u32 head ____cacheline_aligned_in_smp;
94         u32 tail ____cacheline_aligned_in_smp;
95 };
96
97 /*
98  * This data is shared with the application through the mmap at offsets
99  * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
100  *
101  * The offsets to the member fields are published through struct
102  * io_sqring_offsets when calling io_uring_setup.
103  */
104 struct io_rings {
105         /*
106          * Head and tail offsets into the ring; the offsets need to be
107          * masked to get valid indices.
108          *
109          * The kernel controls head of the sq ring and the tail of the cq ring,
110          * and the application controls tail of the sq ring and the head of the
111          * cq ring.
112          */
113         struct io_uring         sq, cq;
114         /*
115          * Bitmasks to apply to head and tail offsets (constant, equals
116          * ring_entries - 1)
117          */
118         u32                     sq_ring_mask, cq_ring_mask;
119         /* Ring sizes (constant, power of 2) */
120         u32                     sq_ring_entries, cq_ring_entries;
121         /*
122          * Number of invalid entries dropped by the kernel due to
123          * invalid index stored in array
124          *
125          * Written by the kernel, shouldn't be modified by the
126          * application (i.e. get number of "new events" by comparing to
127          * cached value).
128          *
129          * After a new SQ head value was read by the application this
130          * counter includes all submissions that were dropped reaching
131          * the new SQ head (and possibly more).
132          */
133         u32                     sq_dropped;
134         /*
135          * Runtime flags
136          *
137          * Written by the kernel, shouldn't be modified by the
138          * application.
139          *
140          * The application needs a full memory barrier before checking
141          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
142          */
143         u32                     sq_flags;
144         /*
145          * Number of completion events lost because the queue was full;
146          * this should be avoided by the application by making sure
147          * there are not more requests pending thatn there is space in
148          * the completion queue.
149          *
150          * Written by the kernel, shouldn't be modified by the
151          * application (i.e. get number of "new events" by comparing to
152          * cached value).
153          *
154          * As completion events come in out of order this counter is not
155          * ordered with any other data.
156          */
157         u32                     cq_overflow;
158         /*
159          * Ring buffer of completion events.
160          *
161          * The kernel writes completion events fresh every time they are
162          * produced, so the application is allowed to modify pending
163          * entries.
164          */
165         struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
166 };
167
168 struct io_mapped_ubuf {
169         u64             ubuf;
170         size_t          len;
171         struct          bio_vec *bvec;
172         unsigned int    nr_bvecs;
173 };
174
175 struct fixed_file_table {
176         struct file             **files;
177 };
178
179 struct io_ring_ctx {
180         struct {
181                 struct percpu_ref       refs;
182         } ____cacheline_aligned_in_smp;
183
184         struct {
185                 unsigned int            flags;
186                 bool                    compat;
187                 bool                    account_mem;
188                 bool                    cq_overflow_flushed;
189
190                 /*
191                  * Ring buffer of indices into array of io_uring_sqe, which is
192                  * mmapped by the application using the IORING_OFF_SQES offset.
193                  *
194                  * This indirection could e.g. be used to assign fixed
195                  * io_uring_sqe entries to operations and only submit them to
196                  * the queue when needed.
197                  *
198                  * The kernel modifies neither the indices array nor the entries
199                  * array.
200                  */
201                 u32                     *sq_array;
202                 unsigned                cached_sq_head;
203                 unsigned                sq_entries;
204                 unsigned                sq_mask;
205                 unsigned                sq_thread_idle;
206                 unsigned                cached_sq_dropped;
207                 atomic_t                cached_cq_overflow;
208                 struct io_uring_sqe     *sq_sqes;
209
210                 struct list_head        defer_list;
211                 struct list_head        timeout_list;
212                 struct list_head        cq_overflow_list;
213
214                 wait_queue_head_t       inflight_wait;
215         } ____cacheline_aligned_in_smp;
216
217         struct io_rings *rings;
218
219         /* IO offload */
220         struct io_wq            *io_wq;
221         struct task_struct      *sqo_thread;    /* if using sq thread polling */
222         struct mm_struct        *sqo_mm;
223         wait_queue_head_t       sqo_wait;
224
225         /*
226          * If used, fixed file set. Writers must ensure that ->refs is dead,
227          * readers must ensure that ->refs is alive as long as the file* is
228          * used. Only updated through io_uring_register(2).
229          */
230         struct fixed_file_table *file_table;
231         unsigned                nr_user_files;
232
233         /* if used, fixed mapped user buffers */
234         unsigned                nr_user_bufs;
235         struct io_mapped_ubuf   *user_bufs;
236
237         struct user_struct      *user;
238
239         /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
240         struct completion       *completions;
241
242         /* if all else fails... */
243         struct io_kiocb         *fallback_req;
244
245 #if defined(CONFIG_UNIX)
246         struct socket           *ring_sock;
247 #endif
248
249         struct {
250                 unsigned                cached_cq_tail;
251                 unsigned                cq_entries;
252                 unsigned                cq_mask;
253                 atomic_t                cq_timeouts;
254                 struct wait_queue_head  cq_wait;
255                 struct fasync_struct    *cq_fasync;
256                 struct eventfd_ctx      *cq_ev_fd;
257         } ____cacheline_aligned_in_smp;
258
259         struct {
260                 struct mutex            uring_lock;
261                 wait_queue_head_t       wait;
262         } ____cacheline_aligned_in_smp;
263
264         struct {
265                 spinlock_t              completion_lock;
266                 bool                    poll_multi_file;
267                 /*
268                  * ->poll_list is protected by the ctx->uring_lock for
269                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
270                  * For SQPOLL, only the single threaded io_sq_thread() will
271                  * manipulate the list, hence no extra locking is needed there.
272                  */
273                 struct list_head        poll_list;
274                 struct rb_root          cancel_tree;
275
276                 spinlock_t              inflight_lock;
277                 struct list_head        inflight_list;
278         } ____cacheline_aligned_in_smp;
279 };
280
281 struct sqe_submit {
282         const struct io_uring_sqe       *sqe;
283         struct file                     *ring_file;
284         int                             ring_fd;
285         u32                             sequence;
286         bool                            has_user;
287         bool                            in_async;
288         bool                            needs_fixed_file;
289 };
290
291 /*
292  * First field must be the file pointer in all the
293  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
294  */
295 struct io_poll_iocb {
296         struct file                     *file;
297         struct wait_queue_head          *head;
298         __poll_t                        events;
299         bool                            done;
300         bool                            canceled;
301         struct wait_queue_entry         wait;
302 };
303
304 struct io_timeout {
305         struct file                     *file;
306         struct hrtimer                  timer;
307 };
308
309 /*
310  * NOTE! Each of the iocb union members has the file pointer
311  * as the first entry in their struct definition. So you can
312  * access the file pointer through any of the sub-structs,
313  * or directly as just 'ki_filp' in this struct.
314  */
315 struct io_kiocb {
316         union {
317                 struct file             *file;
318                 struct kiocb            rw;
319                 struct io_poll_iocb     poll;
320                 struct io_timeout       timeout;
321         };
322
323         struct sqe_submit       submit;
324
325         struct io_ring_ctx      *ctx;
326         union {
327                 struct list_head        list;
328                 struct rb_node          rb_node;
329         };
330         struct list_head        link_list;
331         unsigned int            flags;
332         refcount_t              refs;
333 #define REQ_F_NOWAIT            1       /* must not punt to workers */
334 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
335 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
336 #define REQ_F_SEQ_PREV          8       /* sequential with previous */
337 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
338 #define REQ_F_IO_DRAINED        32      /* drain done */
339 #define REQ_F_LINK              64      /* linked sqes */
340 #define REQ_F_LINK_TIMEOUT      128     /* has linked timeout */
341 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
342 #define REQ_F_SHADOW_DRAIN      512     /* link-drain shadow req */
343 #define REQ_F_TIMEOUT           1024    /* timeout request */
344 #define REQ_F_ISREG             2048    /* regular file */
345 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
346 #define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
347 #define REQ_F_INFLIGHT          16384   /* on inflight list */
348 #define REQ_F_COMP_LOCKED       32768   /* completion under lock */
349         u64                     user_data;
350         u32                     result;
351         u32                     sequence;
352
353         struct list_head        inflight_entry;
354
355         struct io_wq_work       work;
356 };
357
358 #define IO_PLUG_THRESHOLD               2
359 #define IO_IOPOLL_BATCH                 8
360
361 struct io_submit_state {
362         struct blk_plug         plug;
363
364         /*
365          * io_kiocb alloc cache
366          */
367         void                    *reqs[IO_IOPOLL_BATCH];
368         unsigned                int free_reqs;
369         unsigned                int cur_req;
370
371         /*
372          * File reference cache
373          */
374         struct file             *file;
375         unsigned int            fd;
376         unsigned int            has_refs;
377         unsigned int            used_refs;
378         unsigned int            ios_left;
379 };
380
381 static void io_wq_submit_work(struct io_wq_work **workptr);
382 static void io_cqring_fill_event(struct io_kiocb *req, long res);
383 static void __io_free_req(struct io_kiocb *req);
384 static void io_put_req(struct io_kiocb *req);
385 static void io_double_put_req(struct io_kiocb *req);
386
387 static struct kmem_cache *req_cachep;
388
389 static const struct file_operations io_uring_fops;
390
391 struct sock *io_uring_get_socket(struct file *file)
392 {
393 #if defined(CONFIG_UNIX)
394         if (file->f_op == &io_uring_fops) {
395                 struct io_ring_ctx *ctx = file->private_data;
396
397                 return ctx->ring_sock->sk;
398         }
399 #endif
400         return NULL;
401 }
402 EXPORT_SYMBOL(io_uring_get_socket);
403
404 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
405 {
406         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
407
408         complete(&ctx->completions[0]);
409 }
410
411 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
412 {
413         struct io_ring_ctx *ctx;
414
415         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
416         if (!ctx)
417                 return NULL;
418
419         ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL);
420         if (!ctx->fallback_req)
421                 goto err;
422
423         ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL);
424         if (!ctx->completions)
425                 goto err;
426
427         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
428                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
429                 goto err;
430
431         ctx->flags = p->flags;
432         init_waitqueue_head(&ctx->cq_wait);
433         INIT_LIST_HEAD(&ctx->cq_overflow_list);
434         init_completion(&ctx->completions[0]);
435         init_completion(&ctx->completions[1]);
436         mutex_init(&ctx->uring_lock);
437         init_waitqueue_head(&ctx->wait);
438         spin_lock_init(&ctx->completion_lock);
439         INIT_LIST_HEAD(&ctx->poll_list);
440         ctx->cancel_tree = RB_ROOT;
441         INIT_LIST_HEAD(&ctx->defer_list);
442         INIT_LIST_HEAD(&ctx->timeout_list);
443         init_waitqueue_head(&ctx->inflight_wait);
444         spin_lock_init(&ctx->inflight_lock);
445         INIT_LIST_HEAD(&ctx->inflight_list);
446         return ctx;
447 err:
448         if (ctx->fallback_req)
449                 kmem_cache_free(req_cachep, ctx->fallback_req);
450         kfree(ctx->completions);
451         kfree(ctx);
452         return NULL;
453 }
454
455 static inline bool __req_need_defer(struct io_kiocb *req)
456 {
457         struct io_ring_ctx *ctx = req->ctx;
458
459         return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
460                                         + atomic_read(&ctx->cached_cq_overflow);
461 }
462
463 static inline bool req_need_defer(struct io_kiocb *req)
464 {
465         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) == REQ_F_IO_DRAIN)
466                 return __req_need_defer(req);
467
468         return false;
469 }
470
471 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
472 {
473         struct io_kiocb *req;
474
475         req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
476         if (req && !req_need_defer(req)) {
477                 list_del_init(&req->list);
478                 return req;
479         }
480
481         return NULL;
482 }
483
484 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
485 {
486         struct io_kiocb *req;
487
488         req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
489         if (req) {
490                 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
491                         return NULL;
492                 if (!__req_need_defer(req)) {
493                         list_del_init(&req->list);
494                         return req;
495                 }
496         }
497
498         return NULL;
499 }
500
501 static void __io_commit_cqring(struct io_ring_ctx *ctx)
502 {
503         struct io_rings *rings = ctx->rings;
504
505         if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
506                 /* order cqe stores with ring update */
507                 smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
508
509                 if (wq_has_sleeper(&ctx->cq_wait)) {
510                         wake_up_interruptible(&ctx->cq_wait);
511                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
512                 }
513         }
514 }
515
516 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
517 {
518         u8 opcode = READ_ONCE(sqe->opcode);
519
520         return !(opcode == IORING_OP_READ_FIXED ||
521                  opcode == IORING_OP_WRITE_FIXED);
522 }
523
524 static inline bool io_prep_async_work(struct io_kiocb *req)
525 {
526         bool do_hashed = false;
527
528         if (req->submit.sqe) {
529                 switch (req->submit.sqe->opcode) {
530                 case IORING_OP_WRITEV:
531                 case IORING_OP_WRITE_FIXED:
532                         do_hashed = true;
533                         /* fall-through */
534                 case IORING_OP_READV:
535                 case IORING_OP_READ_FIXED:
536                 case IORING_OP_SENDMSG:
537                 case IORING_OP_RECVMSG:
538                 case IORING_OP_ACCEPT:
539                 case IORING_OP_POLL_ADD:
540                         /*
541                          * We know REQ_F_ISREG is not set on some of these
542                          * opcodes, but this enables us to keep the check in
543                          * just one place.
544                          */
545                         if (!(req->flags & REQ_F_ISREG))
546                                 req->work.flags |= IO_WQ_WORK_UNBOUND;
547                         break;
548                 }
549                 if (io_sqe_needs_user(req->submit.sqe))
550                         req->work.flags |= IO_WQ_WORK_NEEDS_USER;
551         }
552
553         return do_hashed;
554 }
555
556 static inline void io_queue_async_work(struct io_kiocb *req)
557 {
558         bool do_hashed = io_prep_async_work(req);
559         struct io_ring_ctx *ctx = req->ctx;
560
561         trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
562                                         req->flags);
563         if (!do_hashed) {
564                 io_wq_enqueue(ctx->io_wq, &req->work);
565         } else {
566                 io_wq_enqueue_hashed(ctx->io_wq, &req->work,
567                                         file_inode(req->file));
568         }
569 }
570
571 static void io_kill_timeout(struct io_kiocb *req)
572 {
573         int ret;
574
575         ret = hrtimer_try_to_cancel(&req->timeout.timer);
576         if (ret != -1) {
577                 atomic_inc(&req->ctx->cq_timeouts);
578                 list_del_init(&req->list);
579                 io_cqring_fill_event(req, 0);
580                 io_put_req(req);
581         }
582 }
583
584 static void io_kill_timeouts(struct io_ring_ctx *ctx)
585 {
586         struct io_kiocb *req, *tmp;
587
588         spin_lock_irq(&ctx->completion_lock);
589         list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
590                 io_kill_timeout(req);
591         spin_unlock_irq(&ctx->completion_lock);
592 }
593
594 static void io_commit_cqring(struct io_ring_ctx *ctx)
595 {
596         struct io_kiocb *req;
597
598         while ((req = io_get_timeout_req(ctx)) != NULL)
599                 io_kill_timeout(req);
600
601         __io_commit_cqring(ctx);
602
603         while ((req = io_get_deferred_req(ctx)) != NULL) {
604                 if (req->flags & REQ_F_SHADOW_DRAIN) {
605                         /* Just for drain, free it. */
606                         __io_free_req(req);
607                         continue;
608                 }
609                 req->flags |= REQ_F_IO_DRAINED;
610                 io_queue_async_work(req);
611         }
612 }
613
614 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
615 {
616         struct io_rings *rings = ctx->rings;
617         unsigned tail;
618
619         tail = ctx->cached_cq_tail;
620         /*
621          * writes to the cq entry need to come after reading head; the
622          * control dependency is enough as we're using WRITE_ONCE to
623          * fill the cq entry
624          */
625         if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
626                 return NULL;
627
628         ctx->cached_cq_tail++;
629         return &rings->cqes[tail & ctx->cq_mask];
630 }
631
632 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
633 {
634         if (waitqueue_active(&ctx->wait))
635                 wake_up(&ctx->wait);
636         if (waitqueue_active(&ctx->sqo_wait))
637                 wake_up(&ctx->sqo_wait);
638         if (ctx->cq_ev_fd)
639                 eventfd_signal(ctx->cq_ev_fd, 1);
640 }
641
642 static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
643 {
644         struct io_rings *rings = ctx->rings;
645         struct io_uring_cqe *cqe;
646         struct io_kiocb *req;
647         unsigned long flags;
648         LIST_HEAD(list);
649
650         if (!force) {
651                 if (list_empty_careful(&ctx->cq_overflow_list))
652                         return;
653                 if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
654                     rings->cq_ring_entries))
655                         return;
656         }
657
658         spin_lock_irqsave(&ctx->completion_lock, flags);
659
660         /* if force is set, the ring is going away. always drop after that */
661         if (force)
662                 ctx->cq_overflow_flushed = true;
663
664         while (!list_empty(&ctx->cq_overflow_list)) {
665                 cqe = io_get_cqring(ctx);
666                 if (!cqe && !force)
667                         break;
668
669                 req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
670                                                 list);
671                 list_move(&req->list, &list);
672                 if (cqe) {
673                         WRITE_ONCE(cqe->user_data, req->user_data);
674                         WRITE_ONCE(cqe->res, req->result);
675                         WRITE_ONCE(cqe->flags, 0);
676                 } else {
677                         WRITE_ONCE(ctx->rings->cq_overflow,
678                                 atomic_inc_return(&ctx->cached_cq_overflow));
679                 }
680         }
681
682         io_commit_cqring(ctx);
683         spin_unlock_irqrestore(&ctx->completion_lock, flags);
684         io_cqring_ev_posted(ctx);
685
686         while (!list_empty(&list)) {
687                 req = list_first_entry(&list, struct io_kiocb, list);
688                 list_del(&req->list);
689                 io_put_req(req);
690         }
691 }
692
693 static void io_cqring_fill_event(struct io_kiocb *req, long res)
694 {
695         struct io_ring_ctx *ctx = req->ctx;
696         struct io_uring_cqe *cqe;
697
698         trace_io_uring_complete(ctx, req->user_data, res);
699
700         /*
701          * If we can't get a cq entry, userspace overflowed the
702          * submission (by quite a lot). Increment the overflow count in
703          * the ring.
704          */
705         cqe = io_get_cqring(ctx);
706         if (likely(cqe)) {
707                 WRITE_ONCE(cqe->user_data, req->user_data);
708                 WRITE_ONCE(cqe->res, res);
709                 WRITE_ONCE(cqe->flags, 0);
710         } else if (ctx->cq_overflow_flushed) {
711                 WRITE_ONCE(ctx->rings->cq_overflow,
712                                 atomic_inc_return(&ctx->cached_cq_overflow));
713         } else {
714                 refcount_inc(&req->refs);
715                 req->result = res;
716                 list_add_tail(&req->list, &ctx->cq_overflow_list);
717         }
718 }
719
720 static void io_cqring_add_event(struct io_kiocb *req, long res)
721 {
722         struct io_ring_ctx *ctx = req->ctx;
723         unsigned long flags;
724
725         spin_lock_irqsave(&ctx->completion_lock, flags);
726         io_cqring_fill_event(req, res);
727         io_commit_cqring(ctx);
728         spin_unlock_irqrestore(&ctx->completion_lock, flags);
729
730         io_cqring_ev_posted(ctx);
731 }
732
733 static inline bool io_is_fallback_req(struct io_kiocb *req)
734 {
735         return req == (struct io_kiocb *)
736                         ((unsigned long) req->ctx->fallback_req & ~1UL);
737 }
738
739 static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx)
740 {
741         struct io_kiocb *req;
742
743         req = ctx->fallback_req;
744         if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req))
745                 return req;
746
747         return NULL;
748 }
749
750 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
751                                    struct io_submit_state *state)
752 {
753         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
754         struct io_kiocb *req;
755
756         if (!percpu_ref_tryget(&ctx->refs))
757                 return NULL;
758
759         if (!state) {
760                 req = kmem_cache_alloc(req_cachep, gfp);
761                 if (unlikely(!req))
762                         goto fallback;
763         } else if (!state->free_reqs) {
764                 size_t sz;
765                 int ret;
766
767                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
768                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
769
770                 /*
771                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
772                  * retry single alloc to be on the safe side.
773                  */
774                 if (unlikely(ret <= 0)) {
775                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
776                         if (!state->reqs[0])
777                                 goto fallback;
778                         ret = 1;
779                 }
780                 state->free_reqs = ret - 1;
781                 state->cur_req = 1;
782                 req = state->reqs[0];
783         } else {
784                 req = state->reqs[state->cur_req];
785                 state->free_reqs--;
786                 state->cur_req++;
787         }
788
789 got_it:
790         req->file = NULL;
791         req->ctx = ctx;
792         req->flags = 0;
793         /* one is dropped after submission, the other at completion */
794         refcount_set(&req->refs, 2);
795         req->result = 0;
796         INIT_IO_WORK(&req->work, io_wq_submit_work);
797         return req;
798 fallback:
799         req = io_get_fallback_req(ctx);
800         if (req)
801                 goto got_it;
802         percpu_ref_put(&ctx->refs);
803         return NULL;
804 }
805
806 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
807 {
808         if (*nr) {
809                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
810                 percpu_ref_put_many(&ctx->refs, *nr);
811                 *nr = 0;
812         }
813 }
814
815 static void __io_free_req(struct io_kiocb *req)
816 {
817         struct io_ring_ctx *ctx = req->ctx;
818
819         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
820                 fput(req->file);
821         if (req->flags & REQ_F_INFLIGHT) {
822                 unsigned long flags;
823
824                 spin_lock_irqsave(&ctx->inflight_lock, flags);
825                 list_del(&req->inflight_entry);
826                 if (waitqueue_active(&ctx->inflight_wait))
827                         wake_up(&ctx->inflight_wait);
828                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
829         }
830         percpu_ref_put(&ctx->refs);
831         if (likely(!io_is_fallback_req(req)))
832                 kmem_cache_free(req_cachep, req);
833         else
834                 clear_bit_unlock(0, (unsigned long *) ctx->fallback_req);
835 }
836
837 static bool io_link_cancel_timeout(struct io_kiocb *req)
838 {
839         struct io_ring_ctx *ctx = req->ctx;
840         int ret;
841
842         ret = hrtimer_try_to_cancel(&req->timeout.timer);
843         if (ret != -1) {
844                 io_cqring_fill_event(req, -ECANCELED);
845                 io_commit_cqring(ctx);
846                 req->flags &= ~REQ_F_LINK;
847                 io_put_req(req);
848                 return true;
849         }
850
851         return false;
852 }
853
854 static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
855 {
856         struct io_ring_ctx *ctx = req->ctx;
857         struct io_kiocb *nxt;
858         bool wake_ev = false;
859
860         /*
861          * The list should never be empty when we are called here. But could
862          * potentially happen if the chain is messed up, check to be on the
863          * safe side.
864          */
865         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
866         while (nxt) {
867                 list_del_init(&nxt->list);
868                 if (!list_empty(&req->link_list)) {
869                         INIT_LIST_HEAD(&nxt->link_list);
870                         list_splice(&req->link_list, &nxt->link_list);
871                         nxt->flags |= REQ_F_LINK;
872                 }
873
874                 /*
875                  * If we're in async work, we can continue processing the chain
876                  * in this context instead of having to queue up new async work.
877                  */
878                 if (req->flags & REQ_F_LINK_TIMEOUT) {
879                         wake_ev = io_link_cancel_timeout(nxt);
880
881                         /* we dropped this link, get next */
882                         nxt = list_first_entry_or_null(&req->link_list,
883                                                         struct io_kiocb, list);
884                 } else if (nxtptr && io_wq_current_is_worker()) {
885                         *nxtptr = nxt;
886                         break;
887                 } else {
888                         io_queue_async_work(nxt);
889                         break;
890                 }
891         }
892
893         if (wake_ev)
894                 io_cqring_ev_posted(ctx);
895 }
896
897 /*
898  * Called if REQ_F_LINK is set, and we fail the head request
899  */
900 static void io_fail_links(struct io_kiocb *req)
901 {
902         struct io_ring_ctx *ctx = req->ctx;
903         struct io_kiocb *link;
904         unsigned long flags;
905
906         spin_lock_irqsave(&ctx->completion_lock, flags);
907
908         while (!list_empty(&req->link_list)) {
909                 link = list_first_entry(&req->link_list, struct io_kiocb, list);
910                 list_del_init(&link->list);
911
912                 trace_io_uring_fail_link(req, link);
913
914                 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
915                     link->submit.sqe->opcode == IORING_OP_LINK_TIMEOUT) {
916                         io_link_cancel_timeout(link);
917                 } else {
918                         io_cqring_fill_event(link, -ECANCELED);
919                         io_double_put_req(link);
920                 }
921         }
922
923         io_commit_cqring(ctx);
924         spin_unlock_irqrestore(&ctx->completion_lock, flags);
925         io_cqring_ev_posted(ctx);
926 }
927
928 static void io_free_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
929 {
930         if (likely(!(req->flags & REQ_F_LINK))) {
931                 __io_free_req(req);
932                 return;
933         }
934
935         /*
936          * If LINK is set, we have dependent requests in this chain. If we
937          * didn't fail this request, queue the first one up, moving any other
938          * dependencies to the next request. In case of failure, fail the rest
939          * of the chain.
940          */
941         if (req->flags & REQ_F_FAIL_LINK) {
942                 io_fail_links(req);
943         } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
944                         REQ_F_LINK_TIMEOUT) {
945                 struct io_ring_ctx *ctx = req->ctx;
946                 unsigned long flags;
947
948                 /*
949                  * If this is a timeout link, we could be racing with the
950                  * timeout timer. Grab the completion lock for this case to
951                  * protect against that.
952                  */
953                 spin_lock_irqsave(&ctx->completion_lock, flags);
954                 io_req_link_next(req, nxt);
955                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
956         } else {
957                 io_req_link_next(req, nxt);
958         }
959
960         __io_free_req(req);
961 }
962
963 static void io_free_req(struct io_kiocb *req)
964 {
965         io_free_req_find_next(req, NULL);
966 }
967
968 /*
969  * Drop reference to request, return next in chain (if there is one) if this
970  * was the last reference to this request.
971  */
972 static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
973 {
974         struct io_kiocb *nxt = NULL;
975
976         if (refcount_dec_and_test(&req->refs))
977                 io_free_req_find_next(req, &nxt);
978
979         if (nxt) {
980                 if (nxtptr)
981                         *nxtptr = nxt;
982                 else
983                         io_queue_async_work(nxt);
984         }
985 }
986
987 static void io_put_req(struct io_kiocb *req)
988 {
989         if (refcount_dec_and_test(&req->refs))
990                 io_free_req(req);
991 }
992
993 static void io_double_put_req(struct io_kiocb *req)
994 {
995         /* drop both submit and complete references */
996         if (refcount_sub_and_test(2, &req->refs))
997                 __io_free_req(req);
998 }
999
1000 static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
1001 {
1002         struct io_rings *rings = ctx->rings;
1003
1004         /*
1005          * noflush == true is from the waitqueue handler, just ensure we wake
1006          * up the task, and the next invocation will flush the entries. We
1007          * cannot safely to it from here.
1008          */
1009         if (noflush && !list_empty(&ctx->cq_overflow_list))
1010                 return -1U;
1011
1012         io_cqring_overflow_flush(ctx, false);
1013
1014         /* See comment at the top of this file */
1015         smp_rmb();
1016         return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
1017 }
1018
1019 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1020 {
1021         struct io_rings *rings = ctx->rings;
1022
1023         /* make sure SQ entry isn't read before tail */
1024         return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
1025 }
1026
1027 /*
1028  * Find and free completed poll iocbs
1029  */
1030 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
1031                                struct list_head *done)
1032 {
1033         void *reqs[IO_IOPOLL_BATCH];
1034         struct io_kiocb *req;
1035         int to_free;
1036
1037         to_free = 0;
1038         while (!list_empty(done)) {
1039                 req = list_first_entry(done, struct io_kiocb, list);
1040                 list_del(&req->list);
1041
1042                 io_cqring_fill_event(req, req->result);
1043                 (*nr_events)++;
1044
1045                 if (refcount_dec_and_test(&req->refs)) {
1046                         /* If we're not using fixed files, we have to pair the
1047                          * completion part with the file put. Use regular
1048                          * completions for those, only batch free for fixed
1049                          * file and non-linked commands.
1050                          */
1051                         if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
1052                             REQ_F_FIXED_FILE) && !io_is_fallback_req(req)) {
1053                                 reqs[to_free++] = req;
1054                                 if (to_free == ARRAY_SIZE(reqs))
1055                                         io_free_req_many(ctx, reqs, &to_free);
1056                         } else {
1057                                 io_free_req(req);
1058                         }
1059                 }
1060         }
1061
1062         io_commit_cqring(ctx);
1063         io_free_req_many(ctx, reqs, &to_free);
1064 }
1065
1066 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
1067                         long min)
1068 {
1069         struct io_kiocb *req, *tmp;
1070         LIST_HEAD(done);
1071         bool spin;
1072         int ret;
1073
1074         /*
1075          * Only spin for completions if we don't have multiple devices hanging
1076          * off our complete list, and we're under the requested amount.
1077          */
1078         spin = !ctx->poll_multi_file && *nr_events < min;
1079
1080         ret = 0;
1081         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
1082                 struct kiocb *kiocb = &req->rw;
1083
1084                 /*
1085                  * Move completed entries to our local list. If we find a
1086                  * request that requires polling, break out and complete
1087                  * the done list first, if we have entries there.
1088                  */
1089                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
1090                         list_move_tail(&req->list, &done);
1091                         continue;
1092                 }
1093                 if (!list_empty(&done))
1094                         break;
1095
1096                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
1097                 if (ret < 0)
1098                         break;
1099
1100                 if (ret && spin)
1101                         spin = false;
1102                 ret = 0;
1103         }
1104
1105         if (!list_empty(&done))
1106                 io_iopoll_complete(ctx, nr_events, &done);
1107
1108         return ret;
1109 }
1110
1111 /*
1112  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
1113  * non-spinning poll check - we'll still enter the driver poll loop, but only
1114  * as a non-spinning completion check.
1115  */
1116 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
1117                                 long min)
1118 {
1119         while (!list_empty(&ctx->poll_list) && !need_resched()) {
1120                 int ret;
1121
1122                 ret = io_do_iopoll(ctx, nr_events, min);
1123                 if (ret < 0)
1124                         return ret;
1125                 if (!min || *nr_events >= min)
1126                         return 0;
1127         }
1128
1129         return 1;
1130 }
1131
1132 /*
1133  * We can't just wait for polled events to come to us, we have to actively
1134  * find and complete them.
1135  */
1136 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
1137 {
1138         if (!(ctx->flags & IORING_SETUP_IOPOLL))
1139                 return;
1140
1141         mutex_lock(&ctx->uring_lock);
1142         while (!list_empty(&ctx->poll_list)) {
1143                 unsigned int nr_events = 0;
1144
1145                 io_iopoll_getevents(ctx, &nr_events, 1);
1146
1147                 /*
1148                  * Ensure we allow local-to-the-cpu processing to take place,
1149                  * in this case we need to ensure that we reap all events.
1150                  */
1151                 cond_resched();
1152         }
1153         mutex_unlock(&ctx->uring_lock);
1154 }
1155
1156 static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1157                             long min)
1158 {
1159         int iters = 0, ret = 0;
1160
1161         do {
1162                 int tmin = 0;
1163
1164                 /*
1165                  * Don't enter poll loop if we already have events pending.
1166                  * If we do, we can potentially be spinning for commands that
1167                  * already triggered a CQE (eg in error).
1168                  */
1169                 if (io_cqring_events(ctx, false))
1170                         break;
1171
1172                 /*
1173                  * If a submit got punted to a workqueue, we can have the
1174                  * application entering polling for a command before it gets
1175                  * issued. That app will hold the uring_lock for the duration
1176                  * of the poll right here, so we need to take a breather every
1177                  * now and then to ensure that the issue has a chance to add
1178                  * the poll to the issued list. Otherwise we can spin here
1179                  * forever, while the workqueue is stuck trying to acquire the
1180                  * very same mutex.
1181                  */
1182                 if (!(++iters & 7)) {
1183                         mutex_unlock(&ctx->uring_lock);
1184                         mutex_lock(&ctx->uring_lock);
1185                 }
1186
1187                 if (*nr_events < min)
1188                         tmin = min - *nr_events;
1189
1190                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
1191                 if (ret <= 0)
1192                         break;
1193                 ret = 0;
1194         } while (min && !*nr_events && !need_resched());
1195
1196         return ret;
1197 }
1198
1199 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1200                            long min)
1201 {
1202         int ret;
1203
1204         /*
1205          * We disallow the app entering submit/complete with polling, but we
1206          * still need to lock the ring to prevent racing with polled issue
1207          * that got punted to a workqueue.
1208          */
1209         mutex_lock(&ctx->uring_lock);
1210         ret = __io_iopoll_check(ctx, nr_events, min);
1211         mutex_unlock(&ctx->uring_lock);
1212         return ret;
1213 }
1214
1215 static void kiocb_end_write(struct io_kiocb *req)
1216 {
1217         /*
1218          * Tell lockdep we inherited freeze protection from submission
1219          * thread.
1220          */
1221         if (req->flags & REQ_F_ISREG) {
1222                 struct inode *inode = file_inode(req->file);
1223
1224                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
1225         }
1226         file_end_write(req->file);
1227 }
1228
1229 static void io_complete_rw_common(struct kiocb *kiocb, long res)
1230 {
1231         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1232
1233         if (kiocb->ki_flags & IOCB_WRITE)
1234                 kiocb_end_write(req);
1235
1236         if ((req->flags & REQ_F_LINK) && res != req->result)
1237                 req->flags |= REQ_F_FAIL_LINK;
1238         io_cqring_add_event(req, res);
1239 }
1240
1241 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
1242 {
1243         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1244
1245         io_complete_rw_common(kiocb, res);
1246         io_put_req(req);
1247 }
1248
1249 static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
1250 {
1251         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1252         struct io_kiocb *nxt = NULL;
1253
1254         io_complete_rw_common(kiocb, res);
1255         io_put_req_find_next(req, &nxt);
1256
1257         return nxt;
1258 }
1259
1260 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1261 {
1262         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1263
1264         if (kiocb->ki_flags & IOCB_WRITE)
1265                 kiocb_end_write(req);
1266
1267         if ((req->flags & REQ_F_LINK) && res != req->result)
1268                 req->flags |= REQ_F_FAIL_LINK;
1269         req->result = res;
1270         if (res != -EAGAIN)
1271                 req->flags |= REQ_F_IOPOLL_COMPLETED;
1272 }
1273
1274 /*
1275  * After the iocb has been issued, it's safe to be found on the poll list.
1276  * Adding the kiocb to the list AFTER submission ensures that we don't
1277  * find it from a io_iopoll_getevents() thread before the issuer is done
1278  * accessing the kiocb cookie.
1279  */
1280 static void io_iopoll_req_issued(struct io_kiocb *req)
1281 {
1282         struct io_ring_ctx *ctx = req->ctx;
1283
1284         /*
1285          * Track whether we have multiple files in our lists. This will impact
1286          * how we do polling eventually, not spinning if we're on potentially
1287          * different devices.
1288          */
1289         if (list_empty(&ctx->poll_list)) {
1290                 ctx->poll_multi_file = false;
1291         } else if (!ctx->poll_multi_file) {
1292                 struct io_kiocb *list_req;
1293
1294                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1295                                                 list);
1296                 if (list_req->rw.ki_filp != req->rw.ki_filp)
1297                         ctx->poll_multi_file = true;
1298         }
1299
1300         /*
1301          * For fast devices, IO may have already completed. If it has, add
1302          * it to the front so we find it first.
1303          */
1304         if (req->flags & REQ_F_IOPOLL_COMPLETED)
1305                 list_add(&req->list, &ctx->poll_list);
1306         else
1307                 list_add_tail(&req->list, &ctx->poll_list);
1308 }
1309
1310 static void io_file_put(struct io_submit_state *state)
1311 {
1312         if (state->file) {
1313                 int diff = state->has_refs - state->used_refs;
1314
1315                 if (diff)
1316                         fput_many(state->file, diff);
1317                 state->file = NULL;
1318         }
1319 }
1320
1321 /*
1322  * Get as many references to a file as we have IOs left in this submission,
1323  * assuming most submissions are for one file, or at least that each file
1324  * has more than one submission.
1325  */
1326 static struct file *io_file_get(struct io_submit_state *state, int fd)
1327 {
1328         if (!state)
1329                 return fget(fd);
1330
1331         if (state->file) {
1332                 if (state->fd == fd) {
1333                         state->used_refs++;
1334                         state->ios_left--;
1335                         return state->file;
1336                 }
1337                 io_file_put(state);
1338         }
1339         state->file = fget_many(fd, state->ios_left);
1340         if (!state->file)
1341                 return NULL;
1342
1343         state->fd = fd;
1344         state->has_refs = state->ios_left;
1345         state->used_refs = 1;
1346         state->ios_left--;
1347         return state->file;
1348 }
1349
1350 /*
1351  * If we tracked the file through the SCM inflight mechanism, we could support
1352  * any file. For now, just ensure that anything potentially problematic is done
1353  * inline.
1354  */
1355 static bool io_file_supports_async(struct file *file)
1356 {
1357         umode_t mode = file_inode(file)->i_mode;
1358
1359         if (S_ISBLK(mode) || S_ISCHR(mode))
1360                 return true;
1361         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1362                 return true;
1363
1364         return false;
1365 }
1366
1367 static int io_prep_rw(struct io_kiocb *req, bool force_nonblock)
1368 {
1369         const struct io_uring_sqe *sqe = req->submit.sqe;
1370         struct io_ring_ctx *ctx = req->ctx;
1371         struct kiocb *kiocb = &req->rw;
1372         unsigned ioprio;
1373         int ret;
1374
1375         if (!req->file)
1376                 return -EBADF;
1377
1378         if (S_ISREG(file_inode(req->file)->i_mode))
1379                 req->flags |= REQ_F_ISREG;
1380
1381         /*
1382          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1383          * we know to async punt it even if it was opened O_NONBLOCK
1384          */
1385         if (force_nonblock && !io_file_supports_async(req->file)) {
1386                 req->flags |= REQ_F_MUST_PUNT;
1387                 return -EAGAIN;
1388         }
1389
1390         kiocb->ki_pos = READ_ONCE(sqe->off);
1391         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1392         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1393
1394         ioprio = READ_ONCE(sqe->ioprio);
1395         if (ioprio) {
1396                 ret = ioprio_check_cap(ioprio);
1397                 if (ret)
1398                         return ret;
1399
1400                 kiocb->ki_ioprio = ioprio;
1401         } else
1402                 kiocb->ki_ioprio = get_current_ioprio();
1403
1404         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1405         if (unlikely(ret))
1406                 return ret;
1407
1408         /* don't allow async punt if RWF_NOWAIT was requested */
1409         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1410             (req->file->f_flags & O_NONBLOCK))
1411                 req->flags |= REQ_F_NOWAIT;
1412
1413         if (force_nonblock)
1414                 kiocb->ki_flags |= IOCB_NOWAIT;
1415
1416         if (ctx->flags & IORING_SETUP_IOPOLL) {
1417                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1418                     !kiocb->ki_filp->f_op->iopoll)
1419                         return -EOPNOTSUPP;
1420
1421                 kiocb->ki_flags |= IOCB_HIPRI;
1422                 kiocb->ki_complete = io_complete_rw_iopoll;
1423                 req->result = 0;
1424         } else {
1425                 if (kiocb->ki_flags & IOCB_HIPRI)
1426                         return -EINVAL;
1427                 kiocb->ki_complete = io_complete_rw;
1428         }
1429         return 0;
1430 }
1431
1432 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1433 {
1434         switch (ret) {
1435         case -EIOCBQUEUED:
1436                 break;
1437         case -ERESTARTSYS:
1438         case -ERESTARTNOINTR:
1439         case -ERESTARTNOHAND:
1440         case -ERESTART_RESTARTBLOCK:
1441                 /*
1442                  * We can't just restart the syscall, since previously
1443                  * submitted sqes may already be in progress. Just fail this
1444                  * IO with EINTR.
1445                  */
1446                 ret = -EINTR;
1447                 /* fall through */
1448         default:
1449                 kiocb->ki_complete(kiocb, ret, 0);
1450         }
1451 }
1452
1453 static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
1454                        bool in_async)
1455 {
1456         if (in_async && ret >= 0 && nxt && kiocb->ki_complete == io_complete_rw)
1457                 *nxt = __io_complete_rw(kiocb, ret);
1458         else
1459                 io_rw_done(kiocb, ret);
1460 }
1461
1462 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
1463                            const struct io_uring_sqe *sqe,
1464                            struct iov_iter *iter)
1465 {
1466         size_t len = READ_ONCE(sqe->len);
1467         struct io_mapped_ubuf *imu;
1468         unsigned index, buf_index;
1469         size_t offset;
1470         u64 buf_addr;
1471
1472         /* attempt to use fixed buffers without having provided iovecs */
1473         if (unlikely(!ctx->user_bufs))
1474                 return -EFAULT;
1475
1476         buf_index = READ_ONCE(sqe->buf_index);
1477         if (unlikely(buf_index >= ctx->nr_user_bufs))
1478                 return -EFAULT;
1479
1480         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1481         imu = &ctx->user_bufs[index];
1482         buf_addr = READ_ONCE(sqe->addr);
1483
1484         /* overflow */
1485         if (buf_addr + len < buf_addr)
1486                 return -EFAULT;
1487         /* not inside the mapped region */
1488         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1489                 return -EFAULT;
1490
1491         /*
1492          * May not be a start of buffer, set size appropriately
1493          * and advance us to the beginning.
1494          */
1495         offset = buf_addr - imu->ubuf;
1496         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1497
1498         if (offset) {
1499                 /*
1500                  * Don't use iov_iter_advance() here, as it's really slow for
1501                  * using the latter parts of a big fixed buffer - it iterates
1502                  * over each segment manually. We can cheat a bit here, because
1503                  * we know that:
1504                  *
1505                  * 1) it's a BVEC iter, we set it up
1506                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1507                  *    first and last bvec
1508                  *
1509                  * So just find our index, and adjust the iterator afterwards.
1510                  * If the offset is within the first bvec (or the whole first
1511                  * bvec, just use iov_iter_advance(). This makes it easier
1512                  * since we can just skip the first segment, which may not
1513                  * be PAGE_SIZE aligned.
1514                  */
1515                 const struct bio_vec *bvec = imu->bvec;
1516
1517                 if (offset <= bvec->bv_len) {
1518                         iov_iter_advance(iter, offset);
1519                 } else {
1520                         unsigned long seg_skip;
1521
1522                         /* skip first vec */
1523                         offset -= bvec->bv_len;
1524                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1525
1526                         iter->bvec = bvec + seg_skip;
1527                         iter->nr_segs -= seg_skip;
1528                         iter->count -= bvec->bv_len + offset;
1529                         iter->iov_offset = offset & ~PAGE_MASK;
1530                 }
1531         }
1532
1533         return len;
1534 }
1535
1536 static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
1537                                const struct sqe_submit *s, struct iovec **iovec,
1538                                struct iov_iter *iter)
1539 {
1540         const struct io_uring_sqe *sqe = s->sqe;
1541         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1542         size_t sqe_len = READ_ONCE(sqe->len);
1543         u8 opcode;
1544
1545         /*
1546          * We're reading ->opcode for the second time, but the first read
1547          * doesn't care whether it's _FIXED or not, so it doesn't matter
1548          * whether ->opcode changes concurrently. The first read does care
1549          * about whether it is a READ or a WRITE, so we don't trust this read
1550          * for that purpose and instead let the caller pass in the read/write
1551          * flag.
1552          */
1553         opcode = READ_ONCE(sqe->opcode);
1554         if (opcode == IORING_OP_READ_FIXED ||
1555             opcode == IORING_OP_WRITE_FIXED) {
1556                 ssize_t ret = io_import_fixed(ctx, rw, sqe, iter);
1557                 *iovec = NULL;
1558                 return ret;
1559         }
1560
1561         if (!s->has_user)
1562                 return -EFAULT;
1563
1564 #ifdef CONFIG_COMPAT
1565         if (ctx->compat)
1566                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1567                                                 iovec, iter);
1568 #endif
1569
1570         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1571 }
1572
1573 /*
1574  * For files that don't have ->read_iter() and ->write_iter(), handle them
1575  * by looping over ->read() or ->write() manually.
1576  */
1577 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
1578                            struct iov_iter *iter)
1579 {
1580         ssize_t ret = 0;
1581
1582         /*
1583          * Don't support polled IO through this interface, and we can't
1584          * support non-blocking either. For the latter, this just causes
1585          * the kiocb to be handled from an async context.
1586          */
1587         if (kiocb->ki_flags & IOCB_HIPRI)
1588                 return -EOPNOTSUPP;
1589         if (kiocb->ki_flags & IOCB_NOWAIT)
1590                 return -EAGAIN;
1591
1592         while (iov_iter_count(iter)) {
1593                 struct iovec iovec = iov_iter_iovec(iter);
1594                 ssize_t nr;
1595
1596                 if (rw == READ) {
1597                         nr = file->f_op->read(file, iovec.iov_base,
1598                                               iovec.iov_len, &kiocb->ki_pos);
1599                 } else {
1600                         nr = file->f_op->write(file, iovec.iov_base,
1601                                                iovec.iov_len, &kiocb->ki_pos);
1602                 }
1603
1604                 if (nr < 0) {
1605                         if (!ret)
1606                                 ret = nr;
1607                         break;
1608                 }
1609                 ret += nr;
1610                 if (nr != iovec.iov_len)
1611                         break;
1612                 iov_iter_advance(iter, nr);
1613         }
1614
1615         return ret;
1616 }
1617
1618 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
1619                    bool force_nonblock)
1620 {
1621         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1622         struct kiocb *kiocb = &req->rw;
1623         struct iov_iter iter;
1624         struct file *file;
1625         size_t iov_count;
1626         ssize_t read_size, ret;
1627
1628         ret = io_prep_rw(req, force_nonblock);
1629         if (ret)
1630                 return ret;
1631         file = kiocb->ki_filp;
1632
1633         if (unlikely(!(file->f_mode & FMODE_READ)))
1634                 return -EBADF;
1635
1636         ret = io_import_iovec(req->ctx, READ, &req->submit, &iovec, &iter);
1637         if (ret < 0)
1638                 return ret;
1639
1640         read_size = ret;
1641         if (req->flags & REQ_F_LINK)
1642                 req->result = read_size;
1643
1644         iov_count = iov_iter_count(&iter);
1645         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1646         if (!ret) {
1647                 ssize_t ret2;
1648
1649                 if (file->f_op->read_iter)
1650                         ret2 = call_read_iter(file, kiocb, &iter);
1651                 else
1652                         ret2 = loop_rw_iter(READ, file, kiocb, &iter);
1653
1654                 /*
1655                  * In case of a short read, punt to async. This can happen
1656                  * if we have data partially cached. Alternatively we can
1657                  * return the short read, in which case the application will
1658                  * need to issue another SQE and wait for it. That SQE will
1659                  * need async punt anyway, so it's more efficient to do it
1660                  * here.
1661                  */
1662                 if (force_nonblock && !(req->flags & REQ_F_NOWAIT) &&
1663                     (req->flags & REQ_F_ISREG) &&
1664                     ret2 > 0 && ret2 < read_size)
1665                         ret2 = -EAGAIN;
1666                 /* Catch -EAGAIN return for forced non-blocking submission */
1667                 if (!force_nonblock || ret2 != -EAGAIN)
1668                         kiocb_done(kiocb, ret2, nxt, req->submit.in_async);
1669                 else
1670                         ret = -EAGAIN;
1671         }
1672         kfree(iovec);
1673         return ret;
1674 }
1675
1676 static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
1677                     bool force_nonblock)
1678 {
1679         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1680         struct kiocb *kiocb = &req->rw;
1681         struct iov_iter iter;
1682         struct file *file;
1683         size_t iov_count;
1684         ssize_t ret;
1685
1686         ret = io_prep_rw(req, force_nonblock);
1687         if (ret)
1688                 return ret;
1689
1690         file = kiocb->ki_filp;
1691         if (unlikely(!(file->f_mode & FMODE_WRITE)))
1692                 return -EBADF;
1693
1694         ret = io_import_iovec(req->ctx, WRITE, &req->submit, &iovec, &iter);
1695         if (ret < 0)
1696                 return ret;
1697
1698         if (req->flags & REQ_F_LINK)
1699                 req->result = ret;
1700
1701         iov_count = iov_iter_count(&iter);
1702
1703         ret = -EAGAIN;
1704         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT))
1705                 goto out_free;
1706
1707         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1708         if (!ret) {
1709                 ssize_t ret2;
1710
1711                 /*
1712                  * Open-code file_start_write here to grab freeze protection,
1713                  * which will be released by another thread in
1714                  * io_complete_rw().  Fool lockdep by telling it the lock got
1715                  * released so that it doesn't complain about the held lock when
1716                  * we return to userspace.
1717                  */
1718                 if (req->flags & REQ_F_ISREG) {
1719                         __sb_start_write(file_inode(file)->i_sb,
1720                                                 SB_FREEZE_WRITE, true);
1721                         __sb_writers_release(file_inode(file)->i_sb,
1722                                                 SB_FREEZE_WRITE);
1723                 }
1724                 kiocb->ki_flags |= IOCB_WRITE;
1725
1726                 if (file->f_op->write_iter)
1727                         ret2 = call_write_iter(file, kiocb, &iter);
1728                 else
1729                         ret2 = loop_rw_iter(WRITE, file, kiocb, &iter);
1730                 if (!force_nonblock || ret2 != -EAGAIN)
1731                         kiocb_done(kiocb, ret2, nxt, req->submit.in_async);
1732                 else
1733                         ret = -EAGAIN;
1734         }
1735 out_free:
1736         kfree(iovec);
1737         return ret;
1738 }
1739
1740 /*
1741  * IORING_OP_NOP just posts a completion event, nothing else.
1742  */
1743 static int io_nop(struct io_kiocb *req)
1744 {
1745         struct io_ring_ctx *ctx = req->ctx;
1746
1747         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1748                 return -EINVAL;
1749
1750         io_cqring_add_event(req, 0);
1751         io_put_req(req);
1752         return 0;
1753 }
1754
1755 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1756 {
1757         struct io_ring_ctx *ctx = req->ctx;
1758
1759         if (!req->file)
1760                 return -EBADF;
1761
1762         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1763                 return -EINVAL;
1764         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1765                 return -EINVAL;
1766
1767         return 0;
1768 }
1769
1770 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1771                     struct io_kiocb **nxt, bool force_nonblock)
1772 {
1773         loff_t sqe_off = READ_ONCE(sqe->off);
1774         loff_t sqe_len = READ_ONCE(sqe->len);
1775         loff_t end = sqe_off + sqe_len;
1776         unsigned fsync_flags;
1777         int ret;
1778
1779         fsync_flags = READ_ONCE(sqe->fsync_flags);
1780         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1781                 return -EINVAL;
1782
1783         ret = io_prep_fsync(req, sqe);
1784         if (ret)
1785                 return ret;
1786
1787         /* fsync always requires a blocking context */
1788         if (force_nonblock)
1789                 return -EAGAIN;
1790
1791         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1792                                 end > 0 ? end : LLONG_MAX,
1793                                 fsync_flags & IORING_FSYNC_DATASYNC);
1794
1795         if (ret < 0 && (req->flags & REQ_F_LINK))
1796                 req->flags |= REQ_F_FAIL_LINK;
1797         io_cqring_add_event(req, ret);
1798         io_put_req_find_next(req, nxt);
1799         return 0;
1800 }
1801
1802 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1803 {
1804         struct io_ring_ctx *ctx = req->ctx;
1805         int ret = 0;
1806
1807         if (!req->file)
1808                 return -EBADF;
1809
1810         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1811                 return -EINVAL;
1812         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1813                 return -EINVAL;
1814
1815         return ret;
1816 }
1817
1818 static int io_sync_file_range(struct io_kiocb *req,
1819                               const struct io_uring_sqe *sqe,
1820                               struct io_kiocb **nxt,
1821                               bool force_nonblock)
1822 {
1823         loff_t sqe_off;
1824         loff_t sqe_len;
1825         unsigned flags;
1826         int ret;
1827
1828         ret = io_prep_sfr(req, sqe);
1829         if (ret)
1830                 return ret;
1831
1832         /* sync_file_range always requires a blocking context */
1833         if (force_nonblock)
1834                 return -EAGAIN;
1835
1836         sqe_off = READ_ONCE(sqe->off);
1837         sqe_len = READ_ONCE(sqe->len);
1838         flags = READ_ONCE(sqe->sync_range_flags);
1839
1840         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1841
1842         if (ret < 0 && (req->flags & REQ_F_LINK))
1843                 req->flags |= REQ_F_FAIL_LINK;
1844         io_cqring_add_event(req, ret);
1845         io_put_req_find_next(req, nxt);
1846         return 0;
1847 }
1848
1849 #if defined(CONFIG_NET)
1850 static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1851                            struct io_kiocb **nxt, bool force_nonblock,
1852                    long (*fn)(struct socket *, struct user_msghdr __user *,
1853                                 unsigned int))
1854 {
1855         struct socket *sock;
1856         int ret;
1857
1858         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1859                 return -EINVAL;
1860
1861         sock = sock_from_file(req->file, &ret);
1862         if (sock) {
1863                 struct user_msghdr __user *msg;
1864                 unsigned flags;
1865
1866                 flags = READ_ONCE(sqe->msg_flags);
1867                 if (flags & MSG_DONTWAIT)
1868                         req->flags |= REQ_F_NOWAIT;
1869                 else if (force_nonblock)
1870                         flags |= MSG_DONTWAIT;
1871
1872                 msg = (struct user_msghdr __user *) (unsigned long)
1873                         READ_ONCE(sqe->addr);
1874
1875                 ret = fn(sock, msg, flags);
1876                 if (force_nonblock && ret == -EAGAIN)
1877                         return ret;
1878         }
1879
1880         io_cqring_add_event(req, ret);
1881         if (ret < 0 && (req->flags & REQ_F_LINK))
1882                 req->flags |= REQ_F_FAIL_LINK;
1883         io_put_req_find_next(req, nxt);
1884         return 0;
1885 }
1886 #endif
1887
1888 static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1889                       struct io_kiocb **nxt, bool force_nonblock)
1890 {
1891 #if defined(CONFIG_NET)
1892         return io_send_recvmsg(req, sqe, nxt, force_nonblock,
1893                                 __sys_sendmsg_sock);
1894 #else
1895         return -EOPNOTSUPP;
1896 #endif
1897 }
1898
1899 static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1900                       struct io_kiocb **nxt, bool force_nonblock)
1901 {
1902 #if defined(CONFIG_NET)
1903         return io_send_recvmsg(req, sqe, nxt, force_nonblock,
1904                                 __sys_recvmsg_sock);
1905 #else
1906         return -EOPNOTSUPP;
1907 #endif
1908 }
1909
1910 static int io_accept(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1911                      struct io_kiocb **nxt, bool force_nonblock)
1912 {
1913 #if defined(CONFIG_NET)
1914         struct sockaddr __user *addr;
1915         int __user *addr_len;
1916         unsigned file_flags;
1917         int flags, ret;
1918
1919         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
1920                 return -EINVAL;
1921         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1922                 return -EINVAL;
1923
1924         addr = (struct sockaddr __user *) (unsigned long) READ_ONCE(sqe->addr);
1925         addr_len = (int __user *) (unsigned long) READ_ONCE(sqe->addr2);
1926         flags = READ_ONCE(sqe->accept_flags);
1927         file_flags = force_nonblock ? O_NONBLOCK : 0;
1928
1929         ret = __sys_accept4_file(req->file, file_flags, addr, addr_len, flags);
1930         if (ret == -EAGAIN && force_nonblock) {
1931                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
1932                 return -EAGAIN;
1933         }
1934         if (ret == -ERESTARTSYS)
1935                 ret = -EINTR;
1936         if (ret < 0 && (req->flags & REQ_F_LINK))
1937                 req->flags |= REQ_F_FAIL_LINK;
1938         io_cqring_add_event(req, ret);
1939         io_put_req_find_next(req, nxt);
1940         return 0;
1941 #else
1942         return -EOPNOTSUPP;
1943 #endif
1944 }
1945
1946 static inline void io_poll_remove_req(struct io_kiocb *req)
1947 {
1948         if (!RB_EMPTY_NODE(&req->rb_node)) {
1949                 rb_erase(&req->rb_node, &req->ctx->cancel_tree);
1950                 RB_CLEAR_NODE(&req->rb_node);
1951         }
1952 }
1953
1954 static void io_poll_remove_one(struct io_kiocb *req)
1955 {
1956         struct io_poll_iocb *poll = &req->poll;
1957
1958         spin_lock(&poll->head->lock);
1959         WRITE_ONCE(poll->canceled, true);
1960         if (!list_empty(&poll->wait.entry)) {
1961                 list_del_init(&poll->wait.entry);
1962                 io_queue_async_work(req);
1963         }
1964         spin_unlock(&poll->head->lock);
1965         io_poll_remove_req(req);
1966 }
1967
1968 static void io_poll_remove_all(struct io_ring_ctx *ctx)
1969 {
1970         struct rb_node *node;
1971         struct io_kiocb *req;
1972
1973         spin_lock_irq(&ctx->completion_lock);
1974         while ((node = rb_first(&ctx->cancel_tree)) != NULL) {
1975                 req = rb_entry(node, struct io_kiocb, rb_node);
1976                 io_poll_remove_one(req);
1977         }
1978         spin_unlock_irq(&ctx->completion_lock);
1979 }
1980
1981 static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
1982 {
1983         struct rb_node *p, *parent = NULL;
1984         struct io_kiocb *req;
1985
1986         p = ctx->cancel_tree.rb_node;
1987         while (p) {
1988                 parent = p;
1989                 req = rb_entry(parent, struct io_kiocb, rb_node);
1990                 if (sqe_addr < req->user_data) {
1991                         p = p->rb_left;
1992                 } else if (sqe_addr > req->user_data) {
1993                         p = p->rb_right;
1994                 } else {
1995                         io_poll_remove_one(req);
1996                         return 0;
1997                 }
1998         }
1999
2000         return -ENOENT;
2001 }
2002
2003 /*
2004  * Find a running poll command that matches one specified in sqe->addr,
2005  * and remove it if found.
2006  */
2007 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2008 {
2009         struct io_ring_ctx *ctx = req->ctx;
2010         int ret;
2011
2012         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2013                 return -EINVAL;
2014         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
2015             sqe->poll_events)
2016                 return -EINVAL;
2017
2018         spin_lock_irq(&ctx->completion_lock);
2019         ret = io_poll_cancel(ctx, READ_ONCE(sqe->addr));
2020         spin_unlock_irq(&ctx->completion_lock);
2021
2022         io_cqring_add_event(req, ret);
2023         if (ret < 0 && (req->flags & REQ_F_LINK))
2024                 req->flags |= REQ_F_FAIL_LINK;
2025         io_put_req(req);
2026         return 0;
2027 }
2028
2029 static void io_poll_complete(struct io_kiocb *req, __poll_t mask)
2030 {
2031         struct io_ring_ctx *ctx = req->ctx;
2032
2033         req->poll.done = true;
2034         io_cqring_fill_event(req, mangle_poll(mask));
2035         io_commit_cqring(ctx);
2036 }
2037
2038 static void io_poll_complete_work(struct io_wq_work **workptr)
2039 {
2040         struct io_wq_work *work = *workptr;
2041         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2042         struct io_poll_iocb *poll = &req->poll;
2043         struct poll_table_struct pt = { ._key = poll->events };
2044         struct io_ring_ctx *ctx = req->ctx;
2045         struct io_kiocb *nxt = NULL;
2046         __poll_t mask = 0;
2047
2048         if (work->flags & IO_WQ_WORK_CANCEL)
2049                 WRITE_ONCE(poll->canceled, true);
2050
2051         if (!READ_ONCE(poll->canceled))
2052                 mask = vfs_poll(poll->file, &pt) & poll->events;
2053
2054         /*
2055          * Note that ->ki_cancel callers also delete iocb from active_reqs after
2056          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
2057          * synchronize with them.  In the cancellation case the list_del_init
2058          * itself is not actually needed, but harmless so we keep it in to
2059          * avoid further branches in the fast path.
2060          */
2061         spin_lock_irq(&ctx->completion_lock);
2062         if (!mask && !READ_ONCE(poll->canceled)) {
2063                 add_wait_queue(poll->head, &poll->wait);
2064                 spin_unlock_irq(&ctx->completion_lock);
2065                 return;
2066         }
2067         io_poll_remove_req(req);
2068         io_poll_complete(req, mask);
2069         spin_unlock_irq(&ctx->completion_lock);
2070
2071         io_cqring_ev_posted(ctx);
2072
2073         io_put_req_find_next(req, &nxt);
2074         if (nxt)
2075                 *workptr = &nxt->work;
2076 }
2077
2078 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
2079                         void *key)
2080 {
2081         struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
2082                                                         wait);
2083         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
2084         struct io_ring_ctx *ctx = req->ctx;
2085         __poll_t mask = key_to_poll(key);
2086         unsigned long flags;
2087
2088         /* for instances that support it check for an event match first: */
2089         if (mask && !(mask & poll->events))
2090                 return 0;
2091
2092         list_del_init(&poll->wait.entry);
2093
2094         /*
2095          * Run completion inline if we can. We're using trylock here because
2096          * we are violating the completion_lock -> poll wq lock ordering.
2097          * If we have a link timeout we're going to need the completion_lock
2098          * for finalizing the request, mark us as having grabbed that already.
2099          */
2100         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
2101                 io_poll_remove_req(req);
2102                 io_poll_complete(req, mask);
2103                 req->flags |= REQ_F_COMP_LOCKED;
2104                 io_put_req(req);
2105                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
2106
2107                 io_cqring_ev_posted(ctx);
2108         } else {
2109                 io_queue_async_work(req);
2110         }
2111
2112         return 1;
2113 }
2114
2115 struct io_poll_table {
2116         struct poll_table_struct pt;
2117         struct io_kiocb *req;
2118         int error;
2119 };
2120
2121 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
2122                                struct poll_table_struct *p)
2123 {
2124         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
2125
2126         if (unlikely(pt->req->poll.head)) {
2127                 pt->error = -EINVAL;
2128                 return;
2129         }
2130
2131         pt->error = 0;
2132         pt->req->poll.head = head;
2133         add_wait_queue(head, &pt->req->poll.wait);
2134 }
2135
2136 static void io_poll_req_insert(struct io_kiocb *req)
2137 {
2138         struct io_ring_ctx *ctx = req->ctx;
2139         struct rb_node **p = &ctx->cancel_tree.rb_node;
2140         struct rb_node *parent = NULL;
2141         struct io_kiocb *tmp;
2142
2143         while (*p) {
2144                 parent = *p;
2145                 tmp = rb_entry(parent, struct io_kiocb, rb_node);
2146                 if (req->user_data < tmp->user_data)
2147                         p = &(*p)->rb_left;
2148                 else
2149                         p = &(*p)->rb_right;
2150         }
2151         rb_link_node(&req->rb_node, parent, p);
2152         rb_insert_color(&req->rb_node, &ctx->cancel_tree);
2153 }
2154
2155 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2156                        struct io_kiocb **nxt)
2157 {
2158         struct io_poll_iocb *poll = &req->poll;
2159         struct io_ring_ctx *ctx = req->ctx;
2160         struct io_poll_table ipt;
2161         bool cancel = false;
2162         __poll_t mask;
2163         u16 events;
2164
2165         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2166                 return -EINVAL;
2167         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
2168                 return -EINVAL;
2169         if (!poll->file)
2170                 return -EBADF;
2171
2172         req->submit.sqe = NULL;
2173         INIT_IO_WORK(&req->work, io_poll_complete_work);
2174         events = READ_ONCE(sqe->poll_events);
2175         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
2176         RB_CLEAR_NODE(&req->rb_node);
2177
2178         poll->head = NULL;
2179         poll->done = false;
2180         poll->canceled = false;
2181
2182         ipt.pt._qproc = io_poll_queue_proc;
2183         ipt.pt._key = poll->events;
2184         ipt.req = req;
2185         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
2186
2187         /* initialized the list so that we can do list_empty checks */
2188         INIT_LIST_HEAD(&poll->wait.entry);
2189         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
2190
2191         INIT_LIST_HEAD(&req->list);
2192
2193         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
2194
2195         spin_lock_irq(&ctx->completion_lock);
2196         if (likely(poll->head)) {
2197                 spin_lock(&poll->head->lock);
2198                 if (unlikely(list_empty(&poll->wait.entry))) {
2199                         if (ipt.error)
2200                                 cancel = true;
2201                         ipt.error = 0;
2202                         mask = 0;
2203                 }
2204                 if (mask || ipt.error)
2205                         list_del_init(&poll->wait.entry);
2206                 else if (cancel)
2207                         WRITE_ONCE(poll->canceled, true);
2208                 else if (!poll->done) /* actually waiting for an event */
2209                         io_poll_req_insert(req);
2210                 spin_unlock(&poll->head->lock);
2211         }
2212         if (mask) { /* no async, we'd stolen it */
2213                 ipt.error = 0;
2214                 io_poll_complete(req, mask);
2215         }
2216         spin_unlock_irq(&ctx->completion_lock);
2217
2218         if (mask) {
2219                 io_cqring_ev_posted(ctx);
2220                 io_put_req_find_next(req, nxt);
2221         }
2222         return ipt.error;
2223 }
2224
2225 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
2226 {
2227         struct io_ring_ctx *ctx;
2228         struct io_kiocb *req;
2229         unsigned long flags;
2230
2231         req = container_of(timer, struct io_kiocb, timeout.timer);
2232         ctx = req->ctx;
2233         atomic_inc(&ctx->cq_timeouts);
2234
2235         spin_lock_irqsave(&ctx->completion_lock, flags);
2236         /*
2237          * We could be racing with timeout deletion. If the list is empty,
2238          * then timeout lookup already found it and will be handling it.
2239          */
2240         if (!list_empty(&req->list)) {
2241                 struct io_kiocb *prev;
2242
2243                 /*
2244                  * Adjust the reqs sequence before the current one because it
2245                  * will consume a slot in the cq_ring and the the cq_tail
2246                  * pointer will be increased, otherwise other timeout reqs may
2247                  * return in advance without waiting for enough wait_nr.
2248                  */
2249                 prev = req;
2250                 list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
2251                         prev->sequence++;
2252                 list_del_init(&req->list);
2253         }
2254
2255         io_cqring_fill_event(req, -ETIME);
2256         io_commit_cqring(ctx);
2257         spin_unlock_irqrestore(&ctx->completion_lock, flags);
2258
2259         io_cqring_ev_posted(ctx);
2260         if (req->flags & REQ_F_LINK)
2261                 req->flags |= REQ_F_FAIL_LINK;
2262         io_put_req(req);
2263         return HRTIMER_NORESTART;
2264 }
2265
2266 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
2267 {
2268         struct io_kiocb *req;
2269         int ret = -ENOENT;
2270
2271         list_for_each_entry(req, &ctx->timeout_list, list) {
2272                 if (user_data == req->user_data) {
2273                         list_del_init(&req->list);
2274                         ret = 0;
2275                         break;
2276                 }
2277         }
2278
2279         if (ret == -ENOENT)
2280                 return ret;
2281
2282         ret = hrtimer_try_to_cancel(&req->timeout.timer);
2283         if (ret == -1)
2284                 return -EALREADY;
2285
2286         io_cqring_fill_event(req, -ECANCELED);
2287         io_put_req(req);
2288         return 0;
2289 }
2290
2291 /*
2292  * Remove or update an existing timeout command
2293  */
2294 static int io_timeout_remove(struct io_kiocb *req,
2295                              const struct io_uring_sqe *sqe)
2296 {
2297         struct io_ring_ctx *ctx = req->ctx;
2298         unsigned flags;
2299         int ret;
2300
2301         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2302                 return -EINVAL;
2303         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)
2304                 return -EINVAL;
2305         flags = READ_ONCE(sqe->timeout_flags);
2306         if (flags)
2307                 return -EINVAL;
2308
2309         spin_lock_irq(&ctx->completion_lock);
2310         ret = io_timeout_cancel(ctx, READ_ONCE(sqe->addr));
2311
2312         io_cqring_fill_event(req, ret);
2313         io_commit_cqring(ctx);
2314         spin_unlock_irq(&ctx->completion_lock);
2315         io_cqring_ev_posted(ctx);
2316         if (ret < 0 && req->flags & REQ_F_LINK)
2317                 req->flags |= REQ_F_FAIL_LINK;
2318         io_put_req(req);
2319         return 0;
2320 }
2321
2322 static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2323 {
2324         unsigned count;
2325         struct io_ring_ctx *ctx = req->ctx;
2326         struct list_head *entry;
2327         enum hrtimer_mode mode;
2328         struct timespec64 ts;
2329         unsigned span = 0;
2330         unsigned flags;
2331
2332         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2333                 return -EINVAL;
2334         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len != 1)
2335                 return -EINVAL;
2336         flags = READ_ONCE(sqe->timeout_flags);
2337         if (flags & ~IORING_TIMEOUT_ABS)
2338                 return -EINVAL;
2339
2340         if (get_timespec64(&ts, u64_to_user_ptr(sqe->addr)))
2341                 return -EFAULT;
2342
2343         if (flags & IORING_TIMEOUT_ABS)
2344                 mode = HRTIMER_MODE_ABS;
2345         else
2346                 mode = HRTIMER_MODE_REL;
2347
2348         hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, mode);
2349         req->flags |= REQ_F_TIMEOUT;
2350
2351         /*
2352          * sqe->off holds how many events that need to occur for this
2353          * timeout event to be satisfied. If it isn't set, then this is
2354          * a pure timeout request, sequence isn't used.
2355          */
2356         count = READ_ONCE(sqe->off);
2357         if (!count) {
2358                 req->flags |= REQ_F_TIMEOUT_NOSEQ;
2359                 spin_lock_irq(&ctx->completion_lock);
2360                 entry = ctx->timeout_list.prev;
2361                 goto add;
2362         }
2363
2364         req->sequence = ctx->cached_sq_head + count - 1;
2365         /* reuse it to store the count */
2366         req->submit.sequence = count;
2367
2368         /*
2369          * Insertion sort, ensuring the first entry in the list is always
2370          * the one we need first.
2371          */
2372         spin_lock_irq(&ctx->completion_lock);
2373         list_for_each_prev(entry, &ctx->timeout_list) {
2374                 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
2375                 unsigned nxt_sq_head;
2376                 long long tmp, tmp_nxt;
2377
2378                 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
2379                         continue;
2380
2381                 /*
2382                  * Since cached_sq_head + count - 1 can overflow, use type long
2383                  * long to store it.
2384                  */
2385                 tmp = (long long)ctx->cached_sq_head + count - 1;
2386                 nxt_sq_head = nxt->sequence - nxt->submit.sequence + 1;
2387                 tmp_nxt = (long long)nxt_sq_head + nxt->submit.sequence - 1;
2388
2389                 /*
2390                  * cached_sq_head may overflow, and it will never overflow twice
2391                  * once there is some timeout req still be valid.
2392                  */
2393                 if (ctx->cached_sq_head < nxt_sq_head)
2394                         tmp += UINT_MAX;
2395
2396                 if (tmp > tmp_nxt)
2397                         break;
2398
2399                 /*
2400                  * Sequence of reqs after the insert one and itself should
2401                  * be adjusted because each timeout req consumes a slot.
2402                  */
2403                 span++;
2404                 nxt->sequence++;
2405         }
2406         req->sequence -= span;
2407 add:
2408         list_add(&req->list, entry);
2409         req->timeout.timer.function = io_timeout_fn;
2410         hrtimer_start(&req->timeout.timer, timespec64_to_ktime(ts), mode);
2411         spin_unlock_irq(&ctx->completion_lock);
2412         return 0;
2413 }
2414
2415 static bool io_cancel_cb(struct io_wq_work *work, void *data)
2416 {
2417         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2418
2419         return req->user_data == (unsigned long) data;
2420 }
2421
2422 static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
2423 {
2424         enum io_wq_cancel cancel_ret;
2425         int ret = 0;
2426
2427         cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
2428         switch (cancel_ret) {
2429         case IO_WQ_CANCEL_OK:
2430                 ret = 0;
2431                 break;
2432         case IO_WQ_CANCEL_RUNNING:
2433                 ret = -EALREADY;
2434                 break;
2435         case IO_WQ_CANCEL_NOTFOUND:
2436                 ret = -ENOENT;
2437                 break;
2438         }
2439
2440         return ret;
2441 }
2442
2443 static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
2444                                      struct io_kiocb *req, __u64 sqe_addr,
2445                                      struct io_kiocb **nxt)
2446 {
2447         unsigned long flags;
2448         int ret;
2449
2450         ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
2451         if (ret != -ENOENT) {
2452                 spin_lock_irqsave(&ctx->completion_lock, flags);
2453                 goto done;
2454         }
2455
2456         spin_lock_irqsave(&ctx->completion_lock, flags);
2457         ret = io_timeout_cancel(ctx, sqe_addr);
2458         if (ret != -ENOENT)
2459                 goto done;
2460         ret = io_poll_cancel(ctx, sqe_addr);
2461 done:
2462         io_cqring_fill_event(req, ret);
2463         io_commit_cqring(ctx);
2464         spin_unlock_irqrestore(&ctx->completion_lock, flags);
2465         io_cqring_ev_posted(ctx);
2466
2467         if (ret < 0 && (req->flags & REQ_F_LINK))
2468                 req->flags |= REQ_F_FAIL_LINK;
2469         io_put_req_find_next(req, nxt);
2470 }
2471
2472 static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2473                            struct io_kiocb **nxt)
2474 {
2475         struct io_ring_ctx *ctx = req->ctx;
2476
2477         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2478                 return -EINVAL;
2479         if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||
2480             sqe->cancel_flags)
2481                 return -EINVAL;
2482
2483         io_async_find_and_cancel(ctx, req, READ_ONCE(sqe->addr), NULL);
2484         return 0;
2485 }
2486
2487 static int io_req_defer(struct io_kiocb *req)
2488 {
2489         const struct io_uring_sqe *sqe = req->submit.sqe;
2490         struct io_uring_sqe *sqe_copy;
2491         struct io_ring_ctx *ctx = req->ctx;
2492
2493         /* Still need defer if there is pending req in defer list. */
2494         if (!req_need_defer(req) && list_empty(&ctx->defer_list))
2495                 return 0;
2496
2497         sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
2498         if (!sqe_copy)
2499                 return -EAGAIN;
2500
2501         spin_lock_irq(&ctx->completion_lock);
2502         if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
2503                 spin_unlock_irq(&ctx->completion_lock);
2504                 kfree(sqe_copy);
2505                 return 0;
2506         }
2507
2508         memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
2509         req->submit.sqe = sqe_copy;
2510
2511         trace_io_uring_defer(ctx, req, false);
2512         list_add_tail(&req->list, &ctx->defer_list);
2513         spin_unlock_irq(&ctx->completion_lock);
2514         return -EIOCBQUEUED;
2515 }
2516
2517 static int __io_submit_sqe(struct io_kiocb *req, struct io_kiocb **nxt,
2518                            bool force_nonblock)
2519 {
2520         int ret, opcode;
2521         struct sqe_submit *s = &req->submit;
2522         struct io_ring_ctx *ctx = req->ctx;
2523
2524         opcode = READ_ONCE(s->sqe->opcode);
2525         switch (opcode) {
2526         case IORING_OP_NOP:
2527                 ret = io_nop(req);
2528                 break;
2529         case IORING_OP_READV:
2530                 if (unlikely(s->sqe->buf_index))
2531                         return -EINVAL;
2532                 ret = io_read(req, nxt, force_nonblock);
2533                 break;
2534         case IORING_OP_WRITEV:
2535                 if (unlikely(s->sqe->buf_index))
2536                         return -EINVAL;
2537                 ret = io_write(req, nxt, force_nonblock);
2538                 break;
2539         case IORING_OP_READ_FIXED:
2540                 ret = io_read(req, nxt, force_nonblock);
2541                 break;
2542         case IORING_OP_WRITE_FIXED:
2543                 ret = io_write(req, nxt, force_nonblock);
2544                 break;
2545         case IORING_OP_FSYNC:
2546                 ret = io_fsync(req, s->sqe, nxt, force_nonblock);
2547                 break;
2548         case IORING_OP_POLL_ADD:
2549                 ret = io_poll_add(req, s->sqe, nxt);
2550                 break;
2551         case IORING_OP_POLL_REMOVE:
2552                 ret = io_poll_remove(req, s->sqe);
2553                 break;
2554         case IORING_OP_SYNC_FILE_RANGE:
2555                 ret = io_sync_file_range(req, s->sqe, nxt, force_nonblock);
2556                 break;
2557         case IORING_OP_SENDMSG:
2558                 ret = io_sendmsg(req, s->sqe, nxt, force_nonblock);
2559                 break;
2560         case IORING_OP_RECVMSG:
2561                 ret = io_recvmsg(req, s->sqe, nxt, force_nonblock);
2562                 break;
2563         case IORING_OP_TIMEOUT:
2564                 ret = io_timeout(req, s->sqe);
2565                 break;
2566         case IORING_OP_TIMEOUT_REMOVE:
2567                 ret = io_timeout_remove(req, s->sqe);
2568                 break;
2569         case IORING_OP_ACCEPT:
2570                 ret = io_accept(req, s->sqe, nxt, force_nonblock);
2571                 break;
2572         case IORING_OP_ASYNC_CANCEL:
2573                 ret = io_async_cancel(req, s->sqe, nxt);
2574                 break;
2575         default:
2576                 ret = -EINVAL;
2577                 break;
2578         }
2579
2580         if (ret)
2581                 return ret;
2582
2583         if (ctx->flags & IORING_SETUP_IOPOLL) {
2584                 if (req->result == -EAGAIN)
2585                         return -EAGAIN;
2586
2587                 /* workqueue context doesn't hold uring_lock, grab it now */
2588                 if (s->in_async)
2589                         mutex_lock(&ctx->uring_lock);
2590                 io_iopoll_req_issued(req);
2591                 if (s->in_async)
2592                         mutex_unlock(&ctx->uring_lock);
2593         }
2594
2595         return 0;
2596 }
2597
2598 static void io_wq_submit_work(struct io_wq_work **workptr)
2599 {
2600         struct io_wq_work *work = *workptr;
2601         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2602         struct sqe_submit *s = &req->submit;
2603         const struct io_uring_sqe *sqe = s->sqe;
2604         struct io_kiocb *nxt = NULL;
2605         int ret = 0;
2606
2607         /* Ensure we clear previously set non-block flag */
2608         req->rw.ki_flags &= ~IOCB_NOWAIT;
2609
2610         if (work->flags & IO_WQ_WORK_CANCEL)
2611                 ret = -ECANCELED;
2612
2613         if (!ret) {
2614                 s->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
2615                 s->in_async = true;
2616                 do {
2617                         ret = __io_submit_sqe(req, &nxt, false);
2618                         /*
2619                          * We can get EAGAIN for polled IO even though we're
2620                          * forcing a sync submission from here, since we can't
2621                          * wait for request slots on the block side.
2622                          */
2623                         if (ret != -EAGAIN)
2624                                 break;
2625                         cond_resched();
2626                 } while (1);
2627         }
2628
2629         /* drop submission reference */
2630         io_put_req(req);
2631
2632         if (ret) {
2633                 if (req->flags & REQ_F_LINK)
2634                         req->flags |= REQ_F_FAIL_LINK;
2635                 io_cqring_add_event(req, ret);
2636                 io_put_req(req);
2637         }
2638
2639         /* async context always use a copy of the sqe */
2640         kfree(sqe);
2641
2642         /* if a dependent link is ready, pass it back */
2643         if (!ret && nxt) {
2644                 io_prep_async_work(nxt);
2645                 *workptr = &nxt->work;
2646         }
2647 }
2648
2649 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
2650 {
2651         int op = READ_ONCE(sqe->opcode);
2652
2653         switch (op) {
2654         case IORING_OP_NOP:
2655         case IORING_OP_POLL_REMOVE:
2656         case IORING_OP_TIMEOUT:
2657         case IORING_OP_TIMEOUT_REMOVE:
2658         case IORING_OP_ASYNC_CANCEL:
2659         case IORING_OP_LINK_TIMEOUT:
2660                 return false;
2661         default:
2662                 return true;
2663         }
2664 }
2665
2666 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
2667                                               int index)
2668 {
2669         struct fixed_file_table *table;
2670
2671         table = &ctx->file_table[index >> IORING_FILE_TABLE_SHIFT];
2672         return table->files[index & IORING_FILE_TABLE_MASK];
2673 }
2674
2675 static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req)
2676 {
2677         struct sqe_submit *s = &req->submit;
2678         struct io_ring_ctx *ctx = req->ctx;
2679         unsigned flags;
2680         int fd;
2681
2682         flags = READ_ONCE(s->sqe->flags);
2683         fd = READ_ONCE(s->sqe->fd);
2684
2685         if (flags & IOSQE_IO_DRAIN)
2686                 req->flags |= REQ_F_IO_DRAIN;
2687         /*
2688          * All io need record the previous position, if LINK vs DARIN,
2689          * it can be used to mark the position of the first IO in the
2690          * link list.
2691          */
2692         req->sequence = s->sequence;
2693
2694         if (!io_op_needs_file(s->sqe))
2695                 return 0;
2696
2697         if (flags & IOSQE_FIXED_FILE) {
2698                 if (unlikely(!ctx->file_table ||
2699                     (unsigned) fd >= ctx->nr_user_files))
2700                         return -EBADF;
2701                 fd = array_index_nospec(fd, ctx->nr_user_files);
2702                 req->file = io_file_from_index(ctx, fd);
2703                 if (!req->file)
2704                         return -EBADF;
2705                 req->flags |= REQ_F_FIXED_FILE;
2706         } else {
2707                 if (s->needs_fixed_file)
2708                         return -EBADF;
2709                 trace_io_uring_file_get(ctx, fd);
2710                 req->file = io_file_get(state, fd);
2711                 if (unlikely(!req->file))
2712                         return -EBADF;
2713         }
2714
2715         return 0;
2716 }
2717
2718 static int io_grab_files(struct io_kiocb *req)
2719 {
2720         int ret = -EBADF;
2721         struct io_ring_ctx *ctx = req->ctx;
2722
2723         rcu_read_lock();
2724         spin_lock_irq(&ctx->inflight_lock);
2725         /*
2726          * We use the f_ops->flush() handler to ensure that we can flush
2727          * out work accessing these files if the fd is closed. Check if
2728          * the fd has changed since we started down this path, and disallow
2729          * this operation if it has.
2730          */
2731         if (fcheck(req->submit.ring_fd) == req->submit.ring_file) {
2732                 list_add(&req->inflight_entry, &ctx->inflight_list);
2733                 req->flags |= REQ_F_INFLIGHT;
2734                 req->work.files = current->files;
2735                 ret = 0;
2736         }
2737         spin_unlock_irq(&ctx->inflight_lock);
2738         rcu_read_unlock();
2739
2740         return ret;
2741 }
2742
2743 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
2744 {
2745         struct io_kiocb *req = container_of(timer, struct io_kiocb,
2746                                                 timeout.timer);
2747         struct io_ring_ctx *ctx = req->ctx;
2748         struct io_kiocb *prev = NULL;
2749         unsigned long flags;
2750
2751         spin_lock_irqsave(&ctx->completion_lock, flags);
2752
2753         /*
2754          * We don't expect the list to be empty, that will only happen if we
2755          * race with the completion of the linked work.
2756          */
2757         if (!list_empty(&req->list)) {
2758                 prev = list_entry(req->list.prev, struct io_kiocb, link_list);
2759                 if (refcount_inc_not_zero(&prev->refs))
2760                         list_del_init(&req->list);
2761                 else
2762                         prev = NULL;
2763         }
2764
2765         spin_unlock_irqrestore(&ctx->completion_lock, flags);
2766
2767         if (prev) {
2768                 io_async_find_and_cancel(ctx, req, prev->user_data, NULL);
2769                 io_put_req(prev);
2770         } else {
2771                 io_cqring_add_event(req, -ETIME);
2772                 io_put_req(req);
2773         }
2774         return HRTIMER_NORESTART;
2775 }
2776
2777 static void io_queue_linked_timeout(struct io_kiocb *req, struct timespec64 *ts,
2778                                     enum hrtimer_mode *mode)
2779 {
2780         struct io_ring_ctx *ctx = req->ctx;
2781
2782         /*
2783          * If the list is now empty, then our linked request finished before
2784          * we got a chance to setup the timer
2785          */
2786         spin_lock_irq(&ctx->completion_lock);
2787         if (!list_empty(&req->list)) {
2788                 req->timeout.timer.function = io_link_timeout_fn;
2789                 hrtimer_start(&req->timeout.timer, timespec64_to_ktime(*ts),
2790                                 *mode);
2791         }
2792         spin_unlock_irq(&ctx->completion_lock);
2793
2794         /* drop submission reference */
2795         io_put_req(req);
2796 }
2797
2798 static int io_validate_link_timeout(const struct io_uring_sqe *sqe,
2799                                     struct timespec64 *ts)
2800 {
2801         if (sqe->ioprio || sqe->buf_index || sqe->len != 1 || sqe->off)
2802                 return -EINVAL;
2803         if (sqe->timeout_flags & ~IORING_TIMEOUT_ABS)
2804                 return -EINVAL;
2805         if (get_timespec64(ts, u64_to_user_ptr(sqe->addr)))
2806                 return -EFAULT;
2807
2808         return 0;
2809 }
2810
2811 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req,
2812                                                struct timespec64 *ts,
2813                                                enum hrtimer_mode *mode)
2814 {
2815         struct io_kiocb *nxt;
2816         int ret;
2817
2818         if (!(req->flags & REQ_F_LINK))
2819                 return NULL;
2820
2821         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
2822         if (!nxt || nxt->submit.sqe->opcode != IORING_OP_LINK_TIMEOUT)
2823                 return NULL;
2824
2825         ret = io_validate_link_timeout(nxt->submit.sqe, ts);
2826         if (ret) {
2827                 list_del_init(&nxt->list);
2828                 io_cqring_add_event(nxt, ret);
2829                 io_double_put_req(nxt);
2830                 return ERR_PTR(-ECANCELED);
2831         }
2832
2833         if (nxt->submit.sqe->timeout_flags & IORING_TIMEOUT_ABS)
2834                 *mode = HRTIMER_MODE_ABS;
2835         else
2836                 *mode = HRTIMER_MODE_REL;
2837
2838         req->flags |= REQ_F_LINK_TIMEOUT;
2839         hrtimer_init(&nxt->timeout.timer, CLOCK_MONOTONIC, *mode);
2840         return nxt;
2841 }
2842
2843 static int __io_queue_sqe(struct io_kiocb *req)
2844 {
2845         enum hrtimer_mode mode;
2846         struct io_kiocb *nxt;
2847         struct timespec64 ts;
2848         int ret;
2849
2850         nxt = io_prep_linked_timeout(req, &ts, &mode);
2851         if (IS_ERR(nxt)) {
2852                 ret = PTR_ERR(nxt);
2853                 nxt = NULL;
2854                 goto err;
2855         }
2856
2857         ret = __io_submit_sqe(req, NULL, true);
2858
2859         /*
2860          * We async punt it if the file wasn't marked NOWAIT, or if the file
2861          * doesn't support non-blocking read/write attempts
2862          */
2863         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
2864             (req->flags & REQ_F_MUST_PUNT))) {
2865                 struct sqe_submit *s = &req->submit;
2866                 struct io_uring_sqe *sqe_copy;
2867
2868                 sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
2869                 if (sqe_copy) {
2870                         s->sqe = sqe_copy;
2871                         if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
2872                                 ret = io_grab_files(req);
2873                                 if (ret) {
2874                                         kfree(sqe_copy);
2875                                         goto err;
2876                                 }
2877                         }
2878
2879                         /*
2880                          * Queued up for async execution, worker will release
2881                          * submit reference when the iocb is actually submitted.
2882                          */
2883                         io_queue_async_work(req);
2884
2885                         if (nxt)
2886                                 io_queue_linked_timeout(nxt, &ts, &mode);
2887
2888                         return 0;
2889                 }
2890         }
2891
2892 err:
2893         /* drop submission reference */
2894         io_put_req(req);
2895
2896         if (nxt) {
2897                 if (!ret)
2898                         io_queue_linked_timeout(nxt, &ts, &mode);
2899                 else
2900                         io_put_req(nxt);
2901         }
2902
2903         /* and drop final reference, if we failed */
2904         if (ret) {
2905                 io_cqring_add_event(req, ret);
2906                 if (req->flags & REQ_F_LINK)
2907                         req->flags |= REQ_F_FAIL_LINK;
2908                 io_put_req(req);
2909         }
2910
2911         return ret;
2912 }
2913
2914 static int io_queue_sqe(struct io_kiocb *req)
2915 {
2916         int ret;
2917
2918         ret = io_req_defer(req);
2919         if (ret) {
2920                 if (ret != -EIOCBQUEUED) {
2921                         io_cqring_add_event(req, ret);
2922                         io_double_put_req(req);
2923                 }
2924                 return 0;
2925         }
2926
2927         return __io_queue_sqe(req);
2928 }
2929
2930 static int io_queue_link_head(struct io_kiocb *req, struct io_kiocb *shadow)
2931 {
2932         int ret;
2933         int need_submit = false;
2934         struct io_ring_ctx *ctx = req->ctx;
2935
2936         if (!shadow)
2937                 return io_queue_sqe(req);
2938
2939         /*
2940          * Mark the first IO in link list as DRAIN, let all the following
2941          * IOs enter the defer list. all IO needs to be completed before link
2942          * list.
2943          */
2944         req->flags |= REQ_F_IO_DRAIN;
2945         ret = io_req_defer(req);
2946         if (ret) {
2947                 if (ret != -EIOCBQUEUED) {
2948                         io_cqring_add_event(req, ret);
2949                         io_double_put_req(req);
2950                         __io_free_req(shadow);
2951                         return 0;
2952                 }
2953         } else {
2954                 /*
2955                  * If ret == 0 means that all IOs in front of link io are
2956                  * running done. let's queue link head.
2957                  */
2958                 need_submit = true;
2959         }
2960
2961         /* Insert shadow req to defer_list, blocking next IOs */
2962         spin_lock_irq(&ctx->completion_lock);
2963         trace_io_uring_defer(ctx, shadow, true);
2964         list_add_tail(&shadow->list, &ctx->defer_list);
2965         spin_unlock_irq(&ctx->completion_lock);
2966
2967         if (need_submit)
2968                 return __io_queue_sqe(req);
2969
2970         return 0;
2971 }
2972
2973 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
2974
2975 static void io_submit_sqe(struct io_kiocb *req, struct io_submit_state *state,
2976                           struct io_kiocb **link)
2977 {
2978         struct io_uring_sqe *sqe_copy;
2979         struct sqe_submit *s = &req->submit;
2980         struct io_ring_ctx *ctx = req->ctx;
2981         int ret;
2982
2983         req->user_data = s->sqe->user_data;
2984
2985         /* enforce forwards compatibility on users */
2986         if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) {
2987                 ret = -EINVAL;
2988                 goto err_req;
2989         }
2990
2991         ret = io_req_set_file(state, req);
2992         if (unlikely(ret)) {
2993 err_req:
2994                 io_cqring_add_event(req, ret);
2995                 io_double_put_req(req);
2996                 return;
2997         }
2998
2999         /*
3000          * If we already have a head request, queue this one for async
3001          * submittal once the head completes. If we don't have a head but
3002          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
3003          * submitted sync once the chain is complete. If none of those
3004          * conditions are true (normal request), then just queue it.
3005          */
3006         if (*link) {
3007                 struct io_kiocb *prev = *link;
3008
3009                 sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
3010                 if (!sqe_copy) {
3011                         ret = -EAGAIN;
3012                         goto err_req;
3013                 }
3014
3015                 s->sqe = sqe_copy;
3016                 trace_io_uring_link(ctx, req, prev);
3017                 list_add_tail(&req->list, &prev->link_list);
3018         } else if (s->sqe->flags & IOSQE_IO_LINK) {
3019                 req->flags |= REQ_F_LINK;
3020
3021                 INIT_LIST_HEAD(&req->link_list);
3022                 *link = req;
3023         } else if (READ_ONCE(s->sqe->opcode) == IORING_OP_LINK_TIMEOUT) {
3024                 /* Only valid as a linked SQE */
3025                 ret = -EINVAL;
3026                 goto err_req;
3027         } else {
3028                 io_queue_sqe(req);
3029         }
3030 }
3031
3032 /*
3033  * Batched submission is done, ensure local IO is flushed out.
3034  */
3035 static void io_submit_state_end(struct io_submit_state *state)
3036 {
3037         blk_finish_plug(&state->plug);
3038         io_file_put(state);
3039         if (state->free_reqs)
3040                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
3041                                         &state->reqs[state->cur_req]);
3042 }
3043
3044 /*
3045  * Start submission side cache.
3046  */
3047 static void io_submit_state_start(struct io_submit_state *state,
3048                                   struct io_ring_ctx *ctx, unsigned max_ios)
3049 {
3050         blk_start_plug(&state->plug);
3051         state->free_reqs = 0;
3052         state->file = NULL;
3053         state->ios_left = max_ios;
3054 }
3055
3056 static void io_commit_sqring(struct io_ring_ctx *ctx)
3057 {
3058         struct io_rings *rings = ctx->rings;
3059
3060         if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
3061                 /*
3062                  * Ensure any loads from the SQEs are done at this point,
3063                  * since once we write the new head, the application could
3064                  * write new data to them.
3065                  */
3066                 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
3067         }
3068 }
3069
3070 /*
3071  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
3072  * that is mapped by userspace. This means that care needs to be taken to
3073  * ensure that reads are stable, as we cannot rely on userspace always
3074  * being a good citizen. If members of the sqe are validated and then later
3075  * used, it's important that those reads are done through READ_ONCE() to
3076  * prevent a re-load down the line.
3077  */
3078 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
3079 {
3080         struct io_rings *rings = ctx->rings;
3081         u32 *sq_array = ctx->sq_array;
3082         unsigned head;
3083
3084         /*
3085          * The cached sq head (or cq tail) serves two purposes:
3086          *
3087          * 1) allows us to batch the cost of updating the user visible
3088          *    head updates.
3089          * 2) allows the kernel side to track the head on its own, even
3090          *    though the application is the one updating it.
3091          */
3092         head = ctx->cached_sq_head;
3093         /* make sure SQ entry isn't read before tail */
3094         if (head == smp_load_acquire(&rings->sq.tail))
3095                 return false;
3096
3097         head = READ_ONCE(sq_array[head & ctx->sq_mask]);
3098         if (head < ctx->sq_entries) {
3099                 s->ring_file = NULL;
3100                 s->sqe = &ctx->sq_sqes[head];
3101                 s->sequence = ctx->cached_sq_head;
3102                 ctx->cached_sq_head++;
3103                 return true;
3104         }
3105
3106         /* drop invalid entries */
3107         ctx->cached_sq_head++;
3108         ctx->cached_sq_dropped++;
3109         WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
3110         return false;
3111 }
3112
3113 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
3114                           struct file *ring_file, int ring_fd,
3115                           struct mm_struct **mm, bool async)
3116 {
3117         struct io_submit_state state, *statep = NULL;
3118         struct io_kiocb *link = NULL;
3119         struct io_kiocb *shadow_req = NULL;
3120         int i, submitted = 0;
3121         bool mm_fault = false;
3122
3123         if (!list_empty(&ctx->cq_overflow_list)) {
3124                 io_cqring_overflow_flush(ctx, false);
3125                 return -EBUSY;
3126         }
3127
3128         if (nr > IO_PLUG_THRESHOLD) {
3129                 io_submit_state_start(&state, ctx, nr);
3130                 statep = &state;
3131         }
3132
3133         for (i = 0; i < nr; i++) {
3134                 struct io_kiocb *req;
3135                 unsigned int sqe_flags;
3136
3137                 req = io_get_req(ctx, statep);
3138                 if (unlikely(!req)) {
3139                         if (!submitted)
3140                                 submitted = -EAGAIN;
3141                         break;
3142                 }
3143                 if (!io_get_sqring(ctx, &req->submit)) {
3144                         __io_free_req(req);
3145                         break;
3146                 }
3147
3148                 if (io_sqe_needs_user(req->submit.sqe) && !*mm) {
3149                         mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
3150                         if (!mm_fault) {
3151                                 use_mm(ctx->sqo_mm);
3152                                 *mm = ctx->sqo_mm;
3153                         }
3154                 }
3155
3156                 sqe_flags = req->submit.sqe->flags;
3157
3158                 if (link && (sqe_flags & IOSQE_IO_DRAIN)) {
3159                         if (!shadow_req) {
3160                                 shadow_req = io_get_req(ctx, NULL);
3161                                 if (unlikely(!shadow_req))
3162                                         goto out;
3163                                 shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
3164                                 refcount_dec(&shadow_req->refs);
3165                         }
3166                         shadow_req->sequence = req->submit.sequence;
3167                 }
3168
3169 out:
3170                 req->submit.ring_file = ring_file;
3171                 req->submit.ring_fd = ring_fd;
3172                 req->submit.has_user = *mm != NULL;
3173                 req->submit.in_async = async;
3174                 req->submit.needs_fixed_file = async;
3175                 trace_io_uring_submit_sqe(ctx, req->submit.sqe->user_data,
3176                                           true, async);
3177                 io_submit_sqe(req, statep, &link);
3178                 submitted++;
3179
3180                 /*
3181                  * If previous wasn't linked and we have a linked command,
3182                  * that's the end of the chain. Submit the previous link.
3183                  */
3184                 if (!(sqe_flags & IOSQE_IO_LINK) && link) {
3185                         io_queue_link_head(link, shadow_req);
3186                         link = NULL;
3187                         shadow_req = NULL;
3188                 }
3189         }
3190
3191         if (link)
3192                 io_queue_link_head(link, shadow_req);
3193         if (statep)
3194                 io_submit_state_end(&state);
3195
3196          /* Commit SQ ring head once we've consumed and submitted all SQEs */
3197         io_commit_sqring(ctx);
3198
3199         return submitted;
3200 }
3201
3202 static int io_sq_thread(void *data)
3203 {
3204         struct io_ring_ctx *ctx = data;
3205         struct mm_struct *cur_mm = NULL;
3206         mm_segment_t old_fs;
3207         DEFINE_WAIT(wait);
3208         unsigned inflight;
3209         unsigned long timeout;
3210         int ret;
3211
3212         complete(&ctx->completions[1]);
3213
3214         old_fs = get_fs();
3215         set_fs(USER_DS);
3216
3217         ret = timeout = inflight = 0;
3218         while (!kthread_should_park()) {
3219                 unsigned int to_submit;
3220
3221                 if (inflight) {
3222                         unsigned nr_events = 0;
3223
3224                         if (ctx->flags & IORING_SETUP_IOPOLL) {
3225                                 /*
3226                                  * inflight is the count of the maximum possible
3227                                  * entries we submitted, but it can be smaller
3228                                  * if we dropped some of them. If we don't have
3229                                  * poll entries available, then we know that we
3230                                  * have nothing left to poll for. Reset the
3231                                  * inflight count to zero in that case.
3232                                  */
3233                                 mutex_lock(&ctx->uring_lock);
3234                                 if (!list_empty(&ctx->poll_list))
3235                                         __io_iopoll_check(ctx, &nr_events, 0);
3236                                 else
3237                                         inflight = 0;
3238                                 mutex_unlock(&ctx->uring_lock);
3239                         } else {
3240                                 /*
3241                                  * Normal IO, just pretend everything completed.
3242                                  * We don't have to poll completions for that.
3243                                  */
3244                                 nr_events = inflight;
3245                         }
3246
3247                         inflight -= nr_events;
3248                         if (!inflight)
3249                                 timeout = jiffies + ctx->sq_thread_idle;
3250                 }
3251
3252                 to_submit = io_sqring_entries(ctx);
3253
3254                 /*
3255                  * If submit got -EBUSY, flag us as needing the application
3256                  * to enter the kernel to reap and flush events.
3257                  */
3258                 if (!to_submit || ret == -EBUSY) {
3259                         /*
3260                          * We're polling. If we're within the defined idle
3261                          * period, then let us spin without work before going
3262                          * to sleep. The exception is if we got EBUSY doing
3263                          * more IO, we should wait for the application to
3264                          * reap events and wake us up.
3265                          */
3266                         if (inflight ||
3267                             (!time_after(jiffies, timeout) && ret != -EBUSY)) {
3268                                 cond_resched();
3269                                 continue;
3270                         }
3271
3272                         /*
3273                          * Drop cur_mm before scheduling, we can't hold it for
3274                          * long periods (or over schedule()). Do this before
3275                          * adding ourselves to the waitqueue, as the unuse/drop
3276                          * may sleep.
3277                          */
3278                         if (cur_mm) {
3279                                 unuse_mm(cur_mm);
3280                                 mmput(cur_mm);
3281                                 cur_mm = NULL;
3282                         }
3283
3284                         prepare_to_wait(&ctx->sqo_wait, &wait,
3285                                                 TASK_INTERRUPTIBLE);
3286
3287                         /* Tell userspace we may need a wakeup call */
3288                         ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
3289                         /* make sure to read SQ tail after writing flags */
3290                         smp_mb();
3291
3292                         to_submit = io_sqring_entries(ctx);
3293                         if (!to_submit || ret == -EBUSY) {
3294                                 if (kthread_should_park()) {
3295                                         finish_wait(&ctx->sqo_wait, &wait);
3296                                         break;
3297                                 }
3298                                 if (signal_pending(current))
3299                                         flush_signals(current);
3300                                 schedule();
3301                                 finish_wait(&ctx->sqo_wait, &wait);
3302
3303                                 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
3304                                 continue;
3305                         }
3306                         finish_wait(&ctx->sqo_wait, &wait);
3307
3308                         ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
3309                 }
3310
3311                 to_submit = min(to_submit, ctx->sq_entries);
3312                 ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
3313                 if (ret > 0)
3314                         inflight += ret;
3315         }
3316
3317         set_fs(old_fs);
3318         if (cur_mm) {
3319                 unuse_mm(cur_mm);
3320                 mmput(cur_mm);
3321         }
3322
3323         kthread_parkme();
3324
3325         return 0;
3326 }
3327
3328 struct io_wait_queue {
3329         struct wait_queue_entry wq;
3330         struct io_ring_ctx *ctx;
3331         unsigned to_wait;
3332         unsigned nr_timeouts;
3333 };
3334
3335 static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
3336 {
3337         struct io_ring_ctx *ctx = iowq->ctx;
3338
3339         /*
3340          * Wake up if we have enough events, or if a timeout occured since we
3341          * started waiting. For timeouts, we always want to return to userspace,
3342          * regardless of event count.
3343          */
3344         return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
3345                         atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
3346 }
3347
3348 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
3349                             int wake_flags, void *key)
3350 {
3351         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
3352                                                         wq);
3353
3354         /* use noflush == true, as we can't safely rely on locking context */
3355         if (!io_should_wake(iowq, true))
3356                 return -1;
3357
3358         return autoremove_wake_function(curr, mode, wake_flags, key);
3359 }
3360
3361 /*
3362  * Wait until events become available, if we don't already have some. The
3363  * application must reap them itself, as they reside on the shared cq ring.
3364  */
3365 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
3366                           const sigset_t __user *sig, size_t sigsz)
3367 {
3368         struct io_wait_queue iowq = {
3369                 .wq = {
3370                         .private        = current,
3371                         .func           = io_wake_function,
3372                         .entry          = LIST_HEAD_INIT(iowq.wq.entry),
3373                 },
3374                 .ctx            = ctx,
3375                 .to_wait        = min_events,
3376         };
3377         struct io_rings *rings = ctx->rings;
3378         int ret = 0;
3379
3380         if (io_cqring_events(ctx, false) >= min_events)
3381                 return 0;
3382
3383         if (sig) {
3384 #ifdef CONFIG_COMPAT
3385                 if (in_compat_syscall())
3386                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
3387                                                       sigsz);
3388                 else
3389 #endif
3390                         ret = set_user_sigmask(sig, sigsz);
3391
3392                 if (ret)
3393                         return ret;
3394         }
3395
3396         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
3397         trace_io_uring_cqring_wait(ctx, min_events);
3398         do {
3399                 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
3400                                                 TASK_INTERRUPTIBLE);
3401                 if (io_should_wake(&iowq, false))
3402                         break;
3403                 schedule();
3404                 if (signal_pending(current)) {
3405                         ret = -EINTR;
3406                         break;
3407                 }
3408         } while (1);
3409         finish_wait(&ctx->wait, &iowq.wq);
3410
3411         restore_saved_sigmask_unless(ret == -EINTR);
3412
3413         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
3414 }
3415
3416 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
3417 {
3418 #if defined(CONFIG_UNIX)
3419         if (ctx->ring_sock) {
3420                 struct sock *sock = ctx->ring_sock->sk;
3421                 struct sk_buff *skb;
3422
3423                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
3424                         kfree_skb(skb);
3425         }
3426 #else
3427         int i;
3428
3429         for (i = 0; i < ctx->nr_user_files; i++) {
3430                 struct file *file;
3431
3432                 file = io_file_from_index(ctx, i);
3433                 if (file)
3434                         fput(file);
3435         }
3436 #endif
3437 }
3438
3439 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
3440 {
3441         unsigned nr_tables, i;
3442
3443         if (!ctx->file_table)
3444                 return -ENXIO;
3445
3446         __io_sqe_files_unregister(ctx);
3447         nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
3448         for (i = 0; i < nr_tables; i++)
3449                 kfree(ctx->file_table[i].files);
3450         kfree(ctx->file_table);
3451         ctx->file_table = NULL;
3452         ctx->nr_user_files = 0;
3453         return 0;
3454 }
3455
3456 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
3457 {
3458         if (ctx->sqo_thread) {
3459                 wait_for_completion(&ctx->completions[1]);
3460                 /*
3461                  * The park is a bit of a work-around, without it we get
3462                  * warning spews on shutdown with SQPOLL set and affinity
3463                  * set to a single CPU.
3464                  */
3465                 kthread_park(ctx->sqo_thread);
3466                 kthread_stop(ctx->sqo_thread);
3467                 ctx->sqo_thread = NULL;
3468         }
3469 }
3470
3471 static void io_finish_async(struct io_ring_ctx *ctx)
3472 {
3473         io_sq_thread_stop(ctx);
3474
3475         if (ctx->io_wq) {
3476                 io_wq_destroy(ctx->io_wq);
3477                 ctx->io_wq = NULL;
3478         }
3479 }
3480
3481 #if defined(CONFIG_UNIX)
3482 static void io_destruct_skb(struct sk_buff *skb)
3483 {
3484         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
3485
3486         if (ctx->io_wq)
3487                 io_wq_flush(ctx->io_wq);
3488
3489         unix_destruct_scm(skb);
3490 }
3491
3492 /*
3493  * Ensure the UNIX gc is aware of our file set, so we are certain that
3494  * the io_uring can be safely unregistered on process exit, even if we have
3495  * loops in the file referencing.
3496  */
3497 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
3498 {
3499         struct sock *sk = ctx->ring_sock->sk;
3500         struct scm_fp_list *fpl;
3501         struct sk_buff *skb;
3502         int i, nr_files;
3503
3504         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
3505                 unsigned long inflight = ctx->user->unix_inflight + nr;
3506
3507                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
3508                         return -EMFILE;
3509         }
3510
3511         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
3512         if (!fpl)
3513                 return -ENOMEM;
3514
3515         skb = alloc_skb(0, GFP_KERNEL);
3516         if (!skb) {
3517                 kfree(fpl);
3518                 return -ENOMEM;
3519         }
3520
3521         skb->sk = sk;
3522
3523         nr_files = 0;
3524         fpl->user = get_uid(ctx->user);
3525         for (i = 0; i < nr; i++) {
3526                 struct file *file = io_file_from_index(ctx, i + offset);
3527
3528                 if (!file)
3529                         continue;
3530                 fpl->fp[nr_files] = get_file(file);
3531                 unix_inflight(fpl->user, fpl->fp[nr_files]);
3532                 nr_files++;
3533         }
3534
3535         if (nr_files) {
3536                 fpl->max = SCM_MAX_FD;
3537                 fpl->count = nr_files;
3538                 UNIXCB(skb).fp = fpl;
3539                 skb->destructor = io_destruct_skb;
3540                 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
3541                 skb_queue_head(&sk->sk_receive_queue, skb);
3542
3543                 for (i = 0; i < nr_files; i++)
3544                         fput(fpl->fp[i]);
3545         } else {
3546                 kfree_skb(skb);
3547                 kfree(fpl);
3548         }
3549
3550         return 0;
3551 }
3552
3553 /*
3554  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
3555  * causes regular reference counting to break down. We rely on the UNIX
3556  * garbage collection to take care of this problem for us.
3557  */
3558 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
3559 {
3560         unsigned left, total;
3561         int ret = 0;
3562
3563         total = 0;
3564         left = ctx->nr_user_files;
3565         while (left) {
3566                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
3567
3568                 ret = __io_sqe_files_scm(ctx, this_files, total);
3569                 if (ret)
3570                         break;
3571                 left -= this_files;
3572                 total += this_files;
3573         }
3574
3575         if (!ret)
3576                 return 0;
3577
3578         while (total < ctx->nr_user_files) {
3579                 struct file *file = io_file_from_index(ctx, total);
3580
3581                 if (file)
3582                         fput(file);
3583                 total++;
3584         }
3585
3586         return ret;
3587 }
3588 #else
3589 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
3590 {
3591         return 0;
3592 }
3593 #endif
3594
3595 static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
3596                                     unsigned nr_files)
3597 {
3598         int i;
3599
3600         for (i = 0; i < nr_tables; i++) {
3601                 struct fixed_file_table *table = &ctx->file_table[i];
3602                 unsigned this_files;
3603
3604                 this_files = min(nr_files, IORING_MAX_FILES_TABLE);
3605                 table->files = kcalloc(this_files, sizeof(struct file *),
3606                                         GFP_KERNEL);
3607                 if (!table->files)
3608                         break;
3609                 nr_files -= this_files;
3610         }
3611
3612         if (i == nr_tables)
3613                 return 0;
3614
3615         for (i = 0; i < nr_tables; i++) {
3616                 struct fixed_file_table *table = &ctx->file_table[i];
3617                 kfree(table->files);
3618         }
3619         return 1;
3620 }
3621
3622 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
3623                                  unsigned nr_args)
3624 {
3625         __s32 __user *fds = (__s32 __user *) arg;
3626         unsigned nr_tables;
3627         int fd, ret = 0;
3628         unsigned i;
3629
3630         if (ctx->file_table)
3631                 return -EBUSY;
3632         if (!nr_args)
3633                 return -EINVAL;
3634         if (nr_args > IORING_MAX_FIXED_FILES)
3635                 return -EMFILE;
3636
3637         nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
3638         ctx->file_table = kcalloc(nr_tables, sizeof(struct fixed_file_table),
3639                                         GFP_KERNEL);
3640         if (!ctx->file_table)
3641                 return -ENOMEM;
3642
3643         if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
3644                 kfree(ctx->file_table);
3645                 ctx->file_table = NULL;
3646                 return -ENOMEM;
3647         }
3648
3649         for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
3650                 struct fixed_file_table *table;
3651                 unsigned index;
3652
3653                 ret = -EFAULT;
3654                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
3655                         break;
3656                 /* allow sparse sets */
3657                 if (fd == -1) {
3658                         ret = 0;
3659                         continue;
3660                 }
3661
3662                 table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
3663                 index = i & IORING_FILE_TABLE_MASK;
3664                 table->files[index] = fget(fd);
3665
3666                 ret = -EBADF;
3667                 if (!table->files[index])
3668                         break;
3669                 /*
3670                  * Don't allow io_uring instances to be registered. If UNIX
3671                  * isn't enabled, then this causes a reference cycle and this
3672                  * instance can never get freed. If UNIX is enabled we'll
3673                  * handle it just fine, but there's still no point in allowing
3674                  * a ring fd as it doesn't support regular read/write anyway.
3675                  */
3676                 if (table->files[index]->f_op == &io_uring_fops) {
3677                         fput(table->files[index]);
3678                         break;
3679                 }
3680                 ret = 0;
3681         }
3682
3683         if (ret) {
3684                 for (i = 0; i < ctx->nr_user_files; i++) {
3685                         struct file *file;
3686
3687                         file = io_file_from_index(ctx, i);
3688                         if (file)
3689                                 fput(file);
3690                 }
3691                 for (i = 0; i < nr_tables; i++)
3692                         kfree(ctx->file_table[i].files);
3693
3694                 kfree(ctx->file_table);
3695                 ctx->file_table = NULL;
3696                 ctx->nr_user_files = 0;
3697                 return ret;
3698         }
3699
3700         ret = io_sqe_files_scm(ctx);
3701         if (ret)
3702                 io_sqe_files_unregister(ctx);
3703
3704         return ret;
3705 }
3706
3707 static void io_sqe_file_unregister(struct io_ring_ctx *ctx, int index)
3708 {
3709 #if defined(CONFIG_UNIX)
3710         struct file *file = io_file_from_index(ctx, index);
3711         struct sock *sock = ctx->ring_sock->sk;
3712         struct sk_buff_head list, *head = &sock->sk_receive_queue;
3713         struct sk_buff *skb;
3714         int i;
3715
3716         __skb_queue_head_init(&list);
3717
3718         /*
3719          * Find the skb that holds this file in its SCM_RIGHTS. When found,
3720          * remove this entry and rearrange the file array.
3721          */
3722         skb = skb_dequeue(head);
3723         while (skb) {
3724                 struct scm_fp_list *fp;
3725
3726                 fp = UNIXCB(skb).fp;
3727                 for (i = 0; i < fp->count; i++) {
3728                         int left;
3729
3730                         if (fp->fp[i] != file)
3731                                 continue;
3732
3733                         unix_notinflight(fp->user, fp->fp[i]);
3734                         left = fp->count - 1 - i;
3735                         if (left) {
3736                                 memmove(&fp->fp[i], &fp->fp[i + 1],
3737                                                 left * sizeof(struct file *));
3738                         }
3739                         fp->count--;
3740                         if (!fp->count) {
3741                                 kfree_skb(skb);
3742                                 skb = NULL;
3743                         } else {
3744                                 __skb_queue_tail(&list, skb);
3745                         }
3746                         fput(file);
3747                         file = NULL;
3748                         break;
3749                 }
3750
3751                 if (!file)
3752                         break;
3753
3754                 __skb_queue_tail(&list, skb);
3755
3756                 skb = skb_dequeue(head);
3757         }
3758
3759         if (skb_peek(&list)) {
3760                 spin_lock_irq(&head->lock);
3761                 while ((skb = __skb_dequeue(&list)) != NULL)
3762                         __skb_queue_tail(head, skb);
3763                 spin_unlock_irq(&head->lock);
3764         }
3765 #else
3766         fput(io_file_from_index(ctx, index));
3767 #endif
3768 }
3769
3770 static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
3771                                 int index)
3772 {
3773 #if defined(CONFIG_UNIX)
3774         struct sock *sock = ctx->ring_sock->sk;
3775         struct sk_buff_head *head = &sock->sk_receive_queue;
3776         struct sk_buff *skb;
3777
3778         /*
3779          * See if we can merge this file into an existing skb SCM_RIGHTS
3780          * file set. If there's no room, fall back to allocating a new skb
3781          * and filling it in.
3782          */
3783         spin_lock_irq(&head->lock);
3784         skb = skb_peek(head);
3785         if (skb) {
3786                 struct scm_fp_list *fpl = UNIXCB(skb).fp;
3787
3788                 if (fpl->count < SCM_MAX_FD) {
3789                         __skb_unlink(skb, head);
3790                         spin_unlock_irq(&head->lock);
3791                         fpl->fp[fpl->count] = get_file(file);
3792                         unix_inflight(fpl->user, fpl->fp[fpl->count]);
3793                         fpl->count++;
3794                         spin_lock_irq(&head->lock);
3795                         __skb_queue_head(head, skb);
3796                 } else {
3797                         skb = NULL;
3798                 }
3799         }
3800         spin_unlock_irq(&head->lock);
3801
3802         if (skb) {
3803                 fput(file);
3804                 return 0;
3805         }
3806
3807         return __io_sqe_files_scm(ctx, 1, index);
3808 #else
3809         return 0;
3810 #endif
3811 }
3812
3813 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
3814                                unsigned nr_args)
3815 {
3816         struct io_uring_files_update up;
3817         __s32 __user *fds;
3818         int fd, i, err;
3819         __u32 done;
3820
3821         if (!ctx->file_table)
3822                 return -ENXIO;
3823         if (!nr_args)
3824                 return -EINVAL;
3825         if (copy_from_user(&up, arg, sizeof(up)))
3826                 return -EFAULT;
3827         if (check_add_overflow(up.offset, nr_args, &done))
3828                 return -EOVERFLOW;
3829         if (done > ctx->nr_user_files)
3830                 return -EINVAL;
3831
3832         done = 0;
3833         fds = (__s32 __user *) up.fds;
3834         while (nr_args) {
3835                 struct fixed_file_table *table;
3836                 unsigned index;
3837
3838                 err = 0;
3839                 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
3840                         err = -EFAULT;
3841                         break;
3842                 }
3843                 i = array_index_nospec(up.offset, ctx->nr_user_files);
3844                 table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
3845                 index = i & IORING_FILE_TABLE_MASK;
3846                 if (table->files[index]) {
3847                         io_sqe_file_unregister(ctx, i);
3848                         table->files[index] = NULL;
3849                 }
3850                 if (fd != -1) {
3851                         struct file *file;
3852
3853                         file = fget(fd);
3854                         if (!file) {
3855                                 err = -EBADF;
3856                                 break;
3857                         }
3858                         /*
3859                          * Don't allow io_uring instances to be registered. If
3860                          * UNIX isn't enabled, then this causes a reference
3861                          * cycle and this instance can never get freed. If UNIX
3862                          * is enabled we'll handle it just fine, but there's
3863                          * still no point in allowing a ring fd as it doesn't
3864                          * support regular read/write anyway.
3865                          */
3866                         if (file->f_op == &io_uring_fops) {
3867                                 fput(file);
3868                                 err = -EBADF;
3869                                 break;
3870                         }
3871                         table->files[index] = file;
3872                         err = io_sqe_file_register(ctx, file, i);
3873                         if (err)
3874                                 break;
3875                 }
3876                 nr_args--;
3877                 done++;
3878                 up.offset++;
3879         }
3880
3881         return done ? done : err;
3882 }
3883
3884 static void io_put_work(struct io_wq_work *work)
3885 {
3886         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3887
3888         io_put_req(req);
3889 }
3890
3891 static void io_get_work(struct io_wq_work *work)
3892 {
3893         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3894
3895         refcount_inc(&req->refs);
3896 }
3897
3898 static int io_sq_offload_start(struct io_ring_ctx *ctx,
3899                                struct io_uring_params *p)
3900 {
3901         unsigned concurrency;
3902         int ret;
3903
3904         init_waitqueue_head(&ctx->sqo_wait);
3905         mmgrab(current->mm);
3906         ctx->sqo_mm = current->mm;
3907
3908         if (ctx->flags & IORING_SETUP_SQPOLL) {
3909                 ret = -EPERM;
3910                 if (!capable(CAP_SYS_ADMIN))
3911                         goto err;
3912
3913                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
3914                 if (!ctx->sq_thread_idle)
3915                         ctx->sq_thread_idle = HZ;
3916
3917                 if (p->flags & IORING_SETUP_SQ_AFF) {
3918                         int cpu = p->sq_thread_cpu;
3919
3920                         ret = -EINVAL;
3921                         if (cpu >= nr_cpu_ids)
3922                                 goto err;
3923                         if (!cpu_online(cpu))
3924                                 goto err;
3925
3926                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
3927                                                         ctx, cpu,
3928                                                         "io_uring-sq");
3929                 } else {
3930                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
3931                                                         "io_uring-sq");
3932                 }
3933                 if (IS_ERR(ctx->sqo_thread)) {
3934                         ret = PTR_ERR(ctx->sqo_thread);
3935                         ctx->sqo_thread = NULL;
3936                         goto err;
3937                 }
3938                 wake_up_process(ctx->sqo_thread);
3939         } else if (p->flags & IORING_SETUP_SQ_AFF) {
3940                 /* Can't have SQ_AFF without SQPOLL */
3941                 ret = -EINVAL;
3942                 goto err;
3943         }
3944
3945         /* Do QD, or 4 * CPUS, whatever is smallest */
3946         concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
3947         ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm, ctx->user,
3948                                         io_get_work, io_put_work);
3949         if (IS_ERR(ctx->io_wq)) {
3950                 ret = PTR_ERR(ctx->io_wq);
3951                 ctx->io_wq = NULL;
3952                 goto err;
3953         }
3954
3955         return 0;
3956 err:
3957         io_finish_async(ctx);
3958         mmdrop(ctx->sqo_mm);
3959         ctx->sqo_mm = NULL;
3960         return ret;
3961 }
3962
3963 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
3964 {
3965         atomic_long_sub(nr_pages, &user->locked_vm);
3966 }
3967
3968 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
3969 {
3970         unsigned long page_limit, cur_pages, new_pages;
3971
3972         /* Don't allow more pages than we can safely lock */
3973         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
3974
3975         do {
3976                 cur_pages = atomic_long_read(&user->locked_vm);
3977                 new_pages = cur_pages + nr_pages;
3978                 if (new_pages > page_limit)
3979                         return -ENOMEM;
3980         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
3981                                         new_pages) != cur_pages);
3982
3983         return 0;
3984 }
3985
3986 static void io_mem_free(void *ptr)
3987 {
3988         struct page *page;
3989
3990         if (!ptr)
3991                 return;
3992
3993         page = virt_to_head_page(ptr);
3994         if (put_page_testzero(page))
3995                 free_compound_page(page);
3996 }
3997
3998 static void *io_mem_alloc(size_t size)
3999 {
4000         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
4001                                 __GFP_NORETRY;
4002
4003         return (void *) __get_free_pages(gfp_flags, get_order(size));
4004 }
4005
4006 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
4007                                 size_t *sq_offset)
4008 {
4009         struct io_rings *rings;
4010         size_t off, sq_array_size;
4011
4012         off = struct_size(rings, cqes, cq_entries);
4013         if (off == SIZE_MAX)
4014                 return SIZE_MAX;
4015
4016 #ifdef CONFIG_SMP
4017         off = ALIGN(off, SMP_CACHE_BYTES);
4018         if (off == 0)
4019                 return SIZE_MAX;
4020 #endif
4021
4022         sq_array_size = array_size(sizeof(u32), sq_entries);
4023         if (sq_array_size == SIZE_MAX)
4024                 return SIZE_MAX;
4025
4026         if (check_add_overflow(off, sq_array_size, &off))
4027                 return SIZE_MAX;
4028
4029         if (sq_offset)
4030                 *sq_offset = off;
4031
4032         return off;
4033 }
4034
4035 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
4036 {
4037         size_t pages;
4038
4039         pages = (size_t)1 << get_order(
4040                 rings_size(sq_entries, cq_entries, NULL));
4041         pages += (size_t)1 << get_order(
4042                 array_size(sizeof(struct io_uring_sqe), sq_entries));
4043
4044         return pages;
4045 }
4046
4047 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
4048 {
4049         int i, j;
4050
4051         if (!ctx->user_bufs)
4052                 return -ENXIO;
4053
4054         for (i = 0; i < ctx->nr_user_bufs; i++) {
4055                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
4056
4057                 for (j = 0; j < imu->nr_bvecs; j++)
4058                         put_user_page(imu->bvec[j].bv_page);
4059
4060                 if (ctx->account_mem)
4061                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
4062                 kvfree(imu->bvec);
4063                 imu->nr_bvecs = 0;
4064         }
4065
4066         kfree(ctx->user_bufs);
4067         ctx->user_bufs = NULL;
4068         ctx->nr_user_bufs = 0;
4069         return 0;
4070 }
4071
4072 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
4073                        void __user *arg, unsigned index)
4074 {
4075         struct iovec __user *src;
4076
4077 #ifdef CONFIG_COMPAT
4078         if (ctx->compat) {
4079                 struct compat_iovec __user *ciovs;
4080                 struct compat_iovec ciov;
4081
4082                 ciovs = (struct compat_iovec __user *) arg;
4083                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
4084                         return -EFAULT;
4085
4086                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
4087                 dst->iov_len = ciov.iov_len;
4088                 return 0;
4089         }
4090 #endif
4091         src = (struct iovec __user *) arg;
4092         if (copy_from_user(dst, &src[index], sizeof(*dst)))
4093                 return -EFAULT;
4094         return 0;
4095 }
4096
4097 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
4098                                   unsigned nr_args)
4099 {
4100         struct vm_area_struct **vmas = NULL;
4101         struct page **pages = NULL;
4102         int i, j, got_pages = 0;
4103         int ret = -EINVAL;
4104
4105         if (ctx->user_bufs)
4106                 return -EBUSY;
4107         if (!nr_args || nr_args > UIO_MAXIOV)
4108                 return -EINVAL;
4109
4110         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
4111                                         GFP_KERNEL);
4112         if (!ctx->user_bufs)
4113                 return -ENOMEM;
4114
4115         for (i = 0; i < nr_args; i++) {
4116                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
4117                 unsigned long off, start, end, ubuf;
4118                 int pret, nr_pages;
4119                 struct iovec iov;
4120                 size_t size;
4121
4122                 ret = io_copy_iov(ctx, &iov, arg, i);
4123                 if (ret)
4124                         goto err;
4125
4126                 /*
4127                  * Don't impose further limits on the size and buffer
4128                  * constraints here, we'll -EINVAL later when IO is
4129                  * submitted if they are wrong.
4130                  */
4131                 ret = -EFAULT;
4132                 if (!iov.iov_base || !iov.iov_len)
4133                         goto err;
4134
4135                 /* arbitrary limit, but we need something */
4136                 if (iov.iov_len > SZ_1G)
4137                         goto err;
4138
4139                 ubuf = (unsigned long) iov.iov_base;
4140                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
4141                 start = ubuf >> PAGE_SHIFT;
4142                 nr_pages = end - start;
4143
4144                 if (ctx->account_mem) {
4145                         ret = io_account_mem(ctx->user, nr_pages);
4146                         if (ret)
4147                                 goto err;
4148                 }
4149
4150                 ret = 0;
4151                 if (!pages || nr_pages > got_pages) {
4152                         kfree(vmas);
4153                         kfree(pages);
4154                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
4155                                                 GFP_KERNEL);
4156                         vmas = kvmalloc_array(nr_pages,
4157                                         sizeof(struct vm_area_struct *),
4158                                         GFP_KERNEL);
4159                         if (!pages || !vmas) {
4160                                 ret = -ENOMEM;
4161                                 if (ctx->account_mem)
4162                                         io_unaccount_mem(ctx->user, nr_pages);
4163                                 goto err;
4164                         }
4165                         got_pages = nr_pages;
4166                 }
4167
4168                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
4169                                                 GFP_KERNEL);
4170                 ret = -ENOMEM;
4171                 if (!imu->bvec) {
4172                         if (ctx->account_mem)
4173                                 io_unaccount_mem(ctx->user, nr_pages);
4174                         goto err;
4175                 }
4176
4177                 ret = 0;
4178                 down_read(&current->mm->mmap_sem);
4179                 pret = get_user_pages(ubuf, nr_pages,
4180                                       FOLL_WRITE | FOLL_LONGTERM,
4181                                       pages, vmas);
4182                 if (pret == nr_pages) {
4183                         /* don't support file backed memory */
4184                         for (j = 0; j < nr_pages; j++) {
4185                                 struct vm_area_struct *vma = vmas[j];
4186
4187                                 if (vma->vm_file &&
4188                                     !is_file_hugepages(vma->vm_file)) {
4189                                         ret = -EOPNOTSUPP;
4190                                         break;
4191                                 }
4192                         }
4193                 } else {
4194                         ret = pret < 0 ? pret : -EFAULT;
4195                 }
4196                 up_read(&current->mm->mmap_sem);
4197                 if (ret) {
4198                         /*
4199                          * if we did partial map, or found file backed vmas,
4200                          * release any pages we did get
4201                          */
4202                         if (pret > 0)
4203                                 put_user_pages(pages, pret);
4204                         if (ctx->account_mem)
4205                                 io_unaccount_mem(ctx->user, nr_pages);
4206                         kvfree(imu->bvec);
4207                         goto err;
4208                 }
4209
4210                 off = ubuf & ~PAGE_MASK;
4211                 size = iov.iov_len;
4212                 for (j = 0; j < nr_pages; j++) {
4213                         size_t vec_len;
4214
4215                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
4216                         imu->bvec[j].bv_page = pages[j];
4217                         imu->bvec[j].bv_len = vec_len;
4218                         imu->bvec[j].bv_offset = off;
4219                         off = 0;
4220                         size -= vec_len;
4221                 }
4222                 /* store original address for later verification */
4223                 imu->ubuf = ubuf;
4224                 imu->len = iov.iov_len;
4225                 imu->nr_bvecs = nr_pages;
4226
4227                 ctx->nr_user_bufs++;
4228         }
4229         kvfree(pages);
4230         kvfree(vmas);
4231         return 0;
4232 err:
4233         kvfree(pages);
4234         kvfree(vmas);
4235         io_sqe_buffer_unregister(ctx);
4236         return ret;
4237 }
4238
4239 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
4240 {
4241         __s32 __user *fds = arg;
4242         int fd;
4243
4244         if (ctx->cq_ev_fd)
4245                 return -EBUSY;
4246
4247         if (copy_from_user(&fd, fds, sizeof(*fds)))
4248                 return -EFAULT;
4249
4250         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
4251         if (IS_ERR(ctx->cq_ev_fd)) {
4252                 int ret = PTR_ERR(ctx->cq_ev_fd);
4253                 ctx->cq_ev_fd = NULL;
4254                 return ret;
4255         }
4256
4257         return 0;
4258 }
4259
4260 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
4261 {
4262         if (ctx->cq_ev_fd) {
4263                 eventfd_ctx_put(ctx->cq_ev_fd);
4264                 ctx->cq_ev_fd = NULL;
4265                 return 0;
4266         }
4267
4268         return -ENXIO;
4269 }
4270
4271 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
4272 {
4273         io_finish_async(ctx);
4274         if (ctx->sqo_mm)
4275                 mmdrop(ctx->sqo_mm);
4276
4277         io_iopoll_reap_events(ctx);
4278         io_sqe_buffer_unregister(ctx);
4279         io_sqe_files_unregister(ctx);
4280         io_eventfd_unregister(ctx);
4281
4282 #if defined(CONFIG_UNIX)
4283         if (ctx->ring_sock) {
4284                 ctx->ring_sock->file = NULL; /* so that iput() is called */
4285                 sock_release(ctx->ring_sock);
4286         }
4287 #endif
4288
4289         io_mem_free(ctx->rings);
4290         io_mem_free(ctx->sq_sqes);
4291
4292         percpu_ref_exit(&ctx->refs);
4293         if (ctx->account_mem)
4294                 io_unaccount_mem(ctx->user,
4295                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
4296         free_uid(ctx->user);
4297         kfree(ctx->completions);
4298         kmem_cache_free(req_cachep, ctx->fallback_req);
4299         kfree(ctx);
4300 }
4301
4302 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
4303 {
4304         struct io_ring_ctx *ctx = file->private_data;
4305         __poll_t mask = 0;
4306
4307         poll_wait(file, &ctx->cq_wait, wait);
4308         /*
4309          * synchronizes with barrier from wq_has_sleeper call in
4310          * io_commit_cqring
4311          */
4312         smp_rmb();
4313         if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
4314             ctx->rings->sq_ring_entries)
4315                 mask |= EPOLLOUT | EPOLLWRNORM;
4316         if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
4317                 mask |= EPOLLIN | EPOLLRDNORM;
4318
4319         return mask;
4320 }
4321
4322 static int io_uring_fasync(int fd, struct file *file, int on)
4323 {
4324         struct io_ring_ctx *ctx = file->private_data;
4325
4326         return fasync_helper(fd, file, on, &ctx->cq_fasync);
4327 }
4328
4329 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
4330 {
4331         mutex_lock(&ctx->uring_lock);
4332         percpu_ref_kill(&ctx->refs);
4333         mutex_unlock(&ctx->uring_lock);
4334
4335         io_kill_timeouts(ctx);
4336         io_poll_remove_all(ctx);
4337
4338         if (ctx->io_wq)
4339                 io_wq_cancel_all(ctx->io_wq);
4340
4341         io_iopoll_reap_events(ctx);
4342         /* if we failed setting up the ctx, we might not have any rings */
4343         if (ctx->rings)
4344                 io_cqring_overflow_flush(ctx, true);
4345         wait_for_completion(&ctx->completions[0]);
4346         io_ring_ctx_free(ctx);
4347 }
4348
4349 static int io_uring_release(struct inode *inode, struct file *file)
4350 {
4351         struct io_ring_ctx *ctx = file->private_data;
4352
4353         file->private_data = NULL;
4354         io_ring_ctx_wait_and_kill(ctx);
4355         return 0;
4356 }
4357
4358 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
4359                                   struct files_struct *files)
4360 {
4361         struct io_kiocb *req;
4362         DEFINE_WAIT(wait);
4363
4364         while (!list_empty_careful(&ctx->inflight_list)) {
4365                 struct io_kiocb *cancel_req = NULL;
4366
4367                 spin_lock_irq(&ctx->inflight_lock);
4368                 list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
4369                         if (req->work.files != files)
4370                                 continue;
4371                         /* req is being completed, ignore */
4372                         if (!refcount_inc_not_zero(&req->refs))
4373                                 continue;
4374                         cancel_req = req;
4375                         break;
4376                 }
4377                 if (cancel_req)
4378                         prepare_to_wait(&ctx->inflight_wait, &wait,
4379                                                 TASK_UNINTERRUPTIBLE);
4380                 spin_unlock_irq(&ctx->inflight_lock);
4381
4382                 /* We need to keep going until we don't find a matching req */
4383                 if (!cancel_req)
4384                         break;
4385
4386                 io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
4387                 io_put_req(cancel_req);
4388                 schedule();
4389         }
4390         finish_wait(&ctx->inflight_wait, &wait);
4391 }
4392
4393 static int io_uring_flush(struct file *file, void *data)
4394 {
4395         struct io_ring_ctx *ctx = file->private_data;
4396
4397         io_uring_cancel_files(ctx, data);
4398         if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
4399                 io_cqring_overflow_flush(ctx, true);
4400                 io_wq_cancel_all(ctx->io_wq);
4401         }
4402         return 0;
4403 }
4404
4405 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
4406 {
4407         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
4408         unsigned long sz = vma->vm_end - vma->vm_start;
4409         struct io_ring_ctx *ctx = file->private_data;
4410         unsigned long pfn;
4411         struct page *page;
4412         void *ptr;
4413
4414         switch (offset) {
4415         case IORING_OFF_SQ_RING:
4416         case IORING_OFF_CQ_RING:
4417                 ptr = ctx->rings;
4418                 break;
4419         case IORING_OFF_SQES:
4420                 ptr = ctx->sq_sqes;
4421                 break;
4422         default:
4423                 return -EINVAL;
4424         }
4425
4426         page = virt_to_head_page(ptr);
4427         if (sz > page_size(page))
4428                 return -EINVAL;
4429
4430         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
4431         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
4432 }
4433
4434 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
4435                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
4436                 size_t, sigsz)
4437 {
4438         struct io_ring_ctx *ctx;
4439         long ret = -EBADF;
4440         int submitted = 0;
4441         struct fd f;
4442
4443         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
4444                 return -EINVAL;
4445
4446         f = fdget(fd);
4447         if (!f.file)
4448                 return -EBADF;
4449
4450         ret = -EOPNOTSUPP;
4451         if (f.file->f_op != &io_uring_fops)
4452                 goto out_fput;
4453
4454         ret = -ENXIO;
4455         ctx = f.file->private_data;
4456         if (!percpu_ref_tryget(&ctx->refs))
4457                 goto out_fput;
4458
4459         /*
4460          * For SQ polling, the thread will do all submissions and completions.
4461          * Just return the requested submit count, and wake the thread if
4462          * we were asked to.
4463          */
4464         ret = 0;
4465         if (ctx->flags & IORING_SETUP_SQPOLL) {
4466                 if (!list_empty_careful(&ctx->cq_overflow_list))
4467                         io_cqring_overflow_flush(ctx, false);
4468                 if (flags & IORING_ENTER_SQ_WAKEUP)
4469                         wake_up(&ctx->sqo_wait);
4470                 submitted = to_submit;
4471         } else if (to_submit) {
4472                 struct mm_struct *cur_mm;
4473
4474                 to_submit = min(to_submit, ctx->sq_entries);
4475                 mutex_lock(&ctx->uring_lock);
4476                 /* already have mm, so io_submit_sqes() won't try to grab it */
4477                 cur_mm = ctx->sqo_mm;
4478                 submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
4479                                            &cur_mm, false);
4480                 mutex_unlock(&ctx->uring_lock);
4481         }
4482         if (flags & IORING_ENTER_GETEVENTS) {
4483                 unsigned nr_events = 0;
4484
4485                 min_complete = min(min_complete, ctx->cq_entries);
4486
4487                 if (ctx->flags & IORING_SETUP_IOPOLL) {
4488                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
4489                 } else {
4490                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
4491                 }
4492         }
4493
4494         percpu_ref_put(&ctx->refs);
4495 out_fput:
4496         fdput(f);
4497         return submitted ? submitted : ret;
4498 }
4499
4500 static const struct file_operations io_uring_fops = {
4501         .release        = io_uring_release,
4502         .flush          = io_uring_flush,
4503         .mmap           = io_uring_mmap,
4504         .poll           = io_uring_poll,
4505         .fasync         = io_uring_fasync,
4506 };
4507
4508 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
4509                                   struct io_uring_params *p)
4510 {
4511         struct io_rings *rings;
4512         size_t size, sq_array_offset;
4513
4514         size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
4515         if (size == SIZE_MAX)
4516                 return -EOVERFLOW;
4517
4518         rings = io_mem_alloc(size);
4519         if (!rings)
4520                 return -ENOMEM;
4521
4522         ctx->rings = rings;
4523         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
4524         rings->sq_ring_mask = p->sq_entries - 1;
4525         rings->cq_ring_mask = p->cq_entries - 1;
4526         rings->sq_ring_entries = p->sq_entries;
4527         rings->cq_ring_entries = p->cq_entries;
4528         ctx->sq_mask = rings->sq_ring_mask;
4529         ctx->cq_mask = rings->cq_ring_mask;
4530         ctx->sq_entries = rings->sq_ring_entries;
4531         ctx->cq_entries = rings->cq_ring_entries;
4532
4533         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
4534         if (size == SIZE_MAX)
4535                 return -EOVERFLOW;
4536
4537         ctx->sq_sqes = io_mem_alloc(size);
4538         if (!ctx->sq_sqes)
4539                 return -ENOMEM;
4540
4541         return 0;
4542 }
4543
4544 /*
4545  * Allocate an anonymous fd, this is what constitutes the application
4546  * visible backing of an io_uring instance. The application mmaps this
4547  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
4548  * we have to tie this fd to a socket for file garbage collection purposes.
4549  */
4550 static int io_uring_get_fd(struct io_ring_ctx *ctx)
4551 {
4552         struct file *file;
4553         int ret;
4554
4555 #if defined(CONFIG_UNIX)
4556         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
4557                                 &ctx->ring_sock);
4558         if (ret)
4559                 return ret;
4560 #endif
4561
4562         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
4563         if (ret < 0)
4564                 goto err;
4565
4566         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
4567                                         O_RDWR | O_CLOEXEC);
4568         if (IS_ERR(file)) {
4569                 put_unused_fd(ret);
4570                 ret = PTR_ERR(file);
4571                 goto err;
4572         }
4573
4574 #if defined(CONFIG_UNIX)
4575         ctx->ring_sock->file = file;
4576         ctx->ring_sock->sk->sk_user_data = ctx;
4577 #endif
4578         fd_install(ret, file);
4579         return ret;
4580 err:
4581 #if defined(CONFIG_UNIX)
4582         sock_release(ctx->ring_sock);
4583         ctx->ring_sock = NULL;
4584 #endif
4585         return ret;
4586 }
4587
4588 static int io_uring_create(unsigned entries, struct io_uring_params *p)
4589 {
4590         struct user_struct *user = NULL;
4591         struct io_ring_ctx *ctx;
4592         bool account_mem;
4593         int ret;
4594
4595         if (!entries || entries > IORING_MAX_ENTRIES)
4596                 return -EINVAL;
4597
4598         /*
4599          * Use twice as many entries for the CQ ring. It's possible for the
4600          * application to drive a higher depth than the size of the SQ ring,
4601          * since the sqes are only used at submission time. This allows for
4602          * some flexibility in overcommitting a bit. If the application has
4603          * set IORING_SETUP_CQSIZE, it will have passed in the desired number
4604          * of CQ ring entries manually.
4605          */
4606         p->sq_entries = roundup_pow_of_two(entries);
4607         if (p->flags & IORING_SETUP_CQSIZE) {
4608                 /*
4609                  * If IORING_SETUP_CQSIZE is set, we do the same roundup
4610                  * to a power-of-two, if it isn't already. We do NOT impose
4611                  * any cq vs sq ring sizing.
4612                  */
4613                 if (p->cq_entries < p->sq_entries || p->cq_entries > IORING_MAX_CQ_ENTRIES)
4614                         return -EINVAL;
4615                 p->cq_entries = roundup_pow_of_two(p->cq_entries);
4616         } else {
4617                 p->cq_entries = 2 * p->sq_entries;
4618         }
4619
4620         user = get_uid(current_user());
4621         account_mem = !capable(CAP_IPC_LOCK);
4622
4623         if (account_mem) {
4624                 ret = io_account_mem(user,
4625                                 ring_pages(p->sq_entries, p->cq_entries));
4626                 if (ret) {
4627                         free_uid(user);
4628                         return ret;
4629                 }
4630         }
4631
4632         ctx = io_ring_ctx_alloc(p);
4633         if (!ctx) {
4634                 if (account_mem)
4635                         io_unaccount_mem(user, ring_pages(p->sq_entries,
4636                                                                 p->cq_entries));
4637                 free_uid(user);
4638                 return -ENOMEM;
4639         }
4640         ctx->compat = in_compat_syscall();
4641         ctx->account_mem = account_mem;
4642         ctx->user = user;
4643
4644         ret = io_allocate_scq_urings(ctx, p);
4645         if (ret)
4646                 goto err;
4647
4648         ret = io_sq_offload_start(ctx, p);
4649         if (ret)
4650                 goto err;
4651
4652         memset(&p->sq_off, 0, sizeof(p->sq_off));
4653         p->sq_off.head = offsetof(struct io_rings, sq.head);
4654         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
4655         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
4656         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
4657         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
4658         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
4659         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
4660
4661         memset(&p->cq_off, 0, sizeof(p->cq_off));
4662         p->cq_off.head = offsetof(struct io_rings, cq.head);
4663         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
4664         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
4665         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
4666         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
4667         p->cq_off.cqes = offsetof(struct io_rings, cqes);
4668
4669         /*
4670          * Install ring fd as the very last thing, so we don't risk someone
4671          * having closed it before we finish setup
4672          */
4673         ret = io_uring_get_fd(ctx);
4674         if (ret < 0)
4675                 goto err;
4676
4677         p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP;
4678         trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
4679         return ret;
4680 err:
4681         io_ring_ctx_wait_and_kill(ctx);
4682         return ret;
4683 }
4684
4685 /*
4686  * Sets up an aio uring context, and returns the fd. Applications asks for a
4687  * ring size, we return the actual sq/cq ring sizes (among other things) in the
4688  * params structure passed in.
4689  */
4690 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
4691 {
4692         struct io_uring_params p;
4693         long ret;
4694         int i;
4695
4696         if (copy_from_user(&p, params, sizeof(p)))
4697                 return -EFAULT;
4698         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
4699                 if (p.resv[i])
4700                         return -EINVAL;
4701         }
4702
4703         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
4704                         IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
4705                 return -EINVAL;
4706
4707         ret = io_uring_create(entries, &p);
4708         if (ret < 0)
4709                 return ret;
4710
4711         if (copy_to_user(params, &p, sizeof(p)))
4712                 return -EFAULT;
4713
4714         return ret;
4715 }
4716
4717 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
4718                 struct io_uring_params __user *, params)
4719 {
4720         return io_uring_setup(entries, params);
4721 }
4722
4723 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
4724                                void __user *arg, unsigned nr_args)
4725         __releases(ctx->uring_lock)
4726         __acquires(ctx->uring_lock)
4727 {
4728         int ret;
4729
4730         /*
4731          * We're inside the ring mutex, if the ref is already dying, then
4732          * someone else killed the ctx or is already going through
4733          * io_uring_register().
4734          */
4735         if (percpu_ref_is_dying(&ctx->refs))
4736                 return -ENXIO;
4737
4738         percpu_ref_kill(&ctx->refs);
4739
4740         /*
4741          * Drop uring mutex before waiting for references to exit. If another
4742          * thread is currently inside io_uring_enter() it might need to grab
4743          * the uring_lock to make progress. If we hold it here across the drain
4744          * wait, then we can deadlock. It's safe to drop the mutex here, since
4745          * no new references will come in after we've killed the percpu ref.
4746          */
4747         mutex_unlock(&ctx->uring_lock);
4748         wait_for_completion(&ctx->completions[0]);
4749         mutex_lock(&ctx->uring_lock);
4750
4751         switch (opcode) {
4752         case IORING_REGISTER_BUFFERS:
4753                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
4754                 break;
4755         case IORING_UNREGISTER_BUFFERS:
4756                 ret = -EINVAL;
4757                 if (arg || nr_args)
4758                         break;
4759                 ret = io_sqe_buffer_unregister(ctx);
4760                 break;
4761         case IORING_REGISTER_FILES:
4762                 ret = io_sqe_files_register(ctx, arg, nr_args);
4763                 break;
4764         case IORING_UNREGISTER_FILES:
4765                 ret = -EINVAL;
4766                 if (arg || nr_args)
4767                         break;
4768                 ret = io_sqe_files_unregister(ctx);
4769                 break;
4770         case IORING_REGISTER_FILES_UPDATE:
4771                 ret = io_sqe_files_update(ctx, arg, nr_args);
4772                 break;
4773         case IORING_REGISTER_EVENTFD:
4774                 ret = -EINVAL;
4775                 if (nr_args != 1)
4776                         break;
4777                 ret = io_eventfd_register(ctx, arg);
4778                 break;
4779         case IORING_UNREGISTER_EVENTFD:
4780                 ret = -EINVAL;
4781                 if (arg || nr_args)
4782                         break;
4783                 ret = io_eventfd_unregister(ctx);
4784                 break;
4785         default:
4786                 ret = -EINVAL;
4787                 break;
4788         }
4789
4790         /* bring the ctx back to life */
4791         reinit_completion(&ctx->completions[0]);
4792         percpu_ref_reinit(&ctx->refs);
4793         return ret;
4794 }
4795
4796 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
4797                 void __user *, arg, unsigned int, nr_args)
4798 {
4799         struct io_ring_ctx *ctx;
4800         long ret = -EBADF;
4801         struct fd f;
4802
4803         f = fdget(fd);
4804         if (!f.file)
4805                 return -EBADF;
4806
4807         ret = -EOPNOTSUPP;
4808         if (f.file->f_op != &io_uring_fops)
4809                 goto out_fput;
4810
4811         ctx = f.file->private_data;
4812
4813         mutex_lock(&ctx->uring_lock);
4814         ret = __io_uring_register(ctx, opcode, arg, nr_args);
4815         mutex_unlock(&ctx->uring_lock);
4816         trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
4817                                                         ctx->cq_ev_fd != NULL, ret);
4818 out_fput:
4819         fdput(f);
4820         return ret;
4821 }
4822
4823 static int __init io_uring_init(void)
4824 {
4825         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
4826         return 0;
4827 };
4828 __initcall(io_uring_init);