io_uring: wrap multi-req freeing in struct req_batch
[linux-2.6-block.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/kthread.h>
60 #include <linux/blkdev.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
63 #include <net/sock.h>
64 #include <net/af_unix.h>
65 #include <net/scm.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72 #include <linux/highmem.h>
73 #include <linux/namei.h>
74 #include <linux/fsnotify.h>
75 #include <linux/fadvise.h>
76
77 #define CREATE_TRACE_POINTS
78 #include <trace/events/io_uring.h>
79
80 #include <uapi/linux/io_uring.h>
81
82 #include "internal.h"
83 #include "io-wq.h"
84
85 #define IORING_MAX_ENTRIES      32768
86 #define IORING_MAX_CQ_ENTRIES   (2 * IORING_MAX_ENTRIES)
87
88 /*
89  * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
90  */
91 #define IORING_FILE_TABLE_SHIFT 9
92 #define IORING_MAX_FILES_TABLE  (1U << IORING_FILE_TABLE_SHIFT)
93 #define IORING_FILE_TABLE_MASK  (IORING_MAX_FILES_TABLE - 1)
94 #define IORING_MAX_FIXED_FILES  (64 * IORING_MAX_FILES_TABLE)
95
96 struct io_uring {
97         u32 head ____cacheline_aligned_in_smp;
98         u32 tail ____cacheline_aligned_in_smp;
99 };
100
101 /*
102  * This data is shared with the application through the mmap at offsets
103  * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
104  *
105  * The offsets to the member fields are published through struct
106  * io_sqring_offsets when calling io_uring_setup.
107  */
108 struct io_rings {
109         /*
110          * Head and tail offsets into the ring; the offsets need to be
111          * masked to get valid indices.
112          *
113          * The kernel controls head of the sq ring and the tail of the cq ring,
114          * and the application controls tail of the sq ring and the head of the
115          * cq ring.
116          */
117         struct io_uring         sq, cq;
118         /*
119          * Bitmasks to apply to head and tail offsets (constant, equals
120          * ring_entries - 1)
121          */
122         u32                     sq_ring_mask, cq_ring_mask;
123         /* Ring sizes (constant, power of 2) */
124         u32                     sq_ring_entries, cq_ring_entries;
125         /*
126          * Number of invalid entries dropped by the kernel due to
127          * invalid index stored in array
128          *
129          * Written by the kernel, shouldn't be modified by the
130          * application (i.e. get number of "new events" by comparing to
131          * cached value).
132          *
133          * After a new SQ head value was read by the application this
134          * counter includes all submissions that were dropped reaching
135          * the new SQ head (and possibly more).
136          */
137         u32                     sq_dropped;
138         /*
139          * Runtime flags
140          *
141          * Written by the kernel, shouldn't be modified by the
142          * application.
143          *
144          * The application needs a full memory barrier before checking
145          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
146          */
147         u32                     sq_flags;
148         /*
149          * Number of completion events lost because the queue was full;
150          * this should be avoided by the application by making sure
151          * there are not more requests pending than there is space in
152          * the completion queue.
153          *
154          * Written by the kernel, shouldn't be modified by the
155          * application (i.e. get number of "new events" by comparing to
156          * cached value).
157          *
158          * As completion events come in out of order this counter is not
159          * ordered with any other data.
160          */
161         u32                     cq_overflow;
162         /*
163          * Ring buffer of completion events.
164          *
165          * The kernel writes completion events fresh every time they are
166          * produced, so the application is allowed to modify pending
167          * entries.
168          */
169         struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
170 };
171
172 struct io_mapped_ubuf {
173         u64             ubuf;
174         size_t          len;
175         struct          bio_vec *bvec;
176         unsigned int    nr_bvecs;
177 };
178
179 struct fixed_file_table {
180         struct file             **files;
181 };
182
183 enum {
184         FFD_F_ATOMIC,
185 };
186
187 struct fixed_file_data {
188         struct fixed_file_table         *table;
189         struct io_ring_ctx              *ctx;
190
191         struct percpu_ref               refs;
192         struct llist_head               put_llist;
193         unsigned long                   state;
194         struct work_struct              ref_work;
195         struct completion               done;
196 };
197
198 struct io_ring_ctx {
199         struct {
200                 struct percpu_ref       refs;
201         } ____cacheline_aligned_in_smp;
202
203         struct {
204                 unsigned int            flags;
205                 bool                    compat;
206                 bool                    account_mem;
207                 bool                    cq_overflow_flushed;
208                 bool                    drain_next;
209
210                 /*
211                  * Ring buffer of indices into array of io_uring_sqe, which is
212                  * mmapped by the application using the IORING_OFF_SQES offset.
213                  *
214                  * This indirection could e.g. be used to assign fixed
215                  * io_uring_sqe entries to operations and only submit them to
216                  * the queue when needed.
217                  *
218                  * The kernel modifies neither the indices array nor the entries
219                  * array.
220                  */
221                 u32                     *sq_array;
222                 unsigned                cached_sq_head;
223                 unsigned                sq_entries;
224                 unsigned                sq_mask;
225                 unsigned                sq_thread_idle;
226                 unsigned                cached_sq_dropped;
227                 atomic_t                cached_cq_overflow;
228                 unsigned long           sq_check_overflow;
229
230                 struct list_head        defer_list;
231                 struct list_head        timeout_list;
232                 struct list_head        cq_overflow_list;
233
234                 wait_queue_head_t       inflight_wait;
235                 struct io_uring_sqe     *sq_sqes;
236         } ____cacheline_aligned_in_smp;
237
238         struct io_rings *rings;
239
240         /* IO offload */
241         struct io_wq            *io_wq;
242         struct task_struct      *sqo_thread;    /* if using sq thread polling */
243         struct mm_struct        *sqo_mm;
244         wait_queue_head_t       sqo_wait;
245
246         /*
247          * If used, fixed file set. Writers must ensure that ->refs is dead,
248          * readers must ensure that ->refs is alive as long as the file* is
249          * used. Only updated through io_uring_register(2).
250          */
251         struct fixed_file_data  *file_data;
252         unsigned                nr_user_files;
253
254         /* if used, fixed mapped user buffers */
255         unsigned                nr_user_bufs;
256         struct io_mapped_ubuf   *user_bufs;
257
258         struct user_struct      *user;
259
260         const struct cred       *creds;
261
262         /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
263         struct completion       *completions;
264
265         /* if all else fails... */
266         struct io_kiocb         *fallback_req;
267
268 #if defined(CONFIG_UNIX)
269         struct socket           *ring_sock;
270 #endif
271
272         struct {
273                 unsigned                cached_cq_tail;
274                 unsigned                cq_entries;
275                 unsigned                cq_mask;
276                 atomic_t                cq_timeouts;
277                 unsigned long           cq_check_overflow;
278                 struct wait_queue_head  cq_wait;
279                 struct fasync_struct    *cq_fasync;
280                 struct eventfd_ctx      *cq_ev_fd;
281         } ____cacheline_aligned_in_smp;
282
283         struct {
284                 struct mutex            uring_lock;
285                 wait_queue_head_t       wait;
286         } ____cacheline_aligned_in_smp;
287
288         struct {
289                 spinlock_t              completion_lock;
290                 struct llist_head       poll_llist;
291
292                 /*
293                  * ->poll_list is protected by the ctx->uring_lock for
294                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
295                  * For SQPOLL, only the single threaded io_sq_thread() will
296                  * manipulate the list, hence no extra locking is needed there.
297                  */
298                 struct list_head        poll_list;
299                 struct hlist_head       *cancel_hash;
300                 unsigned                cancel_hash_bits;
301                 bool                    poll_multi_file;
302
303                 spinlock_t              inflight_lock;
304                 struct list_head        inflight_list;
305         } ____cacheline_aligned_in_smp;
306 };
307
308 /*
309  * First field must be the file pointer in all the
310  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
311  */
312 struct io_poll_iocb {
313         struct file                     *file;
314         union {
315                 struct wait_queue_head  *head;
316                 u64                     addr;
317         };
318         __poll_t                        events;
319         bool                            done;
320         bool                            canceled;
321         struct wait_queue_entry         wait;
322 };
323
324 struct io_close {
325         struct file                     *file;
326         struct file                     *put_file;
327         int                             fd;
328 };
329
330 struct io_timeout_data {
331         struct io_kiocb                 *req;
332         struct hrtimer                  timer;
333         struct timespec64               ts;
334         enum hrtimer_mode               mode;
335         u32                             seq_offset;
336 };
337
338 struct io_accept {
339         struct file                     *file;
340         struct sockaddr __user          *addr;
341         int __user                      *addr_len;
342         int                             flags;
343 };
344
345 struct io_sync {
346         struct file                     *file;
347         loff_t                          len;
348         loff_t                          off;
349         int                             flags;
350         int                             mode;
351 };
352
353 struct io_cancel {
354         struct file                     *file;
355         u64                             addr;
356 };
357
358 struct io_timeout {
359         struct file                     *file;
360         u64                             addr;
361         int                             flags;
362         unsigned                        count;
363 };
364
365 struct io_rw {
366         /* NOTE: kiocb has the file as the first member, so don't do it here */
367         struct kiocb                    kiocb;
368         u64                             addr;
369         u64                             len;
370 };
371
372 struct io_connect {
373         struct file                     *file;
374         struct sockaddr __user          *addr;
375         int                             addr_len;
376 };
377
378 struct io_sr_msg {
379         struct file                     *file;
380         struct user_msghdr __user       *msg;
381         int                             msg_flags;
382 };
383
384 struct io_open {
385         struct file                     *file;
386         int                             dfd;
387         union {
388                 umode_t                 mode;
389                 unsigned                mask;
390         };
391         const char __user               *fname;
392         struct filename                 *filename;
393         struct statx __user             *buffer;
394         int                             flags;
395 };
396
397 struct io_files_update {
398         struct file                     *file;
399         u64                             arg;
400         u32                             nr_args;
401         u32                             offset;
402 };
403
404 struct io_fadvise {
405         struct file                     *file;
406         u64                             offset;
407         u32                             len;
408         u32                             advice;
409 };
410
411 struct io_madvise {
412         struct file                     *file;
413         u64                             addr;
414         u32                             len;
415         u32                             advice;
416 };
417
418 struct io_async_connect {
419         struct sockaddr_storage         address;
420 };
421
422 struct io_async_msghdr {
423         struct iovec                    fast_iov[UIO_FASTIOV];
424         struct iovec                    *iov;
425         struct sockaddr __user          *uaddr;
426         struct msghdr                   msg;
427 };
428
429 struct io_async_rw {
430         struct iovec                    fast_iov[UIO_FASTIOV];
431         struct iovec                    *iov;
432         ssize_t                         nr_segs;
433         ssize_t                         size;
434 };
435
436 struct io_async_open {
437         struct filename                 *filename;
438 };
439
440 struct io_async_ctx {
441         union {
442                 struct io_async_rw      rw;
443                 struct io_async_msghdr  msg;
444                 struct io_async_connect connect;
445                 struct io_timeout_data  timeout;
446                 struct io_async_open    open;
447         };
448 };
449
450 /*
451  * NOTE! Each of the iocb union members has the file pointer
452  * as the first entry in their struct definition. So you can
453  * access the file pointer through any of the sub-structs,
454  * or directly as just 'ki_filp' in this struct.
455  */
456 struct io_kiocb {
457         union {
458                 struct file             *file;
459                 struct io_rw            rw;
460                 struct io_poll_iocb     poll;
461                 struct io_accept        accept;
462                 struct io_sync          sync;
463                 struct io_cancel        cancel;
464                 struct io_timeout       timeout;
465                 struct io_connect       connect;
466                 struct io_sr_msg        sr_msg;
467                 struct io_open          open;
468                 struct io_close         close;
469                 struct io_files_update  files_update;
470                 struct io_fadvise       fadvise;
471                 struct io_madvise       madvise;
472         };
473
474         struct io_async_ctx             *io;
475         union {
476                 /*
477                  * ring_file is only used in the submission path, and
478                  * llist_node is only used for poll deferred completions
479                  */
480                 struct file             *ring_file;
481                 struct llist_node       llist_node;
482         };
483         int                             ring_fd;
484         bool                            has_user;
485         bool                            in_async;
486         bool                            needs_fixed_file;
487         u8                              opcode;
488
489         struct io_ring_ctx      *ctx;
490         union {
491                 struct list_head        list;
492                 struct hlist_node       hash_node;
493         };
494         struct list_head        link_list;
495         unsigned int            flags;
496         refcount_t              refs;
497 #define REQ_F_NOWAIT            1       /* must not punt to workers */
498 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
499 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
500 #define REQ_F_LINK_NEXT         8       /* already grabbed next link */
501 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
502 #define REQ_F_IO_DRAINED        32      /* drain done */
503 #define REQ_F_LINK              64      /* linked sqes */
504 #define REQ_F_LINK_TIMEOUT      128     /* has linked timeout */
505 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
506 #define REQ_F_DRAIN_LINK        512     /* link should be fully drained */
507 #define REQ_F_TIMEOUT           1024    /* timeout request */
508 #define REQ_F_ISREG             2048    /* regular file */
509 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
510 #define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
511 #define REQ_F_INFLIGHT          16384   /* on inflight list */
512 #define REQ_F_COMP_LOCKED       32768   /* completion under lock */
513 #define REQ_F_HARDLINK          65536   /* doesn't sever on completion < 0 */
514 #define REQ_F_FORCE_ASYNC       131072  /* IOSQE_ASYNC */
515 #define REQ_F_CUR_POS           262144  /* read/write uses file position */
516         u64                     user_data;
517         u32                     result;
518         u32                     sequence;
519
520         struct list_head        inflight_entry;
521
522         struct io_wq_work       work;
523 };
524
525 #define IO_PLUG_THRESHOLD               2
526 #define IO_IOPOLL_BATCH                 8
527
528 struct io_submit_state {
529         struct blk_plug         plug;
530
531         /*
532          * io_kiocb alloc cache
533          */
534         void                    *reqs[IO_IOPOLL_BATCH];
535         unsigned                int free_reqs;
536         unsigned                int cur_req;
537
538         /*
539          * File reference cache
540          */
541         struct file             *file;
542         unsigned int            fd;
543         unsigned int            has_refs;
544         unsigned int            used_refs;
545         unsigned int            ios_left;
546 };
547
548 struct io_op_def {
549         /* needs req->io allocated for deferral/async */
550         unsigned                async_ctx : 1;
551         /* needs current->mm setup, does mm access */
552         unsigned                needs_mm : 1;
553         /* needs req->file assigned */
554         unsigned                needs_file : 1;
555         /* needs req->file assigned IFF fd is >= 0 */
556         unsigned                fd_non_neg : 1;
557         /* hash wq insertion if file is a regular file */
558         unsigned                hash_reg_file : 1;
559         /* unbound wq insertion if file is a non-regular file */
560         unsigned                unbound_nonreg_file : 1;
561 };
562
563 static const struct io_op_def io_op_defs[] = {
564         {
565                 /* IORING_OP_NOP */
566         },
567         {
568                 /* IORING_OP_READV */
569                 .async_ctx              = 1,
570                 .needs_mm               = 1,
571                 .needs_file             = 1,
572                 .unbound_nonreg_file    = 1,
573         },
574         {
575                 /* IORING_OP_WRITEV */
576                 .async_ctx              = 1,
577                 .needs_mm               = 1,
578                 .needs_file             = 1,
579                 .hash_reg_file          = 1,
580                 .unbound_nonreg_file    = 1,
581         },
582         {
583                 /* IORING_OP_FSYNC */
584                 .needs_file             = 1,
585         },
586         {
587                 /* IORING_OP_READ_FIXED */
588                 .needs_file             = 1,
589                 .unbound_nonreg_file    = 1,
590         },
591         {
592                 /* IORING_OP_WRITE_FIXED */
593                 .needs_file             = 1,
594                 .hash_reg_file          = 1,
595                 .unbound_nonreg_file    = 1,
596         },
597         {
598                 /* IORING_OP_POLL_ADD */
599                 .needs_file             = 1,
600                 .unbound_nonreg_file    = 1,
601         },
602         {
603                 /* IORING_OP_POLL_REMOVE */
604         },
605         {
606                 /* IORING_OP_SYNC_FILE_RANGE */
607                 .needs_file             = 1,
608         },
609         {
610                 /* IORING_OP_SENDMSG */
611                 .async_ctx              = 1,
612                 .needs_mm               = 1,
613                 .needs_file             = 1,
614                 .unbound_nonreg_file    = 1,
615         },
616         {
617                 /* IORING_OP_RECVMSG */
618                 .async_ctx              = 1,
619                 .needs_mm               = 1,
620                 .needs_file             = 1,
621                 .unbound_nonreg_file    = 1,
622         },
623         {
624                 /* IORING_OP_TIMEOUT */
625                 .async_ctx              = 1,
626                 .needs_mm               = 1,
627         },
628         {
629                 /* IORING_OP_TIMEOUT_REMOVE */
630         },
631         {
632                 /* IORING_OP_ACCEPT */
633                 .needs_mm               = 1,
634                 .needs_file             = 1,
635                 .unbound_nonreg_file    = 1,
636         },
637         {
638                 /* IORING_OP_ASYNC_CANCEL */
639         },
640         {
641                 /* IORING_OP_LINK_TIMEOUT */
642                 .async_ctx              = 1,
643                 .needs_mm               = 1,
644         },
645         {
646                 /* IORING_OP_CONNECT */
647                 .async_ctx              = 1,
648                 .needs_mm               = 1,
649                 .needs_file             = 1,
650                 .unbound_nonreg_file    = 1,
651         },
652         {
653                 /* IORING_OP_FALLOCATE */
654                 .needs_file             = 1,
655         },
656         {
657                 /* IORING_OP_OPENAT */
658                 .needs_file             = 1,
659                 .fd_non_neg             = 1,
660         },
661         {
662                 /* IORING_OP_CLOSE */
663                 .needs_file             = 1,
664         },
665         {
666                 /* IORING_OP_FILES_UPDATE */
667                 .needs_mm               = 1,
668         },
669         {
670                 /* IORING_OP_STATX */
671                 .needs_mm               = 1,
672                 .needs_file             = 1,
673                 .fd_non_neg             = 1,
674         },
675         {
676                 /* IORING_OP_READ */
677                 .needs_mm               = 1,
678                 .needs_file             = 1,
679                 .unbound_nonreg_file    = 1,
680         },
681         {
682                 /* IORING_OP_WRITE */
683                 .needs_mm               = 1,
684                 .needs_file             = 1,
685                 .unbound_nonreg_file    = 1,
686         },
687         {
688                 /* IORING_OP_FADVISE */
689                 .needs_file             = 1,
690         },
691         {
692                 /* IORING_OP_MADVISE */
693                 .needs_mm               = 1,
694         },
695 };
696
697 static void io_wq_submit_work(struct io_wq_work **workptr);
698 static void io_cqring_fill_event(struct io_kiocb *req, long res);
699 static void io_put_req(struct io_kiocb *req);
700 static void __io_double_put_req(struct io_kiocb *req);
701 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
702 static void io_queue_linked_timeout(struct io_kiocb *req);
703 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
704                                  struct io_uring_files_update *ip,
705                                  unsigned nr_args);
706
707 static struct kmem_cache *req_cachep;
708
709 static const struct file_operations io_uring_fops;
710
711 struct sock *io_uring_get_socket(struct file *file)
712 {
713 #if defined(CONFIG_UNIX)
714         if (file->f_op == &io_uring_fops) {
715                 struct io_ring_ctx *ctx = file->private_data;
716
717                 return ctx->ring_sock->sk;
718         }
719 #endif
720         return NULL;
721 }
722 EXPORT_SYMBOL(io_uring_get_socket);
723
724 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
725 {
726         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
727
728         complete(&ctx->completions[0]);
729 }
730
731 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
732 {
733         struct io_ring_ctx *ctx;
734         int hash_bits;
735
736         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
737         if (!ctx)
738                 return NULL;
739
740         ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL);
741         if (!ctx->fallback_req)
742                 goto err;
743
744         ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL);
745         if (!ctx->completions)
746                 goto err;
747
748         /*
749          * Use 5 bits less than the max cq entries, that should give us around
750          * 32 entries per hash list if totally full and uniformly spread.
751          */
752         hash_bits = ilog2(p->cq_entries);
753         hash_bits -= 5;
754         if (hash_bits <= 0)
755                 hash_bits = 1;
756         ctx->cancel_hash_bits = hash_bits;
757         ctx->cancel_hash = kmalloc((1U << hash_bits) * sizeof(struct hlist_head),
758                                         GFP_KERNEL);
759         if (!ctx->cancel_hash)
760                 goto err;
761         __hash_init(ctx->cancel_hash, 1U << hash_bits);
762
763         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
764                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
765                 goto err;
766
767         ctx->flags = p->flags;
768         init_waitqueue_head(&ctx->cq_wait);
769         INIT_LIST_HEAD(&ctx->cq_overflow_list);
770         init_completion(&ctx->completions[0]);
771         init_completion(&ctx->completions[1]);
772         mutex_init(&ctx->uring_lock);
773         init_waitqueue_head(&ctx->wait);
774         spin_lock_init(&ctx->completion_lock);
775         init_llist_head(&ctx->poll_llist);
776         INIT_LIST_HEAD(&ctx->poll_list);
777         INIT_LIST_HEAD(&ctx->defer_list);
778         INIT_LIST_HEAD(&ctx->timeout_list);
779         init_waitqueue_head(&ctx->inflight_wait);
780         spin_lock_init(&ctx->inflight_lock);
781         INIT_LIST_HEAD(&ctx->inflight_list);
782         return ctx;
783 err:
784         if (ctx->fallback_req)
785                 kmem_cache_free(req_cachep, ctx->fallback_req);
786         kfree(ctx->completions);
787         kfree(ctx->cancel_hash);
788         kfree(ctx);
789         return NULL;
790 }
791
792 static inline bool __req_need_defer(struct io_kiocb *req)
793 {
794         struct io_ring_ctx *ctx = req->ctx;
795
796         return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
797                                         + atomic_read(&ctx->cached_cq_overflow);
798 }
799
800 static inline bool req_need_defer(struct io_kiocb *req)
801 {
802         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) == REQ_F_IO_DRAIN)
803                 return __req_need_defer(req);
804
805         return false;
806 }
807
808 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
809 {
810         struct io_kiocb *req;
811
812         req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
813         if (req && !req_need_defer(req)) {
814                 list_del_init(&req->list);
815                 return req;
816         }
817
818         return NULL;
819 }
820
821 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
822 {
823         struct io_kiocb *req;
824
825         req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
826         if (req) {
827                 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
828                         return NULL;
829                 if (!__req_need_defer(req)) {
830                         list_del_init(&req->list);
831                         return req;
832                 }
833         }
834
835         return NULL;
836 }
837
838 static void __io_commit_cqring(struct io_ring_ctx *ctx)
839 {
840         struct io_rings *rings = ctx->rings;
841
842         if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
843                 /* order cqe stores with ring update */
844                 smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
845
846                 if (wq_has_sleeper(&ctx->cq_wait)) {
847                         wake_up_interruptible(&ctx->cq_wait);
848                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
849                 }
850         }
851 }
852
853 static inline bool io_prep_async_work(struct io_kiocb *req,
854                                       struct io_kiocb **link)
855 {
856         const struct io_op_def *def = &io_op_defs[req->opcode];
857         bool do_hashed = false;
858
859         if (req->flags & REQ_F_ISREG) {
860                 if (def->hash_reg_file)
861                         do_hashed = true;
862         } else {
863                 if (def->unbound_nonreg_file)
864                         req->work.flags |= IO_WQ_WORK_UNBOUND;
865         }
866         if (def->needs_mm)
867                 req->work.flags |= IO_WQ_WORK_NEEDS_USER;
868
869         *link = io_prep_linked_timeout(req);
870         return do_hashed;
871 }
872
873 static inline void io_queue_async_work(struct io_kiocb *req)
874 {
875         struct io_ring_ctx *ctx = req->ctx;
876         struct io_kiocb *link;
877         bool do_hashed;
878
879         do_hashed = io_prep_async_work(req, &link);
880
881         trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
882                                         req->flags);
883         if (!do_hashed) {
884                 io_wq_enqueue(ctx->io_wq, &req->work);
885         } else {
886                 io_wq_enqueue_hashed(ctx->io_wq, &req->work,
887                                         file_inode(req->file));
888         }
889
890         if (link)
891                 io_queue_linked_timeout(link);
892 }
893
894 static void io_kill_timeout(struct io_kiocb *req)
895 {
896         int ret;
897
898         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
899         if (ret != -1) {
900                 atomic_inc(&req->ctx->cq_timeouts);
901                 list_del_init(&req->list);
902                 io_cqring_fill_event(req, 0);
903                 io_put_req(req);
904         }
905 }
906
907 static void io_kill_timeouts(struct io_ring_ctx *ctx)
908 {
909         struct io_kiocb *req, *tmp;
910
911         spin_lock_irq(&ctx->completion_lock);
912         list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
913                 io_kill_timeout(req);
914         spin_unlock_irq(&ctx->completion_lock);
915 }
916
917 static void io_commit_cqring(struct io_ring_ctx *ctx)
918 {
919         struct io_kiocb *req;
920
921         while ((req = io_get_timeout_req(ctx)) != NULL)
922                 io_kill_timeout(req);
923
924         __io_commit_cqring(ctx);
925
926         while ((req = io_get_deferred_req(ctx)) != NULL) {
927                 req->flags |= REQ_F_IO_DRAINED;
928                 io_queue_async_work(req);
929         }
930 }
931
932 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
933 {
934         struct io_rings *rings = ctx->rings;
935         unsigned tail;
936
937         tail = ctx->cached_cq_tail;
938         /*
939          * writes to the cq entry need to come after reading head; the
940          * control dependency is enough as we're using WRITE_ONCE to
941          * fill the cq entry
942          */
943         if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
944                 return NULL;
945
946         ctx->cached_cq_tail++;
947         return &rings->cqes[tail & ctx->cq_mask];
948 }
949
950 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
951 {
952         if (waitqueue_active(&ctx->wait))
953                 wake_up(&ctx->wait);
954         if (waitqueue_active(&ctx->sqo_wait))
955                 wake_up(&ctx->sqo_wait);
956         if (ctx->cq_ev_fd)
957                 eventfd_signal(ctx->cq_ev_fd, 1);
958 }
959
960 /* Returns true if there are no backlogged entries after the flush */
961 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
962 {
963         struct io_rings *rings = ctx->rings;
964         struct io_uring_cqe *cqe;
965         struct io_kiocb *req;
966         unsigned long flags;
967         LIST_HEAD(list);
968
969         if (!force) {
970                 if (list_empty_careful(&ctx->cq_overflow_list))
971                         return true;
972                 if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
973                     rings->cq_ring_entries))
974                         return false;
975         }
976
977         spin_lock_irqsave(&ctx->completion_lock, flags);
978
979         /* if force is set, the ring is going away. always drop after that */
980         if (force)
981                 ctx->cq_overflow_flushed = true;
982
983         cqe = NULL;
984         while (!list_empty(&ctx->cq_overflow_list)) {
985                 cqe = io_get_cqring(ctx);
986                 if (!cqe && !force)
987                         break;
988
989                 req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
990                                                 list);
991                 list_move(&req->list, &list);
992                 if (cqe) {
993                         WRITE_ONCE(cqe->user_data, req->user_data);
994                         WRITE_ONCE(cqe->res, req->result);
995                         WRITE_ONCE(cqe->flags, 0);
996                 } else {
997                         WRITE_ONCE(ctx->rings->cq_overflow,
998                                 atomic_inc_return(&ctx->cached_cq_overflow));
999                 }
1000         }
1001
1002         io_commit_cqring(ctx);
1003         if (cqe) {
1004                 clear_bit(0, &ctx->sq_check_overflow);
1005                 clear_bit(0, &ctx->cq_check_overflow);
1006         }
1007         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1008         io_cqring_ev_posted(ctx);
1009
1010         while (!list_empty(&list)) {
1011                 req = list_first_entry(&list, struct io_kiocb, list);
1012                 list_del(&req->list);
1013                 io_put_req(req);
1014         }
1015
1016         return cqe != NULL;
1017 }
1018
1019 static void io_cqring_fill_event(struct io_kiocb *req, long res)
1020 {
1021         struct io_ring_ctx *ctx = req->ctx;
1022         struct io_uring_cqe *cqe;
1023
1024         trace_io_uring_complete(ctx, req->user_data, res);
1025
1026         /*
1027          * If we can't get a cq entry, userspace overflowed the
1028          * submission (by quite a lot). Increment the overflow count in
1029          * the ring.
1030          */
1031         cqe = io_get_cqring(ctx);
1032         if (likely(cqe)) {
1033                 WRITE_ONCE(cqe->user_data, req->user_data);
1034                 WRITE_ONCE(cqe->res, res);
1035                 WRITE_ONCE(cqe->flags, 0);
1036         } else if (ctx->cq_overflow_flushed) {
1037                 WRITE_ONCE(ctx->rings->cq_overflow,
1038                                 atomic_inc_return(&ctx->cached_cq_overflow));
1039         } else {
1040                 if (list_empty(&ctx->cq_overflow_list)) {
1041                         set_bit(0, &ctx->sq_check_overflow);
1042                         set_bit(0, &ctx->cq_check_overflow);
1043                 }
1044                 refcount_inc(&req->refs);
1045                 req->result = res;
1046                 list_add_tail(&req->list, &ctx->cq_overflow_list);
1047         }
1048 }
1049
1050 static void io_cqring_add_event(struct io_kiocb *req, long res)
1051 {
1052         struct io_ring_ctx *ctx = req->ctx;
1053         unsigned long flags;
1054
1055         spin_lock_irqsave(&ctx->completion_lock, flags);
1056         io_cqring_fill_event(req, res);
1057         io_commit_cqring(ctx);
1058         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1059
1060         io_cqring_ev_posted(ctx);
1061 }
1062
1063 static inline bool io_is_fallback_req(struct io_kiocb *req)
1064 {
1065         return req == (struct io_kiocb *)
1066                         ((unsigned long) req->ctx->fallback_req & ~1UL);
1067 }
1068
1069 static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx)
1070 {
1071         struct io_kiocb *req;
1072
1073         req = ctx->fallback_req;
1074         if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req))
1075                 return req;
1076
1077         return NULL;
1078 }
1079
1080 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
1081                                    struct io_submit_state *state)
1082 {
1083         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
1084         struct io_kiocb *req;
1085
1086         if (!state) {
1087                 req = kmem_cache_alloc(req_cachep, gfp);
1088                 if (unlikely(!req))
1089                         goto fallback;
1090         } else if (!state->free_reqs) {
1091                 size_t sz;
1092                 int ret;
1093
1094                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
1095                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
1096
1097                 /*
1098                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
1099                  * retry single alloc to be on the safe side.
1100                  */
1101                 if (unlikely(ret <= 0)) {
1102                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
1103                         if (!state->reqs[0])
1104                                 goto fallback;
1105                         ret = 1;
1106                 }
1107                 state->free_reqs = ret - 1;
1108                 state->cur_req = 1;
1109                 req = state->reqs[0];
1110         } else {
1111                 req = state->reqs[state->cur_req];
1112                 state->free_reqs--;
1113                 state->cur_req++;
1114         }
1115
1116 got_it:
1117         req->io = NULL;
1118         req->ring_file = NULL;
1119         req->file = NULL;
1120         req->ctx = ctx;
1121         req->flags = 0;
1122         /* one is dropped after submission, the other at completion */
1123         refcount_set(&req->refs, 2);
1124         req->result = 0;
1125         INIT_IO_WORK(&req->work, io_wq_submit_work);
1126         return req;
1127 fallback:
1128         req = io_get_fallback_req(ctx);
1129         if (req)
1130                 goto got_it;
1131         percpu_ref_put(&ctx->refs);
1132         return NULL;
1133 }
1134
1135 struct req_batch {
1136         void *reqs[IO_IOPOLL_BATCH];
1137         int to_free;
1138 };
1139
1140 static void io_free_req_many(struct io_ring_ctx *ctx, struct req_batch *rb)
1141 {
1142         if (!rb->to_free)
1143                 return;
1144         kmem_cache_free_bulk(req_cachep, rb->to_free, rb->reqs);
1145         percpu_ref_put_many(&ctx->refs, rb->to_free);
1146         percpu_ref_put_many(&ctx->file_data->refs, rb->to_free);
1147         rb->to_free = 0;
1148 }
1149
1150 static void __io_req_do_free(struct io_kiocb *req)
1151 {
1152         if (likely(!io_is_fallback_req(req)))
1153                 kmem_cache_free(req_cachep, req);
1154         else
1155                 clear_bit_unlock(0, (unsigned long *) req->ctx->fallback_req);
1156 }
1157
1158 static void __io_free_req(struct io_kiocb *req)
1159 {
1160         struct io_ring_ctx *ctx = req->ctx;
1161
1162         if (req->io)
1163                 kfree(req->io);
1164         if (req->file) {
1165                 if (req->flags & REQ_F_FIXED_FILE)
1166                         percpu_ref_put(&ctx->file_data->refs);
1167                 else
1168                         fput(req->file);
1169         }
1170         if (req->flags & REQ_F_INFLIGHT) {
1171                 unsigned long flags;
1172
1173                 spin_lock_irqsave(&ctx->inflight_lock, flags);
1174                 list_del(&req->inflight_entry);
1175                 if (waitqueue_active(&ctx->inflight_wait))
1176                         wake_up(&ctx->inflight_wait);
1177                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
1178         }
1179
1180         percpu_ref_put(&req->ctx->refs);
1181         __io_req_do_free(req);
1182 }
1183
1184 static bool io_link_cancel_timeout(struct io_kiocb *req)
1185 {
1186         struct io_ring_ctx *ctx = req->ctx;
1187         int ret;
1188
1189         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
1190         if (ret != -1) {
1191                 io_cqring_fill_event(req, -ECANCELED);
1192                 io_commit_cqring(ctx);
1193                 req->flags &= ~REQ_F_LINK;
1194                 io_put_req(req);
1195                 return true;
1196         }
1197
1198         return false;
1199 }
1200
1201 static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1202 {
1203         struct io_ring_ctx *ctx = req->ctx;
1204         bool wake_ev = false;
1205
1206         /* Already got next link */
1207         if (req->flags & REQ_F_LINK_NEXT)
1208                 return;
1209
1210         /*
1211          * The list should never be empty when we are called here. But could
1212          * potentially happen if the chain is messed up, check to be on the
1213          * safe side.
1214          */
1215         while (!list_empty(&req->link_list)) {
1216                 struct io_kiocb *nxt = list_first_entry(&req->link_list,
1217                                                 struct io_kiocb, link_list);
1218
1219                 if (unlikely((req->flags & REQ_F_LINK_TIMEOUT) &&
1220                              (nxt->flags & REQ_F_TIMEOUT))) {
1221                         list_del_init(&nxt->link_list);
1222                         wake_ev |= io_link_cancel_timeout(nxt);
1223                         req->flags &= ~REQ_F_LINK_TIMEOUT;
1224                         continue;
1225                 }
1226
1227                 list_del_init(&req->link_list);
1228                 if (!list_empty(&nxt->link_list))
1229                         nxt->flags |= REQ_F_LINK;
1230                 *nxtptr = nxt;
1231                 break;
1232         }
1233
1234         req->flags |= REQ_F_LINK_NEXT;
1235         if (wake_ev)
1236                 io_cqring_ev_posted(ctx);
1237 }
1238
1239 /*
1240  * Called if REQ_F_LINK is set, and we fail the head request
1241  */
1242 static void io_fail_links(struct io_kiocb *req)
1243 {
1244         struct io_ring_ctx *ctx = req->ctx;
1245         unsigned long flags;
1246
1247         spin_lock_irqsave(&ctx->completion_lock, flags);
1248
1249         while (!list_empty(&req->link_list)) {
1250                 struct io_kiocb *link = list_first_entry(&req->link_list,
1251                                                 struct io_kiocb, link_list);
1252
1253                 list_del_init(&link->link_list);
1254                 trace_io_uring_fail_link(req, link);
1255
1256                 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
1257                     link->opcode == IORING_OP_LINK_TIMEOUT) {
1258                         io_link_cancel_timeout(link);
1259                 } else {
1260                         io_cqring_fill_event(link, -ECANCELED);
1261                         __io_double_put_req(link);
1262                 }
1263                 req->flags &= ~REQ_F_LINK_TIMEOUT;
1264         }
1265
1266         io_commit_cqring(ctx);
1267         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1268         io_cqring_ev_posted(ctx);
1269 }
1270
1271 static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
1272 {
1273         if (likely(!(req->flags & REQ_F_LINK)))
1274                 return;
1275
1276         /*
1277          * If LINK is set, we have dependent requests in this chain. If we
1278          * didn't fail this request, queue the first one up, moving any other
1279          * dependencies to the next request. In case of failure, fail the rest
1280          * of the chain.
1281          */
1282         if (req->flags & REQ_F_FAIL_LINK) {
1283                 io_fail_links(req);
1284         } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
1285                         REQ_F_LINK_TIMEOUT) {
1286                 struct io_ring_ctx *ctx = req->ctx;
1287                 unsigned long flags;
1288
1289                 /*
1290                  * If this is a timeout link, we could be racing with the
1291                  * timeout timer. Grab the completion lock for this case to
1292                  * protect against that.
1293                  */
1294                 spin_lock_irqsave(&ctx->completion_lock, flags);
1295                 io_req_link_next(req, nxt);
1296                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1297         } else {
1298                 io_req_link_next(req, nxt);
1299         }
1300 }
1301
1302 static void io_free_req(struct io_kiocb *req)
1303 {
1304         struct io_kiocb *nxt = NULL;
1305
1306         io_req_find_next(req, &nxt);
1307         __io_free_req(req);
1308
1309         if (nxt)
1310                 io_queue_async_work(nxt);
1311 }
1312
1313 /*
1314  * Drop reference to request, return next in chain (if there is one) if this
1315  * was the last reference to this request.
1316  */
1317 __attribute__((nonnull))
1318 static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1319 {
1320         io_req_find_next(req, nxtptr);
1321
1322         if (refcount_dec_and_test(&req->refs))
1323                 __io_free_req(req);
1324 }
1325
1326 static void io_put_req(struct io_kiocb *req)
1327 {
1328         if (refcount_dec_and_test(&req->refs))
1329                 io_free_req(req);
1330 }
1331
1332 /*
1333  * Must only be used if we don't need to care about links, usually from
1334  * within the completion handling itself.
1335  */
1336 static void __io_double_put_req(struct io_kiocb *req)
1337 {
1338         /* drop both submit and complete references */
1339         if (refcount_sub_and_test(2, &req->refs))
1340                 __io_free_req(req);
1341 }
1342
1343 static void io_double_put_req(struct io_kiocb *req)
1344 {
1345         /* drop both submit and complete references */
1346         if (refcount_sub_and_test(2, &req->refs))
1347                 io_free_req(req);
1348 }
1349
1350 static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
1351 {
1352         struct io_rings *rings = ctx->rings;
1353
1354         if (test_bit(0, &ctx->cq_check_overflow)) {
1355                 /*
1356                  * noflush == true is from the waitqueue handler, just ensure
1357                  * we wake up the task, and the next invocation will flush the
1358                  * entries. We cannot safely to it from here.
1359                  */
1360                 if (noflush && !list_empty(&ctx->cq_overflow_list))
1361                         return -1U;
1362
1363                 io_cqring_overflow_flush(ctx, false);
1364         }
1365
1366         /* See comment at the top of this file */
1367         smp_rmb();
1368         return ctx->cached_cq_tail - READ_ONCE(rings->cq.head);
1369 }
1370
1371 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1372 {
1373         struct io_rings *rings = ctx->rings;
1374
1375         /* make sure SQ entry isn't read before tail */
1376         return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
1377 }
1378
1379 static inline bool io_req_multi_free(struct req_batch *rb, struct io_kiocb *req)
1380 {
1381         /*
1382          * If we're not using fixed files, we have to pair the completion part
1383          * with the file put. Use regular completions for those, only batch
1384          * free for fixed file and non-linked commands.
1385          */
1386         if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) == REQ_F_FIXED_FILE)
1387             && !io_is_fallback_req(req) && !req->io) {
1388                 rb->reqs[rb->to_free++] = req;
1389                 if (unlikely(rb->to_free == ARRAY_SIZE(rb->reqs)))
1390                         io_free_req_many(req->ctx, rb);
1391                 return true;
1392         }
1393
1394         return false;
1395 }
1396
1397 /*
1398  * Find and free completed poll iocbs
1399  */
1400 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
1401                                struct list_head *done)
1402 {
1403         struct req_batch rb;
1404         struct io_kiocb *req;
1405
1406         rb.to_free = 0;
1407         while (!list_empty(done)) {
1408                 req = list_first_entry(done, struct io_kiocb, list);
1409                 list_del(&req->list);
1410
1411                 io_cqring_fill_event(req, req->result);
1412                 (*nr_events)++;
1413
1414                 if (refcount_dec_and_test(&req->refs) &&
1415                     !io_req_multi_free(&rb, req))
1416                         io_free_req(req);
1417         }
1418
1419         io_commit_cqring(ctx);
1420         io_free_req_many(ctx, &rb);
1421 }
1422
1423 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
1424                         long min)
1425 {
1426         struct io_kiocb *req, *tmp;
1427         LIST_HEAD(done);
1428         bool spin;
1429         int ret;
1430
1431         /*
1432          * Only spin for completions if we don't have multiple devices hanging
1433          * off our complete list, and we're under the requested amount.
1434          */
1435         spin = !ctx->poll_multi_file && *nr_events < min;
1436
1437         ret = 0;
1438         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
1439                 struct kiocb *kiocb = &req->rw.kiocb;
1440
1441                 /*
1442                  * Move completed entries to our local list. If we find a
1443                  * request that requires polling, break out and complete
1444                  * the done list first, if we have entries there.
1445                  */
1446                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
1447                         list_move_tail(&req->list, &done);
1448                         continue;
1449                 }
1450                 if (!list_empty(&done))
1451                         break;
1452
1453                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
1454                 if (ret < 0)
1455                         break;
1456
1457                 if (ret && spin)
1458                         spin = false;
1459                 ret = 0;
1460         }
1461
1462         if (!list_empty(&done))
1463                 io_iopoll_complete(ctx, nr_events, &done);
1464
1465         return ret;
1466 }
1467
1468 /*
1469  * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
1470  * non-spinning poll check - we'll still enter the driver poll loop, but only
1471  * as a non-spinning completion check.
1472  */
1473 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
1474                                 long min)
1475 {
1476         while (!list_empty(&ctx->poll_list) && !need_resched()) {
1477                 int ret;
1478
1479                 ret = io_do_iopoll(ctx, nr_events, min);
1480                 if (ret < 0)
1481                         return ret;
1482                 if (!min || *nr_events >= min)
1483                         return 0;
1484         }
1485
1486         return 1;
1487 }
1488
1489 /*
1490  * We can't just wait for polled events to come to us, we have to actively
1491  * find and complete them.
1492  */
1493 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
1494 {
1495         if (!(ctx->flags & IORING_SETUP_IOPOLL))
1496                 return;
1497
1498         mutex_lock(&ctx->uring_lock);
1499         while (!list_empty(&ctx->poll_list)) {
1500                 unsigned int nr_events = 0;
1501
1502                 io_iopoll_getevents(ctx, &nr_events, 1);
1503
1504                 /*
1505                  * Ensure we allow local-to-the-cpu processing to take place,
1506                  * in this case we need to ensure that we reap all events.
1507                  */
1508                 cond_resched();
1509         }
1510         mutex_unlock(&ctx->uring_lock);
1511 }
1512
1513 static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1514                             long min)
1515 {
1516         int iters = 0, ret = 0;
1517
1518         do {
1519                 int tmin = 0;
1520
1521                 /*
1522                  * Don't enter poll loop if we already have events pending.
1523                  * If we do, we can potentially be spinning for commands that
1524                  * already triggered a CQE (eg in error).
1525                  */
1526                 if (io_cqring_events(ctx, false))
1527                         break;
1528
1529                 /*
1530                  * If a submit got punted to a workqueue, we can have the
1531                  * application entering polling for a command before it gets
1532                  * issued. That app will hold the uring_lock for the duration
1533                  * of the poll right here, so we need to take a breather every
1534                  * now and then to ensure that the issue has a chance to add
1535                  * the poll to the issued list. Otherwise we can spin here
1536                  * forever, while the workqueue is stuck trying to acquire the
1537                  * very same mutex.
1538                  */
1539                 if (!(++iters & 7)) {
1540                         mutex_unlock(&ctx->uring_lock);
1541                         mutex_lock(&ctx->uring_lock);
1542                 }
1543
1544                 if (*nr_events < min)
1545                         tmin = min - *nr_events;
1546
1547                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
1548                 if (ret <= 0)
1549                         break;
1550                 ret = 0;
1551         } while (min && !*nr_events && !need_resched());
1552
1553         return ret;
1554 }
1555
1556 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1557                            long min)
1558 {
1559         int ret;
1560
1561         /*
1562          * We disallow the app entering submit/complete with polling, but we
1563          * still need to lock the ring to prevent racing with polled issue
1564          * that got punted to a workqueue.
1565          */
1566         mutex_lock(&ctx->uring_lock);
1567         ret = __io_iopoll_check(ctx, nr_events, min);
1568         mutex_unlock(&ctx->uring_lock);
1569         return ret;
1570 }
1571
1572 static void kiocb_end_write(struct io_kiocb *req)
1573 {
1574         /*
1575          * Tell lockdep we inherited freeze protection from submission
1576          * thread.
1577          */
1578         if (req->flags & REQ_F_ISREG) {
1579                 struct inode *inode = file_inode(req->file);
1580
1581                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
1582         }
1583         file_end_write(req->file);
1584 }
1585
1586 static inline void req_set_fail_links(struct io_kiocb *req)
1587 {
1588         if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
1589                 req->flags |= REQ_F_FAIL_LINK;
1590 }
1591
1592 static void io_complete_rw_common(struct kiocb *kiocb, long res)
1593 {
1594         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1595
1596         if (kiocb->ki_flags & IOCB_WRITE)
1597                 kiocb_end_write(req);
1598
1599         if (res != req->result)
1600                 req_set_fail_links(req);
1601         io_cqring_add_event(req, res);
1602 }
1603
1604 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
1605 {
1606         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1607
1608         io_complete_rw_common(kiocb, res);
1609         io_put_req(req);
1610 }
1611
1612 static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
1613 {
1614         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1615         struct io_kiocb *nxt = NULL;
1616
1617         io_complete_rw_common(kiocb, res);
1618         io_put_req_find_next(req, &nxt);
1619
1620         return nxt;
1621 }
1622
1623 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1624 {
1625         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1626
1627         if (kiocb->ki_flags & IOCB_WRITE)
1628                 kiocb_end_write(req);
1629
1630         if (res != req->result)
1631                 req_set_fail_links(req);
1632         req->result = res;
1633         if (res != -EAGAIN)
1634                 req->flags |= REQ_F_IOPOLL_COMPLETED;
1635 }
1636
1637 /*
1638  * After the iocb has been issued, it's safe to be found on the poll list.
1639  * Adding the kiocb to the list AFTER submission ensures that we don't
1640  * find it from a io_iopoll_getevents() thread before the issuer is done
1641  * accessing the kiocb cookie.
1642  */
1643 static void io_iopoll_req_issued(struct io_kiocb *req)
1644 {
1645         struct io_ring_ctx *ctx = req->ctx;
1646
1647         /*
1648          * Track whether we have multiple files in our lists. This will impact
1649          * how we do polling eventually, not spinning if we're on potentially
1650          * different devices.
1651          */
1652         if (list_empty(&ctx->poll_list)) {
1653                 ctx->poll_multi_file = false;
1654         } else if (!ctx->poll_multi_file) {
1655                 struct io_kiocb *list_req;
1656
1657                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1658                                                 list);
1659                 if (list_req->file != req->file)
1660                         ctx->poll_multi_file = true;
1661         }
1662
1663         /*
1664          * For fast devices, IO may have already completed. If it has, add
1665          * it to the front so we find it first.
1666          */
1667         if (req->flags & REQ_F_IOPOLL_COMPLETED)
1668                 list_add(&req->list, &ctx->poll_list);
1669         else
1670                 list_add_tail(&req->list, &ctx->poll_list);
1671 }
1672
1673 static void io_file_put(struct io_submit_state *state)
1674 {
1675         if (state->file) {
1676                 int diff = state->has_refs - state->used_refs;
1677
1678                 if (diff)
1679                         fput_many(state->file, diff);
1680                 state->file = NULL;
1681         }
1682 }
1683
1684 /*
1685  * Get as many references to a file as we have IOs left in this submission,
1686  * assuming most submissions are for one file, or at least that each file
1687  * has more than one submission.
1688  */
1689 static struct file *io_file_get(struct io_submit_state *state, int fd)
1690 {
1691         if (!state)
1692                 return fget(fd);
1693
1694         if (state->file) {
1695                 if (state->fd == fd) {
1696                         state->used_refs++;
1697                         state->ios_left--;
1698                         return state->file;
1699                 }
1700                 io_file_put(state);
1701         }
1702         state->file = fget_many(fd, state->ios_left);
1703         if (!state->file)
1704                 return NULL;
1705
1706         state->fd = fd;
1707         state->has_refs = state->ios_left;
1708         state->used_refs = 1;
1709         state->ios_left--;
1710         return state->file;
1711 }
1712
1713 /*
1714  * If we tracked the file through the SCM inflight mechanism, we could support
1715  * any file. For now, just ensure that anything potentially problematic is done
1716  * inline.
1717  */
1718 static bool io_file_supports_async(struct file *file)
1719 {
1720         umode_t mode = file_inode(file)->i_mode;
1721
1722         if (S_ISBLK(mode) || S_ISCHR(mode) || S_ISSOCK(mode))
1723                 return true;
1724         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1725                 return true;
1726
1727         return false;
1728 }
1729
1730 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1731                       bool force_nonblock)
1732 {
1733         struct io_ring_ctx *ctx = req->ctx;
1734         struct kiocb *kiocb = &req->rw.kiocb;
1735         unsigned ioprio;
1736         int ret;
1737
1738         if (!req->file)
1739                 return -EBADF;
1740
1741         if (S_ISREG(file_inode(req->file)->i_mode))
1742                 req->flags |= REQ_F_ISREG;
1743
1744         kiocb->ki_pos = READ_ONCE(sqe->off);
1745         if (kiocb->ki_pos == -1 && !(req->file->f_mode & FMODE_STREAM)) {
1746                 req->flags |= REQ_F_CUR_POS;
1747                 kiocb->ki_pos = req->file->f_pos;
1748         }
1749         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1750         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1751
1752         ioprio = READ_ONCE(sqe->ioprio);
1753         if (ioprio) {
1754                 ret = ioprio_check_cap(ioprio);
1755                 if (ret)
1756                         return ret;
1757
1758                 kiocb->ki_ioprio = ioprio;
1759         } else
1760                 kiocb->ki_ioprio = get_current_ioprio();
1761
1762         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1763         if (unlikely(ret))
1764                 return ret;
1765
1766         /* don't allow async punt if RWF_NOWAIT was requested */
1767         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1768             (req->file->f_flags & O_NONBLOCK))
1769                 req->flags |= REQ_F_NOWAIT;
1770
1771         if (force_nonblock)
1772                 kiocb->ki_flags |= IOCB_NOWAIT;
1773
1774         if (ctx->flags & IORING_SETUP_IOPOLL) {
1775                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1776                     !kiocb->ki_filp->f_op->iopoll)
1777                         return -EOPNOTSUPP;
1778
1779                 kiocb->ki_flags |= IOCB_HIPRI;
1780                 kiocb->ki_complete = io_complete_rw_iopoll;
1781                 req->result = 0;
1782         } else {
1783                 if (kiocb->ki_flags & IOCB_HIPRI)
1784                         return -EINVAL;
1785                 kiocb->ki_complete = io_complete_rw;
1786         }
1787
1788         req->rw.addr = READ_ONCE(sqe->addr);
1789         req->rw.len = READ_ONCE(sqe->len);
1790         /* we own ->private, reuse it for the buffer index */
1791         req->rw.kiocb.private = (void *) (unsigned long)
1792                                         READ_ONCE(sqe->buf_index);
1793         return 0;
1794 }
1795
1796 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1797 {
1798         switch (ret) {
1799         case -EIOCBQUEUED:
1800                 break;
1801         case -ERESTARTSYS:
1802         case -ERESTARTNOINTR:
1803         case -ERESTARTNOHAND:
1804         case -ERESTART_RESTARTBLOCK:
1805                 /*
1806                  * We can't just restart the syscall, since previously
1807                  * submitted sqes may already be in progress. Just fail this
1808                  * IO with EINTR.
1809                  */
1810                 ret = -EINTR;
1811                 /* fall through */
1812         default:
1813                 kiocb->ki_complete(kiocb, ret, 0);
1814         }
1815 }
1816
1817 static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
1818                        bool in_async)
1819 {
1820         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1821
1822         if (req->flags & REQ_F_CUR_POS)
1823                 req->file->f_pos = kiocb->ki_pos;
1824         if (in_async && ret >= 0 && kiocb->ki_complete == io_complete_rw)
1825                 *nxt = __io_complete_rw(kiocb, ret);
1826         else
1827                 io_rw_done(kiocb, ret);
1828 }
1829
1830 static ssize_t io_import_fixed(struct io_kiocb *req, int rw,
1831                                struct iov_iter *iter)
1832 {
1833         struct io_ring_ctx *ctx = req->ctx;
1834         size_t len = req->rw.len;
1835         struct io_mapped_ubuf *imu;
1836         unsigned index, buf_index;
1837         size_t offset;
1838         u64 buf_addr;
1839
1840         /* attempt to use fixed buffers without having provided iovecs */
1841         if (unlikely(!ctx->user_bufs))
1842                 return -EFAULT;
1843
1844         buf_index = (unsigned long) req->rw.kiocb.private;
1845         if (unlikely(buf_index >= ctx->nr_user_bufs))
1846                 return -EFAULT;
1847
1848         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1849         imu = &ctx->user_bufs[index];
1850         buf_addr = req->rw.addr;
1851
1852         /* overflow */
1853         if (buf_addr + len < buf_addr)
1854                 return -EFAULT;
1855         /* not inside the mapped region */
1856         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1857                 return -EFAULT;
1858
1859         /*
1860          * May not be a start of buffer, set size appropriately
1861          * and advance us to the beginning.
1862          */
1863         offset = buf_addr - imu->ubuf;
1864         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1865
1866         if (offset) {
1867                 /*
1868                  * Don't use iov_iter_advance() here, as it's really slow for
1869                  * using the latter parts of a big fixed buffer - it iterates
1870                  * over each segment manually. We can cheat a bit here, because
1871                  * we know that:
1872                  *
1873                  * 1) it's a BVEC iter, we set it up
1874                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1875                  *    first and last bvec
1876                  *
1877                  * So just find our index, and adjust the iterator afterwards.
1878                  * If the offset is within the first bvec (or the whole first
1879                  * bvec, just use iov_iter_advance(). This makes it easier
1880                  * since we can just skip the first segment, which may not
1881                  * be PAGE_SIZE aligned.
1882                  */
1883                 const struct bio_vec *bvec = imu->bvec;
1884
1885                 if (offset <= bvec->bv_len) {
1886                         iov_iter_advance(iter, offset);
1887                 } else {
1888                         unsigned long seg_skip;
1889
1890                         /* skip first vec */
1891                         offset -= bvec->bv_len;
1892                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1893
1894                         iter->bvec = bvec + seg_skip;
1895                         iter->nr_segs -= seg_skip;
1896                         iter->count -= bvec->bv_len + offset;
1897                         iter->iov_offset = offset & ~PAGE_MASK;
1898                 }
1899         }
1900
1901         return len;
1902 }
1903
1904 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
1905                                struct iovec **iovec, struct iov_iter *iter)
1906 {
1907         void __user *buf = u64_to_user_ptr(req->rw.addr);
1908         size_t sqe_len = req->rw.len;
1909         u8 opcode;
1910
1911         opcode = req->opcode;
1912         if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
1913                 *iovec = NULL;
1914                 return io_import_fixed(req, rw, iter);
1915         }
1916
1917         /* buffer index only valid with fixed read/write */
1918         if (req->rw.kiocb.private)
1919                 return -EINVAL;
1920
1921         if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
1922                 ssize_t ret;
1923                 ret = import_single_range(rw, buf, sqe_len, *iovec, iter);
1924                 *iovec = NULL;
1925                 return ret;
1926         }
1927
1928         if (req->io) {
1929                 struct io_async_rw *iorw = &req->io->rw;
1930
1931                 *iovec = iorw->iov;
1932                 iov_iter_init(iter, rw, *iovec, iorw->nr_segs, iorw->size);
1933                 if (iorw->iov == iorw->fast_iov)
1934                         *iovec = NULL;
1935                 return iorw->size;
1936         }
1937
1938         if (!req->has_user)
1939                 return -EFAULT;
1940
1941 #ifdef CONFIG_COMPAT
1942         if (req->ctx->compat)
1943                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1944                                                 iovec, iter);
1945 #endif
1946
1947         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1948 }
1949
1950 /*
1951  * For files that don't have ->read_iter() and ->write_iter(), handle them
1952  * by looping over ->read() or ->write() manually.
1953  */
1954 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
1955                            struct iov_iter *iter)
1956 {
1957         ssize_t ret = 0;
1958
1959         /*
1960          * Don't support polled IO through this interface, and we can't
1961          * support non-blocking either. For the latter, this just causes
1962          * the kiocb to be handled from an async context.
1963          */
1964         if (kiocb->ki_flags & IOCB_HIPRI)
1965                 return -EOPNOTSUPP;
1966         if (kiocb->ki_flags & IOCB_NOWAIT)
1967                 return -EAGAIN;
1968
1969         while (iov_iter_count(iter)) {
1970                 struct iovec iovec;
1971                 ssize_t nr;
1972
1973                 if (!iov_iter_is_bvec(iter)) {
1974                         iovec = iov_iter_iovec(iter);
1975                 } else {
1976                         /* fixed buffers import bvec */
1977                         iovec.iov_base = kmap(iter->bvec->bv_page)
1978                                                 + iter->iov_offset;
1979                         iovec.iov_len = min(iter->count,
1980                                         iter->bvec->bv_len - iter->iov_offset);
1981                 }
1982
1983                 if (rw == READ) {
1984                         nr = file->f_op->read(file, iovec.iov_base,
1985                                               iovec.iov_len, &kiocb->ki_pos);
1986                 } else {
1987                         nr = file->f_op->write(file, iovec.iov_base,
1988                                                iovec.iov_len, &kiocb->ki_pos);
1989                 }
1990
1991                 if (iov_iter_is_bvec(iter))
1992                         kunmap(iter->bvec->bv_page);
1993
1994                 if (nr < 0) {
1995                         if (!ret)
1996                                 ret = nr;
1997                         break;
1998                 }
1999                 ret += nr;
2000                 if (nr != iovec.iov_len)
2001                         break;
2002                 iov_iter_advance(iter, nr);
2003         }
2004
2005         return ret;
2006 }
2007
2008 static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size,
2009                           struct iovec *iovec, struct iovec *fast_iov,
2010                           struct iov_iter *iter)
2011 {
2012         req->io->rw.nr_segs = iter->nr_segs;
2013         req->io->rw.size = io_size;
2014         req->io->rw.iov = iovec;
2015         if (!req->io->rw.iov) {
2016                 req->io->rw.iov = req->io->rw.fast_iov;
2017                 memcpy(req->io->rw.iov, fast_iov,
2018                         sizeof(struct iovec) * iter->nr_segs);
2019         }
2020 }
2021
2022 static int io_alloc_async_ctx(struct io_kiocb *req)
2023 {
2024         if (!io_op_defs[req->opcode].async_ctx)
2025                 return 0;
2026         req->io = kmalloc(sizeof(*req->io), GFP_KERNEL);
2027         return req->io == NULL;
2028 }
2029
2030 static void io_rw_async(struct io_wq_work **workptr)
2031 {
2032         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2033         struct iovec *iov = NULL;
2034
2035         if (req->io->rw.iov != req->io->rw.fast_iov)
2036                 iov = req->io->rw.iov;
2037         io_wq_submit_work(workptr);
2038         kfree(iov);
2039 }
2040
2041 static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
2042                              struct iovec *iovec, struct iovec *fast_iov,
2043                              struct iov_iter *iter)
2044 {
2045         if (req->opcode == IORING_OP_READ_FIXED ||
2046             req->opcode == IORING_OP_WRITE_FIXED)
2047                 return 0;
2048         if (!req->io && io_alloc_async_ctx(req))
2049                 return -ENOMEM;
2050
2051         io_req_map_rw(req, io_size, iovec, fast_iov, iter);
2052         req->work.func = io_rw_async;
2053         return 0;
2054 }
2055
2056 static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2057                         bool force_nonblock)
2058 {
2059         struct io_async_ctx *io;
2060         struct iov_iter iter;
2061         ssize_t ret;
2062
2063         ret = io_prep_rw(req, sqe, force_nonblock);
2064         if (ret)
2065                 return ret;
2066
2067         if (unlikely(!(req->file->f_mode & FMODE_READ)))
2068                 return -EBADF;
2069
2070         if (!req->io)
2071                 return 0;
2072
2073         io = req->io;
2074         io->rw.iov = io->rw.fast_iov;
2075         req->io = NULL;
2076         ret = io_import_iovec(READ, req, &io->rw.iov, &iter);
2077         req->io = io;
2078         if (ret < 0)
2079                 return ret;
2080
2081         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
2082         return 0;
2083 }
2084
2085 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
2086                    bool force_nonblock)
2087 {
2088         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2089         struct kiocb *kiocb = &req->rw.kiocb;
2090         struct iov_iter iter;
2091         size_t iov_count;
2092         ssize_t io_size, ret;
2093
2094         ret = io_import_iovec(READ, req, &iovec, &iter);
2095         if (ret < 0)
2096                 return ret;
2097
2098         /* Ensure we clear previously set non-block flag */
2099         if (!force_nonblock)
2100                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
2101
2102         req->result = 0;
2103         io_size = ret;
2104         if (req->flags & REQ_F_LINK)
2105                 req->result = io_size;
2106
2107         /*
2108          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
2109          * we know to async punt it even if it was opened O_NONBLOCK
2110          */
2111         if (force_nonblock && !io_file_supports_async(req->file)) {
2112                 req->flags |= REQ_F_MUST_PUNT;
2113                 goto copy_iov;
2114         }
2115
2116         iov_count = iov_iter_count(&iter);
2117         ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count);
2118         if (!ret) {
2119                 ssize_t ret2;
2120
2121                 if (req->file->f_op->read_iter)
2122                         ret2 = call_read_iter(req->file, kiocb, &iter);
2123                 else
2124                         ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
2125
2126                 /* Catch -EAGAIN return for forced non-blocking submission */
2127                 if (!force_nonblock || ret2 != -EAGAIN) {
2128                         kiocb_done(kiocb, ret2, nxt, req->in_async);
2129                 } else {
2130 copy_iov:
2131                         ret = io_setup_async_rw(req, io_size, iovec,
2132                                                 inline_vecs, &iter);
2133                         if (ret)
2134                                 goto out_free;
2135                         return -EAGAIN;
2136                 }
2137         }
2138 out_free:
2139         if (!io_wq_current_is_worker())
2140                 kfree(iovec);
2141         return ret;
2142 }
2143
2144 static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2145                          bool force_nonblock)
2146 {
2147         struct io_async_ctx *io;
2148         struct iov_iter iter;
2149         ssize_t ret;
2150
2151         ret = io_prep_rw(req, sqe, force_nonblock);
2152         if (ret)
2153                 return ret;
2154
2155         if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
2156                 return -EBADF;
2157
2158         if (!req->io)
2159                 return 0;
2160
2161         io = req->io;
2162         io->rw.iov = io->rw.fast_iov;
2163         req->io = NULL;
2164         ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter);
2165         req->io = io;
2166         if (ret < 0)
2167                 return ret;
2168
2169         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
2170         return 0;
2171 }
2172
2173 static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
2174                     bool force_nonblock)
2175 {
2176         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2177         struct kiocb *kiocb = &req->rw.kiocb;
2178         struct iov_iter iter;
2179         size_t iov_count;
2180         ssize_t ret, io_size;
2181
2182         ret = io_import_iovec(WRITE, req, &iovec, &iter);
2183         if (ret < 0)
2184                 return ret;
2185
2186         /* Ensure we clear previously set non-block flag */
2187         if (!force_nonblock)
2188                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
2189
2190         req->result = 0;
2191         io_size = ret;
2192         if (req->flags & REQ_F_LINK)
2193                 req->result = io_size;
2194
2195         /*
2196          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
2197          * we know to async punt it even if it was opened O_NONBLOCK
2198          */
2199         if (force_nonblock && !io_file_supports_async(req->file)) {
2200                 req->flags |= REQ_F_MUST_PUNT;
2201                 goto copy_iov;
2202         }
2203
2204         /* file path doesn't support NOWAIT for non-direct_IO */
2205         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
2206             (req->flags & REQ_F_ISREG))
2207                 goto copy_iov;
2208
2209         iov_count = iov_iter_count(&iter);
2210         ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count);
2211         if (!ret) {
2212                 ssize_t ret2;
2213
2214                 /*
2215                  * Open-code file_start_write here to grab freeze protection,
2216                  * which will be released by another thread in
2217                  * io_complete_rw().  Fool lockdep by telling it the lock got
2218                  * released so that it doesn't complain about the held lock when
2219                  * we return to userspace.
2220                  */
2221                 if (req->flags & REQ_F_ISREG) {
2222                         __sb_start_write(file_inode(req->file)->i_sb,
2223                                                 SB_FREEZE_WRITE, true);
2224                         __sb_writers_release(file_inode(req->file)->i_sb,
2225                                                 SB_FREEZE_WRITE);
2226                 }
2227                 kiocb->ki_flags |= IOCB_WRITE;
2228
2229                 if (req->file->f_op->write_iter)
2230                         ret2 = call_write_iter(req->file, kiocb, &iter);
2231                 else
2232                         ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter);
2233                 if (!force_nonblock || ret2 != -EAGAIN) {
2234                         kiocb_done(kiocb, ret2, nxt, req->in_async);
2235                 } else {
2236 copy_iov:
2237                         ret = io_setup_async_rw(req, io_size, iovec,
2238                                                 inline_vecs, &iter);
2239                         if (ret)
2240                                 goto out_free;
2241                         return -EAGAIN;
2242                 }
2243         }
2244 out_free:
2245         if (!io_wq_current_is_worker())
2246                 kfree(iovec);
2247         return ret;
2248 }
2249
2250 /*
2251  * IORING_OP_NOP just posts a completion event, nothing else.
2252  */
2253 static int io_nop(struct io_kiocb *req)
2254 {
2255         struct io_ring_ctx *ctx = req->ctx;
2256
2257         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2258                 return -EINVAL;
2259
2260         io_cqring_add_event(req, 0);
2261         io_put_req(req);
2262         return 0;
2263 }
2264
2265 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2266 {
2267         struct io_ring_ctx *ctx = req->ctx;
2268
2269         if (!req->file)
2270                 return -EBADF;
2271
2272         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2273                 return -EINVAL;
2274         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2275                 return -EINVAL;
2276
2277         req->sync.flags = READ_ONCE(sqe->fsync_flags);
2278         if (unlikely(req->sync.flags & ~IORING_FSYNC_DATASYNC))
2279                 return -EINVAL;
2280
2281         req->sync.off = READ_ONCE(sqe->off);
2282         req->sync.len = READ_ONCE(sqe->len);
2283         return 0;
2284 }
2285
2286 static bool io_req_cancelled(struct io_kiocb *req)
2287 {
2288         if (req->work.flags & IO_WQ_WORK_CANCEL) {
2289                 req_set_fail_links(req);
2290                 io_cqring_add_event(req, -ECANCELED);
2291                 io_put_req(req);
2292                 return true;
2293         }
2294
2295         return false;
2296 }
2297
2298 static void io_link_work_cb(struct io_wq_work **workptr)
2299 {
2300         struct io_wq_work *work = *workptr;
2301         struct io_kiocb *link = work->data;
2302
2303         io_queue_linked_timeout(link);
2304         work->func = io_wq_submit_work;
2305 }
2306
2307 static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt)
2308 {
2309         struct io_kiocb *link;
2310
2311         io_prep_async_work(nxt, &link);
2312         *workptr = &nxt->work;
2313         if (link) {
2314                 nxt->work.flags |= IO_WQ_WORK_CB;
2315                 nxt->work.func = io_link_work_cb;
2316                 nxt->work.data = link;
2317         }
2318 }
2319
2320 static void io_fsync_finish(struct io_wq_work **workptr)
2321 {
2322         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2323         loff_t end = req->sync.off + req->sync.len;
2324         struct io_kiocb *nxt = NULL;
2325         int ret;
2326
2327         if (io_req_cancelled(req))
2328                 return;
2329
2330         ret = vfs_fsync_range(req->file, req->sync.off,
2331                                 end > 0 ? end : LLONG_MAX,
2332                                 req->sync.flags & IORING_FSYNC_DATASYNC);
2333         if (ret < 0)
2334                 req_set_fail_links(req);
2335         io_cqring_add_event(req, ret);
2336         io_put_req_find_next(req, &nxt);
2337         if (nxt)
2338                 io_wq_assign_next(workptr, nxt);
2339 }
2340
2341 static int io_fsync(struct io_kiocb *req, struct io_kiocb **nxt,
2342                     bool force_nonblock)
2343 {
2344         struct io_wq_work *work, *old_work;
2345
2346         /* fsync always requires a blocking context */
2347         if (force_nonblock) {
2348                 io_put_req(req);
2349                 req->work.func = io_fsync_finish;
2350                 return -EAGAIN;
2351         }
2352
2353         work = old_work = &req->work;
2354         io_fsync_finish(&work);
2355         if (work && work != old_work)
2356                 *nxt = container_of(work, struct io_kiocb, work);
2357         return 0;
2358 }
2359
2360 static void io_fallocate_finish(struct io_wq_work **workptr)
2361 {
2362         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2363         struct io_kiocb *nxt = NULL;
2364         int ret;
2365
2366         ret = vfs_fallocate(req->file, req->sync.mode, req->sync.off,
2367                                 req->sync.len);
2368         if (ret < 0)
2369                 req_set_fail_links(req);
2370         io_cqring_add_event(req, ret);
2371         io_put_req_find_next(req, &nxt);
2372         if (nxt)
2373                 io_wq_assign_next(workptr, nxt);
2374 }
2375
2376 static int io_fallocate_prep(struct io_kiocb *req,
2377                              const struct io_uring_sqe *sqe)
2378 {
2379         if (sqe->ioprio || sqe->buf_index || sqe->rw_flags)
2380                 return -EINVAL;
2381
2382         req->sync.off = READ_ONCE(sqe->off);
2383         req->sync.len = READ_ONCE(sqe->addr);
2384         req->sync.mode = READ_ONCE(sqe->len);
2385         return 0;
2386 }
2387
2388 static int io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt,
2389                         bool force_nonblock)
2390 {
2391         struct io_wq_work *work, *old_work;
2392
2393         /* fallocate always requiring blocking context */
2394         if (force_nonblock) {
2395                 io_put_req(req);
2396                 req->work.func = io_fallocate_finish;
2397                 return -EAGAIN;
2398         }
2399
2400         work = old_work = &req->work;
2401         io_fallocate_finish(&work);
2402         if (work && work != old_work)
2403                 *nxt = container_of(work, struct io_kiocb, work);
2404
2405         return 0;
2406 }
2407
2408 static int io_openat_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2409 {
2410         int ret;
2411
2412         if (sqe->ioprio || sqe->buf_index)
2413                 return -EINVAL;
2414
2415         req->open.dfd = READ_ONCE(sqe->fd);
2416         req->open.mode = READ_ONCE(sqe->len);
2417         req->open.fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2418         req->open.flags = READ_ONCE(sqe->open_flags);
2419
2420         req->open.filename = getname(req->open.fname);
2421         if (IS_ERR(req->open.filename)) {
2422                 ret = PTR_ERR(req->open.filename);
2423                 req->open.filename = NULL;
2424                 return ret;
2425         }
2426
2427         return 0;
2428 }
2429
2430 static int io_openat(struct io_kiocb *req, struct io_kiocb **nxt,
2431                      bool force_nonblock)
2432 {
2433         struct open_flags op;
2434         struct open_how how;
2435         struct file *file;
2436         int ret;
2437
2438         if (force_nonblock) {
2439                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2440                 return -EAGAIN;
2441         }
2442
2443         how = build_open_how(req->open.flags, req->open.mode);
2444         ret = build_open_flags(&how, &op);
2445         if (ret)
2446                 goto err;
2447
2448         ret = get_unused_fd_flags(how.flags);
2449         if (ret < 0)
2450                 goto err;
2451
2452         file = do_filp_open(req->open.dfd, req->open.filename, &op);
2453         if (IS_ERR(file)) {
2454                 put_unused_fd(ret);
2455                 ret = PTR_ERR(file);
2456         } else {
2457                 fsnotify_open(file);
2458                 fd_install(ret, file);
2459         }
2460 err:
2461         putname(req->open.filename);
2462         if (ret < 0)
2463                 req_set_fail_links(req);
2464         io_cqring_add_event(req, ret);
2465         io_put_req_find_next(req, nxt);
2466         return 0;
2467 }
2468
2469 static int io_madvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2470 {
2471 #if defined(CONFIG_ADVISE_SYSCALLS) && defined(CONFIG_MMU)
2472         if (sqe->ioprio || sqe->buf_index || sqe->off)
2473                 return -EINVAL;
2474
2475         req->madvise.addr = READ_ONCE(sqe->addr);
2476         req->madvise.len = READ_ONCE(sqe->len);
2477         req->madvise.advice = READ_ONCE(sqe->fadvise_advice);
2478         return 0;
2479 #else
2480         return -EOPNOTSUPP;
2481 #endif
2482 }
2483
2484 static int io_madvise(struct io_kiocb *req, struct io_kiocb **nxt,
2485                       bool force_nonblock)
2486 {
2487 #if defined(CONFIG_ADVISE_SYSCALLS) && defined(CONFIG_MMU)
2488         struct io_madvise *ma = &req->madvise;
2489         int ret;
2490
2491         if (force_nonblock)
2492                 return -EAGAIN;
2493
2494         ret = do_madvise(ma->addr, ma->len, ma->advice);
2495         if (ret < 0)
2496                 req_set_fail_links(req);
2497         io_cqring_add_event(req, ret);
2498         io_put_req_find_next(req, nxt);
2499         return 0;
2500 #else
2501         return -EOPNOTSUPP;
2502 #endif
2503 }
2504
2505 static int io_fadvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2506 {
2507         if (sqe->ioprio || sqe->buf_index || sqe->addr)
2508                 return -EINVAL;
2509
2510         req->fadvise.offset = READ_ONCE(sqe->off);
2511         req->fadvise.len = READ_ONCE(sqe->len);
2512         req->fadvise.advice = READ_ONCE(sqe->fadvise_advice);
2513         return 0;
2514 }
2515
2516 static int io_fadvise(struct io_kiocb *req, struct io_kiocb **nxt,
2517                       bool force_nonblock)
2518 {
2519         struct io_fadvise *fa = &req->fadvise;
2520         int ret;
2521
2522         /* DONTNEED may block, others _should_ not */
2523         if (fa->advice == POSIX_FADV_DONTNEED && force_nonblock)
2524                 return -EAGAIN;
2525
2526         ret = vfs_fadvise(req->file, fa->offset, fa->len, fa->advice);
2527         if (ret < 0)
2528                 req_set_fail_links(req);
2529         io_cqring_add_event(req, ret);
2530         io_put_req_find_next(req, nxt);
2531         return 0;
2532 }
2533
2534 static int io_statx_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2535 {
2536         unsigned lookup_flags;
2537         int ret;
2538
2539         if (sqe->ioprio || sqe->buf_index)
2540                 return -EINVAL;
2541
2542         req->open.dfd = READ_ONCE(sqe->fd);
2543         req->open.mask = READ_ONCE(sqe->len);
2544         req->open.fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2545         req->open.buffer = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2546         req->open.flags = READ_ONCE(sqe->statx_flags);
2547
2548         if (vfs_stat_set_lookup_flags(&lookup_flags, req->open.flags))
2549                 return -EINVAL;
2550
2551         req->open.filename = getname_flags(req->open.fname, lookup_flags, NULL);
2552         if (IS_ERR(req->open.filename)) {
2553                 ret = PTR_ERR(req->open.filename);
2554                 req->open.filename = NULL;
2555                 return ret;
2556         }
2557
2558         return 0;
2559 }
2560
2561 static int io_statx(struct io_kiocb *req, struct io_kiocb **nxt,
2562                     bool force_nonblock)
2563 {
2564         struct io_open *ctx = &req->open;
2565         unsigned lookup_flags;
2566         struct path path;
2567         struct kstat stat;
2568         int ret;
2569
2570         if (force_nonblock)
2571                 return -EAGAIN;
2572
2573         if (vfs_stat_set_lookup_flags(&lookup_flags, ctx->flags))
2574                 return -EINVAL;
2575
2576 retry:
2577         /* filename_lookup() drops it, keep a reference */
2578         ctx->filename->refcnt++;
2579
2580         ret = filename_lookup(ctx->dfd, ctx->filename, lookup_flags, &path,
2581                                 NULL);
2582         if (ret)
2583                 goto err;
2584
2585         ret = vfs_getattr(&path, &stat, ctx->mask, ctx->flags);
2586         path_put(&path);
2587         if (retry_estale(ret, lookup_flags)) {
2588                 lookup_flags |= LOOKUP_REVAL;
2589                 goto retry;
2590         }
2591         if (!ret)
2592                 ret = cp_statx(&stat, ctx->buffer);
2593 err:
2594         putname(ctx->filename);
2595         if (ret < 0)
2596                 req_set_fail_links(req);
2597         io_cqring_add_event(req, ret);
2598         io_put_req_find_next(req, nxt);
2599         return 0;
2600 }
2601
2602 static int io_close_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2603 {
2604         /*
2605          * If we queue this for async, it must not be cancellable. That would
2606          * leave the 'file' in an undeterminate state.
2607          */
2608         req->work.flags |= IO_WQ_WORK_NO_CANCEL;
2609
2610         if (sqe->ioprio || sqe->off || sqe->addr || sqe->len ||
2611             sqe->rw_flags || sqe->buf_index)
2612                 return -EINVAL;
2613         if (sqe->flags & IOSQE_FIXED_FILE)
2614                 return -EINVAL;
2615
2616         req->close.fd = READ_ONCE(sqe->fd);
2617         if (req->file->f_op == &io_uring_fops ||
2618             req->close.fd == req->ring_fd)
2619                 return -EBADF;
2620
2621         return 0;
2622 }
2623
2624 static void io_close_finish(struct io_wq_work **workptr)
2625 {
2626         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2627         struct io_kiocb *nxt = NULL;
2628
2629         /* Invoked with files, we need to do the close */
2630         if (req->work.files) {
2631                 int ret;
2632
2633                 ret = filp_close(req->close.put_file, req->work.files);
2634                 if (ret < 0) {
2635                         req_set_fail_links(req);
2636                 }
2637                 io_cqring_add_event(req, ret);
2638         }
2639
2640         fput(req->close.put_file);
2641
2642         /* we bypassed the re-issue, drop the submission reference */
2643         io_put_req(req);
2644         io_put_req_find_next(req, &nxt);
2645         if (nxt)
2646                 io_wq_assign_next(workptr, nxt);
2647 }
2648
2649 static int io_close(struct io_kiocb *req, struct io_kiocb **nxt,
2650                     bool force_nonblock)
2651 {
2652         int ret;
2653
2654         req->close.put_file = NULL;
2655         ret = __close_fd_get_file(req->close.fd, &req->close.put_file);
2656         if (ret < 0)
2657                 return ret;
2658
2659         /* if the file has a flush method, be safe and punt to async */
2660         if (req->close.put_file->f_op->flush && !io_wq_current_is_worker()) {
2661                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2662                 goto eagain;
2663         }
2664
2665         /*
2666          * No ->flush(), safely close from here and just punt the
2667          * fput() to async context.
2668          */
2669         ret = filp_close(req->close.put_file, current->files);
2670
2671         if (ret < 0)
2672                 req_set_fail_links(req);
2673         io_cqring_add_event(req, ret);
2674
2675         if (io_wq_current_is_worker()) {
2676                 struct io_wq_work *old_work, *work;
2677
2678                 old_work = work = &req->work;
2679                 io_close_finish(&work);
2680                 if (work && work != old_work)
2681                         *nxt = container_of(work, struct io_kiocb, work);
2682                 return 0;
2683         }
2684
2685 eagain:
2686         req->work.func = io_close_finish;
2687         return -EAGAIN;
2688 }
2689
2690 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2691 {
2692         struct io_ring_ctx *ctx = req->ctx;
2693
2694         if (!req->file)
2695                 return -EBADF;
2696
2697         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2698                 return -EINVAL;
2699         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2700                 return -EINVAL;
2701
2702         req->sync.off = READ_ONCE(sqe->off);
2703         req->sync.len = READ_ONCE(sqe->len);
2704         req->sync.flags = READ_ONCE(sqe->sync_range_flags);
2705         return 0;
2706 }
2707
2708 static void io_sync_file_range_finish(struct io_wq_work **workptr)
2709 {
2710         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2711         struct io_kiocb *nxt = NULL;
2712         int ret;
2713
2714         if (io_req_cancelled(req))
2715                 return;
2716
2717         ret = sync_file_range(req->file, req->sync.off, req->sync.len,
2718                                 req->sync.flags);
2719         if (ret < 0)
2720                 req_set_fail_links(req);
2721         io_cqring_add_event(req, ret);
2722         io_put_req_find_next(req, &nxt);
2723         if (nxt)
2724                 io_wq_assign_next(workptr, nxt);
2725 }
2726
2727 static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt,
2728                               bool force_nonblock)
2729 {
2730         struct io_wq_work *work, *old_work;
2731
2732         /* sync_file_range always requires a blocking context */
2733         if (force_nonblock) {
2734                 io_put_req(req);
2735                 req->work.func = io_sync_file_range_finish;
2736                 return -EAGAIN;
2737         }
2738
2739         work = old_work = &req->work;
2740         io_sync_file_range_finish(&work);
2741         if (work && work != old_work)
2742                 *nxt = container_of(work, struct io_kiocb, work);
2743         return 0;
2744 }
2745
2746 #if defined(CONFIG_NET)
2747 static void io_sendrecv_async(struct io_wq_work **workptr)
2748 {
2749         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2750         struct iovec *iov = NULL;
2751
2752         if (req->io->rw.iov != req->io->rw.fast_iov)
2753                 iov = req->io->msg.iov;
2754         io_wq_submit_work(workptr);
2755         kfree(iov);
2756 }
2757 #endif
2758
2759 static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2760 {
2761 #if defined(CONFIG_NET)
2762         struct io_sr_msg *sr = &req->sr_msg;
2763         struct io_async_ctx *io = req->io;
2764
2765         sr->msg_flags = READ_ONCE(sqe->msg_flags);
2766         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2767
2768         if (!io)
2769                 return 0;
2770
2771         io->msg.iov = io->msg.fast_iov;
2772         return sendmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2773                                         &io->msg.iov);
2774 #else
2775         return -EOPNOTSUPP;
2776 #endif
2777 }
2778
2779 static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2780                       bool force_nonblock)
2781 {
2782 #if defined(CONFIG_NET)
2783         struct io_async_msghdr *kmsg = NULL;
2784         struct socket *sock;
2785         int ret;
2786
2787         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2788                 return -EINVAL;
2789
2790         sock = sock_from_file(req->file, &ret);
2791         if (sock) {
2792                 struct io_async_ctx io;
2793                 struct sockaddr_storage addr;
2794                 unsigned flags;
2795
2796                 if (req->io) {
2797                         kmsg = &req->io->msg;
2798                         kmsg->msg.msg_name = &addr;
2799                         /* if iov is set, it's allocated already */
2800                         if (!kmsg->iov)
2801                                 kmsg->iov = kmsg->fast_iov;
2802                         kmsg->msg.msg_iter.iov = kmsg->iov;
2803                 } else {
2804                         struct io_sr_msg *sr = &req->sr_msg;
2805
2806                         kmsg = &io.msg;
2807                         kmsg->msg.msg_name = &addr;
2808
2809                         io.msg.iov = io.msg.fast_iov;
2810                         ret = sendmsg_copy_msghdr(&io.msg.msg, sr->msg,
2811                                         sr->msg_flags, &io.msg.iov);
2812                         if (ret)
2813                                 return ret;
2814                 }
2815
2816                 flags = req->sr_msg.msg_flags;
2817                 if (flags & MSG_DONTWAIT)
2818                         req->flags |= REQ_F_NOWAIT;
2819                 else if (force_nonblock)
2820                         flags |= MSG_DONTWAIT;
2821
2822                 ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags);
2823                 if (force_nonblock && ret == -EAGAIN) {
2824                         if (req->io)
2825                                 return -EAGAIN;
2826                         if (io_alloc_async_ctx(req))
2827                                 return -ENOMEM;
2828                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2829                         req->work.func = io_sendrecv_async;
2830                         return -EAGAIN;
2831                 }
2832                 if (ret == -ERESTARTSYS)
2833                         ret = -EINTR;
2834         }
2835
2836         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
2837                 kfree(kmsg->iov);
2838         io_cqring_add_event(req, ret);
2839         if (ret < 0)
2840                 req_set_fail_links(req);
2841         io_put_req_find_next(req, nxt);
2842         return 0;
2843 #else
2844         return -EOPNOTSUPP;
2845 #endif
2846 }
2847
2848 static int io_recvmsg_prep(struct io_kiocb *req,
2849                            const struct io_uring_sqe *sqe)
2850 {
2851 #if defined(CONFIG_NET)
2852         struct io_sr_msg *sr = &req->sr_msg;
2853         struct io_async_ctx *io = req->io;
2854
2855         sr->msg_flags = READ_ONCE(sqe->msg_flags);
2856         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2857
2858         if (!io)
2859                 return 0;
2860
2861         io->msg.iov = io->msg.fast_iov;
2862         return recvmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2863                                         &io->msg.uaddr, &io->msg.iov);
2864 #else
2865         return -EOPNOTSUPP;
2866 #endif
2867 }
2868
2869 static int io_recvmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2870                       bool force_nonblock)
2871 {
2872 #if defined(CONFIG_NET)
2873         struct io_async_msghdr *kmsg = NULL;
2874         struct socket *sock;
2875         int ret;
2876
2877         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2878                 return -EINVAL;
2879
2880         sock = sock_from_file(req->file, &ret);
2881         if (sock) {
2882                 struct io_async_ctx io;
2883                 struct sockaddr_storage addr;
2884                 unsigned flags;
2885
2886                 if (req->io) {
2887                         kmsg = &req->io->msg;
2888                         kmsg->msg.msg_name = &addr;
2889                         /* if iov is set, it's allocated already */
2890                         if (!kmsg->iov)
2891                                 kmsg->iov = kmsg->fast_iov;
2892                         kmsg->msg.msg_iter.iov = kmsg->iov;
2893                 } else {
2894                         struct io_sr_msg *sr = &req->sr_msg;
2895
2896                         kmsg = &io.msg;
2897                         kmsg->msg.msg_name = &addr;
2898
2899                         io.msg.iov = io.msg.fast_iov;
2900                         ret = recvmsg_copy_msghdr(&io.msg.msg, sr->msg,
2901                                         sr->msg_flags, &io.msg.uaddr,
2902                                         &io.msg.iov);
2903                         if (ret)
2904                                 return ret;
2905                 }
2906
2907                 flags = req->sr_msg.msg_flags;
2908                 if (flags & MSG_DONTWAIT)
2909                         req->flags |= REQ_F_NOWAIT;
2910                 else if (force_nonblock)
2911                         flags |= MSG_DONTWAIT;
2912
2913                 ret = __sys_recvmsg_sock(sock, &kmsg->msg, req->sr_msg.msg,
2914                                                 kmsg->uaddr, flags);
2915                 if (force_nonblock && ret == -EAGAIN) {
2916                         if (req->io)
2917                                 return -EAGAIN;
2918                         if (io_alloc_async_ctx(req))
2919                                 return -ENOMEM;
2920                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2921                         req->work.func = io_sendrecv_async;
2922                         return -EAGAIN;
2923                 }
2924                 if (ret == -ERESTARTSYS)
2925                         ret = -EINTR;
2926         }
2927
2928         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
2929                 kfree(kmsg->iov);
2930         io_cqring_add_event(req, ret);
2931         if (ret < 0)
2932                 req_set_fail_links(req);
2933         io_put_req_find_next(req, nxt);
2934         return 0;
2935 #else
2936         return -EOPNOTSUPP;
2937 #endif
2938 }
2939
2940 static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2941 {
2942 #if defined(CONFIG_NET)
2943         struct io_accept *accept = &req->accept;
2944
2945         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2946                 return -EINVAL;
2947         if (sqe->ioprio || sqe->len || sqe->buf_index)
2948                 return -EINVAL;
2949
2950         accept->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
2951         accept->addr_len = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2952         accept->flags = READ_ONCE(sqe->accept_flags);
2953         return 0;
2954 #else
2955         return -EOPNOTSUPP;
2956 #endif
2957 }
2958
2959 #if defined(CONFIG_NET)
2960 static int __io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
2961                        bool force_nonblock)
2962 {
2963         struct io_accept *accept = &req->accept;
2964         unsigned file_flags;
2965         int ret;
2966
2967         file_flags = force_nonblock ? O_NONBLOCK : 0;
2968         ret = __sys_accept4_file(req->file, file_flags, accept->addr,
2969                                         accept->addr_len, accept->flags);
2970         if (ret == -EAGAIN && force_nonblock)
2971                 return -EAGAIN;
2972         if (ret == -ERESTARTSYS)
2973                 ret = -EINTR;
2974         if (ret < 0)
2975                 req_set_fail_links(req);
2976         io_cqring_add_event(req, ret);
2977         io_put_req_find_next(req, nxt);
2978         return 0;
2979 }
2980
2981 static void io_accept_finish(struct io_wq_work **workptr)
2982 {
2983         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2984         struct io_kiocb *nxt = NULL;
2985
2986         if (io_req_cancelled(req))
2987                 return;
2988         __io_accept(req, &nxt, false);
2989         if (nxt)
2990                 io_wq_assign_next(workptr, nxt);
2991 }
2992 #endif
2993
2994 static int io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
2995                      bool force_nonblock)
2996 {
2997 #if defined(CONFIG_NET)
2998         int ret;
2999
3000         ret = __io_accept(req, nxt, force_nonblock);
3001         if (ret == -EAGAIN && force_nonblock) {
3002                 req->work.func = io_accept_finish;
3003                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
3004                 io_put_req(req);
3005                 return -EAGAIN;
3006         }
3007         return 0;
3008 #else
3009         return -EOPNOTSUPP;
3010 #endif
3011 }
3012
3013 static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3014 {
3015 #if defined(CONFIG_NET)
3016         struct io_connect *conn = &req->connect;
3017         struct io_async_ctx *io = req->io;
3018
3019         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
3020                 return -EINVAL;
3021         if (sqe->ioprio || sqe->len || sqe->buf_index || sqe->rw_flags)
3022                 return -EINVAL;
3023
3024         conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
3025         conn->addr_len =  READ_ONCE(sqe->addr2);
3026
3027         if (!io)
3028                 return 0;
3029
3030         return move_addr_to_kernel(conn->addr, conn->addr_len,
3031                                         &io->connect.address);
3032 #else
3033         return -EOPNOTSUPP;
3034 #endif
3035 }
3036
3037 static int io_connect(struct io_kiocb *req, struct io_kiocb **nxt,
3038                       bool force_nonblock)
3039 {
3040 #if defined(CONFIG_NET)
3041         struct io_async_ctx __io, *io;
3042         unsigned file_flags;
3043         int ret;
3044
3045         if (req->io) {
3046                 io = req->io;
3047         } else {
3048                 ret = move_addr_to_kernel(req->connect.addr,
3049                                                 req->connect.addr_len,
3050                                                 &__io.connect.address);
3051                 if (ret)
3052                         goto out;
3053                 io = &__io;
3054         }
3055
3056         file_flags = force_nonblock ? O_NONBLOCK : 0;
3057
3058         ret = __sys_connect_file(req->file, &io->connect.address,
3059                                         req->connect.addr_len, file_flags);
3060         if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) {
3061                 if (req->io)
3062                         return -EAGAIN;
3063                 if (io_alloc_async_ctx(req)) {
3064                         ret = -ENOMEM;
3065                         goto out;
3066                 }
3067                 memcpy(&req->io->connect, &__io.connect, sizeof(__io.connect));
3068                 return -EAGAIN;
3069         }
3070         if (ret == -ERESTARTSYS)
3071                 ret = -EINTR;
3072 out:
3073         if (ret < 0)
3074                 req_set_fail_links(req);
3075         io_cqring_add_event(req, ret);
3076         io_put_req_find_next(req, nxt);
3077         return 0;
3078 #else
3079         return -EOPNOTSUPP;
3080 #endif
3081 }
3082
3083 static void io_poll_remove_one(struct io_kiocb *req)
3084 {
3085         struct io_poll_iocb *poll = &req->poll;
3086
3087         spin_lock(&poll->head->lock);
3088         WRITE_ONCE(poll->canceled, true);
3089         if (!list_empty(&poll->wait.entry)) {
3090                 list_del_init(&poll->wait.entry);
3091                 io_queue_async_work(req);
3092         }
3093         spin_unlock(&poll->head->lock);
3094         hash_del(&req->hash_node);
3095 }
3096
3097 static void io_poll_remove_all(struct io_ring_ctx *ctx)
3098 {
3099         struct hlist_node *tmp;
3100         struct io_kiocb *req;
3101         int i;
3102
3103         spin_lock_irq(&ctx->completion_lock);
3104         for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
3105                 struct hlist_head *list;
3106
3107                 list = &ctx->cancel_hash[i];
3108                 hlist_for_each_entry_safe(req, tmp, list, hash_node)
3109                         io_poll_remove_one(req);
3110         }
3111         spin_unlock_irq(&ctx->completion_lock);
3112 }
3113
3114 static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
3115 {
3116         struct hlist_head *list;
3117         struct io_kiocb *req;
3118
3119         list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)];
3120         hlist_for_each_entry(req, list, hash_node) {
3121                 if (sqe_addr == req->user_data) {
3122                         io_poll_remove_one(req);
3123                         return 0;
3124                 }
3125         }
3126
3127         return -ENOENT;
3128 }
3129
3130 static int io_poll_remove_prep(struct io_kiocb *req,
3131                                const struct io_uring_sqe *sqe)
3132 {
3133         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3134                 return -EINVAL;
3135         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
3136             sqe->poll_events)
3137                 return -EINVAL;
3138
3139         req->poll.addr = READ_ONCE(sqe->addr);
3140         return 0;
3141 }
3142
3143 /*
3144  * Find a running poll command that matches one specified in sqe->addr,
3145  * and remove it if found.
3146  */
3147 static int io_poll_remove(struct io_kiocb *req)
3148 {
3149         struct io_ring_ctx *ctx = req->ctx;
3150         u64 addr;
3151         int ret;
3152
3153         addr = req->poll.addr;
3154         spin_lock_irq(&ctx->completion_lock);
3155         ret = io_poll_cancel(ctx, addr);
3156         spin_unlock_irq(&ctx->completion_lock);
3157
3158         io_cqring_add_event(req, ret);
3159         if (ret < 0)
3160                 req_set_fail_links(req);
3161         io_put_req(req);
3162         return 0;
3163 }
3164
3165 static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
3166 {
3167         struct io_ring_ctx *ctx = req->ctx;
3168
3169         req->poll.done = true;
3170         if (error)
3171                 io_cqring_fill_event(req, error);
3172         else
3173                 io_cqring_fill_event(req, mangle_poll(mask));
3174         io_commit_cqring(ctx);
3175 }
3176
3177 static void io_poll_complete_work(struct io_wq_work **workptr)
3178 {
3179         struct io_wq_work *work = *workptr;
3180         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3181         struct io_poll_iocb *poll = &req->poll;
3182         struct poll_table_struct pt = { ._key = poll->events };
3183         struct io_ring_ctx *ctx = req->ctx;
3184         struct io_kiocb *nxt = NULL;
3185         __poll_t mask = 0;
3186         int ret = 0;
3187
3188         if (work->flags & IO_WQ_WORK_CANCEL) {
3189                 WRITE_ONCE(poll->canceled, true);
3190                 ret = -ECANCELED;
3191         } else if (READ_ONCE(poll->canceled)) {
3192                 ret = -ECANCELED;
3193         }
3194
3195         if (ret != -ECANCELED)
3196                 mask = vfs_poll(poll->file, &pt) & poll->events;
3197
3198         /*
3199          * Note that ->ki_cancel callers also delete iocb from active_reqs after
3200          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
3201          * synchronize with them.  In the cancellation case the list_del_init
3202          * itself is not actually needed, but harmless so we keep it in to
3203          * avoid further branches in the fast path.
3204          */
3205         spin_lock_irq(&ctx->completion_lock);
3206         if (!mask && ret != -ECANCELED) {
3207                 add_wait_queue(poll->head, &poll->wait);
3208                 spin_unlock_irq(&ctx->completion_lock);
3209                 return;
3210         }
3211         hash_del(&req->hash_node);
3212         io_poll_complete(req, mask, ret);
3213         spin_unlock_irq(&ctx->completion_lock);
3214
3215         io_cqring_ev_posted(ctx);
3216
3217         if (ret < 0)
3218                 req_set_fail_links(req);
3219         io_put_req_find_next(req, &nxt);
3220         if (nxt)
3221                 io_wq_assign_next(workptr, nxt);
3222 }
3223
3224 static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes)
3225 {
3226         struct io_kiocb *req, *tmp;
3227         struct req_batch rb;
3228
3229         rb.to_free = 0;
3230         spin_lock_irq(&ctx->completion_lock);
3231         llist_for_each_entry_safe(req, tmp, nodes, llist_node) {
3232                 hash_del(&req->hash_node);
3233                 io_poll_complete(req, req->result, 0);
3234
3235                 if (refcount_dec_and_test(&req->refs) &&
3236                     !io_req_multi_free(&rb, req)) {
3237                         req->flags |= REQ_F_COMP_LOCKED;
3238                         io_free_req(req);
3239                 }
3240         }
3241         spin_unlock_irq(&ctx->completion_lock);
3242
3243         io_cqring_ev_posted(ctx);
3244         io_free_req_many(ctx, &rb);
3245 }
3246
3247 static void io_poll_flush(struct io_wq_work **workptr)
3248 {
3249         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
3250         struct llist_node *nodes;
3251
3252         nodes = llist_del_all(&req->ctx->poll_llist);
3253         if (nodes)
3254                 __io_poll_flush(req->ctx, nodes);
3255 }
3256
3257 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
3258                         void *key)
3259 {
3260         struct io_poll_iocb *poll = wait->private;
3261         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
3262         struct io_ring_ctx *ctx = req->ctx;
3263         __poll_t mask = key_to_poll(key);
3264
3265         /* for instances that support it check for an event match first: */
3266         if (mask && !(mask & poll->events))
3267                 return 0;
3268
3269         list_del_init(&poll->wait.entry);
3270
3271         /*
3272          * Run completion inline if we can. We're using trylock here because
3273          * we are violating the completion_lock -> poll wq lock ordering.
3274          * If we have a link timeout we're going to need the completion_lock
3275          * for finalizing the request, mark us as having grabbed that already.
3276          */
3277         if (mask) {
3278                 unsigned long flags;
3279
3280                 if (llist_empty(&ctx->poll_llist) &&
3281                     spin_trylock_irqsave(&ctx->completion_lock, flags)) {
3282                         hash_del(&req->hash_node);
3283                         io_poll_complete(req, mask, 0);
3284                         req->flags |= REQ_F_COMP_LOCKED;
3285                         io_put_req(req);
3286                         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3287
3288                         io_cqring_ev_posted(ctx);
3289                         req = NULL;
3290                 } else {
3291                         req->result = mask;
3292                         req->llist_node.next = NULL;
3293                         /* if the list wasn't empty, we're done */
3294                         if (!llist_add(&req->llist_node, &ctx->poll_llist))
3295                                 req = NULL;
3296                         else
3297                                 req->work.func = io_poll_flush;
3298                 }
3299         }
3300         if (req)
3301                 io_queue_async_work(req);
3302
3303         return 1;
3304 }
3305
3306 struct io_poll_table {
3307         struct poll_table_struct pt;
3308         struct io_kiocb *req;
3309         int error;
3310 };
3311
3312 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
3313                                struct poll_table_struct *p)
3314 {
3315         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
3316
3317         if (unlikely(pt->req->poll.head)) {
3318                 pt->error = -EINVAL;
3319                 return;
3320         }
3321
3322         pt->error = 0;
3323         pt->req->poll.head = head;
3324         add_wait_queue(head, &pt->req->poll.wait);
3325 }
3326
3327 static void io_poll_req_insert(struct io_kiocb *req)
3328 {
3329         struct io_ring_ctx *ctx = req->ctx;
3330         struct hlist_head *list;
3331
3332         list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)];
3333         hlist_add_head(&req->hash_node, list);
3334 }
3335
3336 static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3337 {
3338         struct io_poll_iocb *poll = &req->poll;
3339         u16 events;
3340
3341         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3342                 return -EINVAL;
3343         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
3344                 return -EINVAL;
3345         if (!poll->file)
3346                 return -EBADF;
3347
3348         events = READ_ONCE(sqe->poll_events);
3349         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
3350         return 0;
3351 }
3352
3353 static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt)
3354 {
3355         struct io_poll_iocb *poll = &req->poll;
3356         struct io_ring_ctx *ctx = req->ctx;
3357         struct io_poll_table ipt;
3358         bool cancel = false;
3359         __poll_t mask;
3360
3361         INIT_IO_WORK(&req->work, io_poll_complete_work);
3362         INIT_HLIST_NODE(&req->hash_node);
3363
3364         poll->head = NULL;
3365         poll->done = false;
3366         poll->canceled = false;
3367
3368         ipt.pt._qproc = io_poll_queue_proc;
3369         ipt.pt._key = poll->events;
3370         ipt.req = req;
3371         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
3372
3373         /* initialized the list so that we can do list_empty checks */
3374         INIT_LIST_HEAD(&poll->wait.entry);
3375         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
3376         poll->wait.private = poll;
3377
3378         INIT_LIST_HEAD(&req->list);
3379
3380         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
3381
3382         spin_lock_irq(&ctx->completion_lock);
3383         if (likely(poll->head)) {
3384                 spin_lock(&poll->head->lock);
3385                 if (unlikely(list_empty(&poll->wait.entry))) {
3386                         if (ipt.error)
3387                                 cancel = true;
3388                         ipt.error = 0;
3389                         mask = 0;
3390                 }
3391                 if (mask || ipt.error)
3392                         list_del_init(&poll->wait.entry);
3393                 else if (cancel)
3394                         WRITE_ONCE(poll->canceled, true);
3395                 else if (!poll->done) /* actually waiting for an event */
3396                         io_poll_req_insert(req);
3397                 spin_unlock(&poll->head->lock);
3398         }
3399         if (mask) { /* no async, we'd stolen it */
3400                 ipt.error = 0;
3401                 io_poll_complete(req, mask, 0);
3402         }
3403         spin_unlock_irq(&ctx->completion_lock);
3404
3405         if (mask) {
3406                 io_cqring_ev_posted(ctx);
3407                 io_put_req_find_next(req, nxt);
3408         }
3409         return ipt.error;
3410 }
3411
3412 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
3413 {
3414         struct io_timeout_data *data = container_of(timer,
3415                                                 struct io_timeout_data, timer);
3416         struct io_kiocb *req = data->req;
3417         struct io_ring_ctx *ctx = req->ctx;
3418         unsigned long flags;
3419
3420         atomic_inc(&ctx->cq_timeouts);
3421
3422         spin_lock_irqsave(&ctx->completion_lock, flags);
3423         /*
3424          * We could be racing with timeout deletion. If the list is empty,
3425          * then timeout lookup already found it and will be handling it.
3426          */
3427         if (!list_empty(&req->list)) {
3428                 struct io_kiocb *prev;
3429
3430                 /*
3431                  * Adjust the reqs sequence before the current one because it
3432                  * will consume a slot in the cq_ring and the cq_tail
3433                  * pointer will be increased, otherwise other timeout reqs may
3434                  * return in advance without waiting for enough wait_nr.
3435                  */
3436                 prev = req;
3437                 list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
3438                         prev->sequence++;
3439                 list_del_init(&req->list);
3440         }
3441
3442         io_cqring_fill_event(req, -ETIME);
3443         io_commit_cqring(ctx);
3444         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3445
3446         io_cqring_ev_posted(ctx);
3447         req_set_fail_links(req);
3448         io_put_req(req);
3449         return HRTIMER_NORESTART;
3450 }
3451
3452 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
3453 {
3454         struct io_kiocb *req;
3455         int ret = -ENOENT;
3456
3457         list_for_each_entry(req, &ctx->timeout_list, list) {
3458                 if (user_data == req->user_data) {
3459                         list_del_init(&req->list);
3460                         ret = 0;
3461                         break;
3462                 }
3463         }
3464
3465         if (ret == -ENOENT)
3466                 return ret;
3467
3468         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
3469         if (ret == -1)
3470                 return -EALREADY;
3471
3472         req_set_fail_links(req);
3473         io_cqring_fill_event(req, -ECANCELED);
3474         io_put_req(req);
3475         return 0;
3476 }
3477
3478 static int io_timeout_remove_prep(struct io_kiocb *req,
3479                                   const struct io_uring_sqe *sqe)
3480 {
3481         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3482                 return -EINVAL;
3483         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)
3484                 return -EINVAL;
3485
3486         req->timeout.addr = READ_ONCE(sqe->addr);
3487         req->timeout.flags = READ_ONCE(sqe->timeout_flags);
3488         if (req->timeout.flags)
3489                 return -EINVAL;
3490
3491         return 0;
3492 }
3493
3494 /*
3495  * Remove or update an existing timeout command
3496  */
3497 static int io_timeout_remove(struct io_kiocb *req)
3498 {
3499         struct io_ring_ctx *ctx = req->ctx;
3500         int ret;
3501
3502         spin_lock_irq(&ctx->completion_lock);
3503         ret = io_timeout_cancel(ctx, req->timeout.addr);
3504
3505         io_cqring_fill_event(req, ret);
3506         io_commit_cqring(ctx);
3507         spin_unlock_irq(&ctx->completion_lock);
3508         io_cqring_ev_posted(ctx);
3509         if (ret < 0)
3510                 req_set_fail_links(req);
3511         io_put_req(req);
3512         return 0;
3513 }
3514
3515 static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
3516                            bool is_timeout_link)
3517 {
3518         struct io_timeout_data *data;
3519         unsigned flags;
3520
3521         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3522                 return -EINVAL;
3523         if (sqe->ioprio || sqe->buf_index || sqe->len != 1)
3524                 return -EINVAL;
3525         if (sqe->off && is_timeout_link)
3526                 return -EINVAL;
3527         flags = READ_ONCE(sqe->timeout_flags);
3528         if (flags & ~IORING_TIMEOUT_ABS)
3529                 return -EINVAL;
3530
3531         req->timeout.count = READ_ONCE(sqe->off);
3532
3533         if (!req->io && io_alloc_async_ctx(req))
3534                 return -ENOMEM;
3535
3536         data = &req->io->timeout;
3537         data->req = req;
3538         req->flags |= REQ_F_TIMEOUT;
3539
3540         if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))
3541                 return -EFAULT;
3542
3543         if (flags & IORING_TIMEOUT_ABS)
3544                 data->mode = HRTIMER_MODE_ABS;
3545         else
3546                 data->mode = HRTIMER_MODE_REL;
3547
3548         hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
3549         return 0;
3550 }
3551
3552 static int io_timeout(struct io_kiocb *req)
3553 {
3554         unsigned count;
3555         struct io_ring_ctx *ctx = req->ctx;
3556         struct io_timeout_data *data;
3557         struct list_head *entry;
3558         unsigned span = 0;
3559
3560         data = &req->io->timeout;
3561
3562         /*
3563          * sqe->off holds how many events that need to occur for this
3564          * timeout event to be satisfied. If it isn't set, then this is
3565          * a pure timeout request, sequence isn't used.
3566          */
3567         count = req->timeout.count;
3568         if (!count) {
3569                 req->flags |= REQ_F_TIMEOUT_NOSEQ;
3570                 spin_lock_irq(&ctx->completion_lock);
3571                 entry = ctx->timeout_list.prev;
3572                 goto add;
3573         }
3574
3575         req->sequence = ctx->cached_sq_head + count - 1;
3576         data->seq_offset = count;
3577
3578         /*
3579          * Insertion sort, ensuring the first entry in the list is always
3580          * the one we need first.
3581          */
3582         spin_lock_irq(&ctx->completion_lock);
3583         list_for_each_prev(entry, &ctx->timeout_list) {
3584                 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
3585                 unsigned nxt_sq_head;
3586                 long long tmp, tmp_nxt;
3587                 u32 nxt_offset = nxt->io->timeout.seq_offset;
3588
3589                 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
3590                         continue;
3591
3592                 /*
3593                  * Since cached_sq_head + count - 1 can overflow, use type long
3594                  * long to store it.
3595                  */
3596                 tmp = (long long)ctx->cached_sq_head + count - 1;
3597                 nxt_sq_head = nxt->sequence - nxt_offset + 1;
3598                 tmp_nxt = (long long)nxt_sq_head + nxt_offset - 1;
3599
3600                 /*
3601                  * cached_sq_head may overflow, and it will never overflow twice
3602                  * once there is some timeout req still be valid.
3603                  */
3604                 if (ctx->cached_sq_head < nxt_sq_head)
3605                         tmp += UINT_MAX;
3606
3607                 if (tmp > tmp_nxt)
3608                         break;
3609
3610                 /*
3611                  * Sequence of reqs after the insert one and itself should
3612                  * be adjusted because each timeout req consumes a slot.
3613                  */
3614                 span++;
3615                 nxt->sequence++;
3616         }
3617         req->sequence -= span;
3618 add:
3619         list_add(&req->list, entry);
3620         data->timer.function = io_timeout_fn;
3621         hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
3622         spin_unlock_irq(&ctx->completion_lock);
3623         return 0;
3624 }
3625
3626 static bool io_cancel_cb(struct io_wq_work *work, void *data)
3627 {
3628         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3629
3630         return req->user_data == (unsigned long) data;
3631 }
3632
3633 static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
3634 {
3635         enum io_wq_cancel cancel_ret;
3636         int ret = 0;
3637
3638         cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
3639         switch (cancel_ret) {
3640         case IO_WQ_CANCEL_OK:
3641                 ret = 0;
3642                 break;
3643         case IO_WQ_CANCEL_RUNNING:
3644                 ret = -EALREADY;
3645                 break;
3646         case IO_WQ_CANCEL_NOTFOUND:
3647                 ret = -ENOENT;
3648                 break;
3649         }
3650
3651         return ret;
3652 }
3653
3654 static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
3655                                      struct io_kiocb *req, __u64 sqe_addr,
3656                                      struct io_kiocb **nxt, int success_ret)
3657 {
3658         unsigned long flags;
3659         int ret;
3660
3661         ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
3662         if (ret != -ENOENT) {
3663                 spin_lock_irqsave(&ctx->completion_lock, flags);
3664                 goto done;
3665         }
3666
3667         spin_lock_irqsave(&ctx->completion_lock, flags);
3668         ret = io_timeout_cancel(ctx, sqe_addr);
3669         if (ret != -ENOENT)
3670                 goto done;
3671         ret = io_poll_cancel(ctx, sqe_addr);
3672 done:
3673         if (!ret)
3674                 ret = success_ret;
3675         io_cqring_fill_event(req, ret);
3676         io_commit_cqring(ctx);
3677         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3678         io_cqring_ev_posted(ctx);
3679
3680         if (ret < 0)
3681                 req_set_fail_links(req);
3682         io_put_req_find_next(req, nxt);
3683 }
3684
3685 static int io_async_cancel_prep(struct io_kiocb *req,
3686                                 const struct io_uring_sqe *sqe)
3687 {
3688         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3689                 return -EINVAL;
3690         if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||
3691             sqe->cancel_flags)
3692                 return -EINVAL;
3693
3694         req->cancel.addr = READ_ONCE(sqe->addr);
3695         return 0;
3696 }
3697
3698 static int io_async_cancel(struct io_kiocb *req, struct io_kiocb **nxt)
3699 {
3700         struct io_ring_ctx *ctx = req->ctx;
3701
3702         io_async_find_and_cancel(ctx, req, req->cancel.addr, nxt, 0);
3703         return 0;
3704 }
3705
3706 static int io_files_update_prep(struct io_kiocb *req,
3707                                 const struct io_uring_sqe *sqe)
3708 {
3709         if (sqe->flags || sqe->ioprio || sqe->rw_flags)
3710                 return -EINVAL;
3711
3712         req->files_update.offset = READ_ONCE(sqe->off);
3713         req->files_update.nr_args = READ_ONCE(sqe->len);
3714         if (!req->files_update.nr_args)
3715                 return -EINVAL;
3716         req->files_update.arg = READ_ONCE(sqe->addr);
3717         return 0;
3718 }
3719
3720 static int io_files_update(struct io_kiocb *req, bool force_nonblock)
3721 {
3722         struct io_ring_ctx *ctx = req->ctx;
3723         struct io_uring_files_update up;
3724         int ret;
3725
3726         if (force_nonblock) {
3727                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
3728                 return -EAGAIN;
3729         }
3730
3731         up.offset = req->files_update.offset;
3732         up.fds = req->files_update.arg;
3733
3734         mutex_lock(&ctx->uring_lock);
3735         ret = __io_sqe_files_update(ctx, &up, req->files_update.nr_args);
3736         mutex_unlock(&ctx->uring_lock);
3737
3738         if (ret < 0)
3739                 req_set_fail_links(req);
3740         io_cqring_add_event(req, ret);
3741         io_put_req(req);
3742         return 0;
3743 }
3744
3745 static int io_req_defer_prep(struct io_kiocb *req,
3746                              const struct io_uring_sqe *sqe)
3747 {
3748         ssize_t ret = 0;
3749
3750         switch (req->opcode) {
3751         case IORING_OP_NOP:
3752                 break;
3753         case IORING_OP_READV:
3754         case IORING_OP_READ_FIXED:
3755         case IORING_OP_READ:
3756                 ret = io_read_prep(req, sqe, true);
3757                 break;
3758         case IORING_OP_WRITEV:
3759         case IORING_OP_WRITE_FIXED:
3760         case IORING_OP_WRITE:
3761                 ret = io_write_prep(req, sqe, true);
3762                 break;
3763         case IORING_OP_POLL_ADD:
3764                 ret = io_poll_add_prep(req, sqe);
3765                 break;
3766         case IORING_OP_POLL_REMOVE:
3767                 ret = io_poll_remove_prep(req, sqe);
3768                 break;
3769         case IORING_OP_FSYNC:
3770                 ret = io_prep_fsync(req, sqe);
3771                 break;
3772         case IORING_OP_SYNC_FILE_RANGE:
3773                 ret = io_prep_sfr(req, sqe);
3774                 break;
3775         case IORING_OP_SENDMSG:
3776                 ret = io_sendmsg_prep(req, sqe);
3777                 break;
3778         case IORING_OP_RECVMSG:
3779                 ret = io_recvmsg_prep(req, sqe);
3780                 break;
3781         case IORING_OP_CONNECT:
3782                 ret = io_connect_prep(req, sqe);
3783                 break;
3784         case IORING_OP_TIMEOUT:
3785                 ret = io_timeout_prep(req, sqe, false);
3786                 break;
3787         case IORING_OP_TIMEOUT_REMOVE:
3788                 ret = io_timeout_remove_prep(req, sqe);
3789                 break;
3790         case IORING_OP_ASYNC_CANCEL:
3791                 ret = io_async_cancel_prep(req, sqe);
3792                 break;
3793         case IORING_OP_LINK_TIMEOUT:
3794                 ret = io_timeout_prep(req, sqe, true);
3795                 break;
3796         case IORING_OP_ACCEPT:
3797                 ret = io_accept_prep(req, sqe);
3798                 break;
3799         case IORING_OP_FALLOCATE:
3800                 ret = io_fallocate_prep(req, sqe);
3801                 break;
3802         case IORING_OP_OPENAT:
3803                 ret = io_openat_prep(req, sqe);
3804                 break;
3805         case IORING_OP_CLOSE:
3806                 ret = io_close_prep(req, sqe);
3807                 break;
3808         case IORING_OP_FILES_UPDATE:
3809                 ret = io_files_update_prep(req, sqe);
3810                 break;
3811         case IORING_OP_STATX:
3812                 ret = io_statx_prep(req, sqe);
3813                 break;
3814         case IORING_OP_FADVISE:
3815                 ret = io_fadvise_prep(req, sqe);
3816                 break;
3817         case IORING_OP_MADVISE:
3818                 ret = io_madvise_prep(req, sqe);
3819                 break;
3820         default:
3821                 printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n",
3822                                 req->opcode);
3823                 ret = -EINVAL;
3824                 break;
3825         }
3826
3827         return ret;
3828 }
3829
3830 static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3831 {
3832         struct io_ring_ctx *ctx = req->ctx;
3833         int ret;
3834
3835         /* Still need defer if there is pending req in defer list. */
3836         if (!req_need_defer(req) && list_empty(&ctx->defer_list))
3837                 return 0;
3838
3839         if (!req->io && io_alloc_async_ctx(req))
3840                 return -EAGAIN;
3841
3842         ret = io_req_defer_prep(req, sqe);
3843         if (ret < 0)
3844                 return ret;
3845
3846         spin_lock_irq(&ctx->completion_lock);
3847         if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
3848                 spin_unlock_irq(&ctx->completion_lock);
3849                 return 0;
3850         }
3851
3852         trace_io_uring_defer(ctx, req, req->user_data);
3853         list_add_tail(&req->list, &ctx->defer_list);
3854         spin_unlock_irq(&ctx->completion_lock);
3855         return -EIOCBQUEUED;
3856 }
3857
3858 static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
3859                         struct io_kiocb **nxt, bool force_nonblock)
3860 {
3861         struct io_ring_ctx *ctx = req->ctx;
3862         int ret;
3863
3864         switch (req->opcode) {
3865         case IORING_OP_NOP:
3866                 ret = io_nop(req);
3867                 break;
3868         case IORING_OP_READV:
3869         case IORING_OP_READ_FIXED:
3870         case IORING_OP_READ:
3871                 if (sqe) {
3872                         ret = io_read_prep(req, sqe, force_nonblock);
3873                         if (ret < 0)
3874                                 break;
3875                 }
3876                 ret = io_read(req, nxt, force_nonblock);
3877                 break;
3878         case IORING_OP_WRITEV:
3879         case IORING_OP_WRITE_FIXED:
3880         case IORING_OP_WRITE:
3881                 if (sqe) {
3882                         ret = io_write_prep(req, sqe, force_nonblock);
3883                         if (ret < 0)
3884                                 break;
3885                 }
3886                 ret = io_write(req, nxt, force_nonblock);
3887                 break;
3888         case IORING_OP_FSYNC:
3889                 if (sqe) {
3890                         ret = io_prep_fsync(req, sqe);
3891                         if (ret < 0)
3892                                 break;
3893                 }
3894                 ret = io_fsync(req, nxt, force_nonblock);
3895                 break;
3896         case IORING_OP_POLL_ADD:
3897                 if (sqe) {
3898                         ret = io_poll_add_prep(req, sqe);
3899                         if (ret)
3900                                 break;
3901                 }
3902                 ret = io_poll_add(req, nxt);
3903                 break;
3904         case IORING_OP_POLL_REMOVE:
3905                 if (sqe) {
3906                         ret = io_poll_remove_prep(req, sqe);
3907                         if (ret < 0)
3908                                 break;
3909                 }
3910                 ret = io_poll_remove(req);
3911                 break;
3912         case IORING_OP_SYNC_FILE_RANGE:
3913                 if (sqe) {
3914                         ret = io_prep_sfr(req, sqe);
3915                         if (ret < 0)
3916                                 break;
3917                 }
3918                 ret = io_sync_file_range(req, nxt, force_nonblock);
3919                 break;
3920         case IORING_OP_SENDMSG:
3921                 if (sqe) {
3922                         ret = io_sendmsg_prep(req, sqe);
3923                         if (ret < 0)
3924                                 break;
3925                 }
3926                 ret = io_sendmsg(req, nxt, force_nonblock);
3927                 break;
3928         case IORING_OP_RECVMSG:
3929                 if (sqe) {
3930                         ret = io_recvmsg_prep(req, sqe);
3931                         if (ret)
3932                                 break;
3933                 }
3934                 ret = io_recvmsg(req, nxt, force_nonblock);
3935                 break;
3936         case IORING_OP_TIMEOUT:
3937                 if (sqe) {
3938                         ret = io_timeout_prep(req, sqe, false);
3939                         if (ret)
3940                                 break;
3941                 }
3942                 ret = io_timeout(req);
3943                 break;
3944         case IORING_OP_TIMEOUT_REMOVE:
3945                 if (sqe) {
3946                         ret = io_timeout_remove_prep(req, sqe);
3947                         if (ret)
3948                                 break;
3949                 }
3950                 ret = io_timeout_remove(req);
3951                 break;
3952         case IORING_OP_ACCEPT:
3953                 if (sqe) {
3954                         ret = io_accept_prep(req, sqe);
3955                         if (ret)
3956                                 break;
3957                 }
3958                 ret = io_accept(req, nxt, force_nonblock);
3959                 break;
3960         case IORING_OP_CONNECT:
3961                 if (sqe) {
3962                         ret = io_connect_prep(req, sqe);
3963                         if (ret)
3964                                 break;
3965                 }
3966                 ret = io_connect(req, nxt, force_nonblock);
3967                 break;
3968         case IORING_OP_ASYNC_CANCEL:
3969                 if (sqe) {
3970                         ret = io_async_cancel_prep(req, sqe);
3971                         if (ret)
3972                                 break;
3973                 }
3974                 ret = io_async_cancel(req, nxt);
3975                 break;
3976         case IORING_OP_FALLOCATE:
3977                 if (sqe) {
3978                         ret = io_fallocate_prep(req, sqe);
3979                         if (ret)
3980                                 break;
3981                 }
3982                 ret = io_fallocate(req, nxt, force_nonblock);
3983                 break;
3984         case IORING_OP_OPENAT:
3985                 if (sqe) {
3986                         ret = io_openat_prep(req, sqe);
3987                         if (ret)
3988                                 break;
3989                 }
3990                 ret = io_openat(req, nxt, force_nonblock);
3991                 break;
3992         case IORING_OP_CLOSE:
3993                 if (sqe) {
3994                         ret = io_close_prep(req, sqe);
3995                         if (ret)
3996                                 break;
3997                 }
3998                 ret = io_close(req, nxt, force_nonblock);
3999                 break;
4000         case IORING_OP_FILES_UPDATE:
4001                 if (sqe) {
4002                         ret = io_files_update_prep(req, sqe);
4003                         if (ret)
4004                                 break;
4005                 }
4006                 ret = io_files_update(req, force_nonblock);
4007                 break;
4008         case IORING_OP_STATX:
4009                 if (sqe) {
4010                         ret = io_statx_prep(req, sqe);
4011                         if (ret)
4012                                 break;
4013                 }
4014                 ret = io_statx(req, nxt, force_nonblock);
4015                 break;
4016         case IORING_OP_FADVISE:
4017                 if (sqe) {
4018                         ret = io_fadvise_prep(req, sqe);
4019                         if (ret)
4020                                 break;
4021                 }
4022                 ret = io_fadvise(req, nxt, force_nonblock);
4023                 break;
4024         case IORING_OP_MADVISE:
4025                 if (sqe) {
4026                         ret = io_madvise_prep(req, sqe);
4027                         if (ret)
4028                                 break;
4029                 }
4030                 ret = io_madvise(req, nxt, force_nonblock);
4031                 break;
4032         default:
4033                 ret = -EINVAL;
4034                 break;
4035         }
4036
4037         if (ret)
4038                 return ret;
4039
4040         if (ctx->flags & IORING_SETUP_IOPOLL) {
4041                 const bool in_async = io_wq_current_is_worker();
4042
4043                 if (req->result == -EAGAIN)
4044                         return -EAGAIN;
4045
4046                 /* workqueue context doesn't hold uring_lock, grab it now */
4047                 if (in_async)
4048                         mutex_lock(&ctx->uring_lock);
4049
4050                 io_iopoll_req_issued(req);
4051
4052                 if (in_async)
4053                         mutex_unlock(&ctx->uring_lock);
4054         }
4055
4056         return 0;
4057 }
4058
4059 static void io_wq_submit_work(struct io_wq_work **workptr)
4060 {
4061         struct io_wq_work *work = *workptr;
4062         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4063         struct io_kiocb *nxt = NULL;
4064         int ret = 0;
4065
4066         /* if NO_CANCEL is set, we must still run the work */
4067         if ((work->flags & (IO_WQ_WORK_CANCEL|IO_WQ_WORK_NO_CANCEL)) ==
4068                                 IO_WQ_WORK_CANCEL) {
4069                 ret = -ECANCELED;
4070         }
4071
4072         if (!ret) {
4073                 req->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
4074                 req->in_async = true;
4075                 do {
4076                         ret = io_issue_sqe(req, NULL, &nxt, false);
4077                         /*
4078                          * We can get EAGAIN for polled IO even though we're
4079                          * forcing a sync submission from here, since we can't
4080                          * wait for request slots on the block side.
4081                          */
4082                         if (ret != -EAGAIN)
4083                                 break;
4084                         cond_resched();
4085                 } while (1);
4086         }
4087
4088         /* drop submission reference */
4089         io_put_req(req);
4090
4091         if (ret) {
4092                 req_set_fail_links(req);
4093                 io_cqring_add_event(req, ret);
4094                 io_put_req(req);
4095         }
4096
4097         /* if a dependent link is ready, pass it back */
4098         if (!ret && nxt)
4099                 io_wq_assign_next(workptr, nxt);
4100 }
4101
4102 static int io_req_needs_file(struct io_kiocb *req, int fd)
4103 {
4104         if (!io_op_defs[req->opcode].needs_file)
4105                 return 0;
4106         if (fd == -1 && io_op_defs[req->opcode].fd_non_neg)
4107                 return 0;
4108         return 1;
4109 }
4110
4111 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
4112                                               int index)
4113 {
4114         struct fixed_file_table *table;
4115
4116         table = &ctx->file_data->table[index >> IORING_FILE_TABLE_SHIFT];
4117         return table->files[index & IORING_FILE_TABLE_MASK];;
4118 }
4119
4120 static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req,
4121                            const struct io_uring_sqe *sqe)
4122 {
4123         struct io_ring_ctx *ctx = req->ctx;
4124         unsigned flags;
4125         int fd;
4126
4127         flags = READ_ONCE(sqe->flags);
4128         fd = READ_ONCE(sqe->fd);
4129
4130         if (flags & IOSQE_IO_DRAIN)
4131                 req->flags |= REQ_F_IO_DRAIN;
4132
4133         if (!io_req_needs_file(req, fd))
4134                 return 0;
4135
4136         if (flags & IOSQE_FIXED_FILE) {
4137                 if (unlikely(!ctx->file_data ||
4138                     (unsigned) fd >= ctx->nr_user_files))
4139                         return -EBADF;
4140                 fd = array_index_nospec(fd, ctx->nr_user_files);
4141                 req->file = io_file_from_index(ctx, fd);
4142                 if (!req->file)
4143                         return -EBADF;
4144                 req->flags |= REQ_F_FIXED_FILE;
4145                 percpu_ref_get(&ctx->file_data->refs);
4146         } else {
4147                 if (req->needs_fixed_file)
4148                         return -EBADF;
4149                 trace_io_uring_file_get(ctx, fd);
4150                 req->file = io_file_get(state, fd);
4151                 if (unlikely(!req->file))
4152                         return -EBADF;
4153         }
4154
4155         return 0;
4156 }
4157
4158 static int io_grab_files(struct io_kiocb *req)
4159 {
4160         int ret = -EBADF;
4161         struct io_ring_ctx *ctx = req->ctx;
4162
4163         if (!req->ring_file)
4164                 return -EBADF;
4165
4166         rcu_read_lock();
4167         spin_lock_irq(&ctx->inflight_lock);
4168         /*
4169          * We use the f_ops->flush() handler to ensure that we can flush
4170          * out work accessing these files if the fd is closed. Check if
4171          * the fd has changed since we started down this path, and disallow
4172          * this operation if it has.
4173          */
4174         if (fcheck(req->ring_fd) == req->ring_file) {
4175                 list_add(&req->inflight_entry, &ctx->inflight_list);
4176                 req->flags |= REQ_F_INFLIGHT;
4177                 req->work.files = current->files;
4178                 ret = 0;
4179         }
4180         spin_unlock_irq(&ctx->inflight_lock);
4181         rcu_read_unlock();
4182
4183         return ret;
4184 }
4185
4186 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
4187 {
4188         struct io_timeout_data *data = container_of(timer,
4189                                                 struct io_timeout_data, timer);
4190         struct io_kiocb *req = data->req;
4191         struct io_ring_ctx *ctx = req->ctx;
4192         struct io_kiocb *prev = NULL;
4193         unsigned long flags;
4194
4195         spin_lock_irqsave(&ctx->completion_lock, flags);
4196
4197         /*
4198          * We don't expect the list to be empty, that will only happen if we
4199          * race with the completion of the linked work.
4200          */
4201         if (!list_empty(&req->link_list)) {
4202                 prev = list_entry(req->link_list.prev, struct io_kiocb,
4203                                   link_list);
4204                 if (refcount_inc_not_zero(&prev->refs)) {
4205                         list_del_init(&req->link_list);
4206                         prev->flags &= ~REQ_F_LINK_TIMEOUT;
4207                 } else
4208                         prev = NULL;
4209         }
4210
4211         spin_unlock_irqrestore(&ctx->completion_lock, flags);
4212
4213         if (prev) {
4214                 req_set_fail_links(prev);
4215                 io_async_find_and_cancel(ctx, req, prev->user_data, NULL,
4216                                                 -ETIME);
4217                 io_put_req(prev);
4218         } else {
4219                 io_cqring_add_event(req, -ETIME);
4220                 io_put_req(req);
4221         }
4222         return HRTIMER_NORESTART;
4223 }
4224
4225 static void io_queue_linked_timeout(struct io_kiocb *req)
4226 {
4227         struct io_ring_ctx *ctx = req->ctx;
4228
4229         /*
4230          * If the list is now empty, then our linked request finished before
4231          * we got a chance to setup the timer
4232          */
4233         spin_lock_irq(&ctx->completion_lock);
4234         if (!list_empty(&req->link_list)) {
4235                 struct io_timeout_data *data = &req->io->timeout;
4236
4237                 data->timer.function = io_link_timeout_fn;
4238                 hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
4239                                 data->mode);
4240         }
4241         spin_unlock_irq(&ctx->completion_lock);
4242
4243         /* drop submission reference */
4244         io_put_req(req);
4245 }
4246
4247 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
4248 {
4249         struct io_kiocb *nxt;
4250
4251         if (!(req->flags & REQ_F_LINK))
4252                 return NULL;
4253
4254         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb,
4255                                         link_list);
4256         if (!nxt || nxt->opcode != IORING_OP_LINK_TIMEOUT)
4257                 return NULL;
4258
4259         req->flags |= REQ_F_LINK_TIMEOUT;
4260         return nxt;
4261 }
4262
4263 static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4264 {
4265         struct io_kiocb *linked_timeout;
4266         struct io_kiocb *nxt = NULL;
4267         int ret;
4268
4269 again:
4270         linked_timeout = io_prep_linked_timeout(req);
4271
4272         ret = io_issue_sqe(req, sqe, &nxt, true);
4273
4274         /*
4275          * We async punt it if the file wasn't marked NOWAIT, or if the file
4276          * doesn't support non-blocking read/write attempts
4277          */
4278         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
4279             (req->flags & REQ_F_MUST_PUNT))) {
4280                 if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
4281                         ret = io_grab_files(req);
4282                         if (ret)
4283                                 goto err;
4284                 }
4285
4286                 /*
4287                  * Queued up for async execution, worker will release
4288                  * submit reference when the iocb is actually submitted.
4289                  */
4290                 io_queue_async_work(req);
4291                 goto done_req;
4292         }
4293
4294 err:
4295         /* drop submission reference */
4296         io_put_req(req);
4297
4298         if (linked_timeout) {
4299                 if (!ret)
4300                         io_queue_linked_timeout(linked_timeout);
4301                 else
4302                         io_put_req(linked_timeout);
4303         }
4304
4305         /* and drop final reference, if we failed */
4306         if (ret) {
4307                 io_cqring_add_event(req, ret);
4308                 req_set_fail_links(req);
4309                 io_put_req(req);
4310         }
4311 done_req:
4312         if (nxt) {
4313                 req = nxt;
4314                 nxt = NULL;
4315                 goto again;
4316         }
4317 }
4318
4319 static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4320 {
4321         int ret;
4322
4323         if (unlikely(req->ctx->drain_next)) {
4324                 req->flags |= REQ_F_IO_DRAIN;
4325                 req->ctx->drain_next = false;
4326         }
4327         req->ctx->drain_next = (req->flags & REQ_F_DRAIN_LINK);
4328
4329         ret = io_req_defer(req, sqe);
4330         if (ret) {
4331                 if (ret != -EIOCBQUEUED) {
4332                         io_cqring_add_event(req, ret);
4333                         req_set_fail_links(req);
4334                         io_double_put_req(req);
4335                 }
4336         } else if ((req->flags & REQ_F_FORCE_ASYNC) &&
4337                    !io_wq_current_is_worker()) {
4338                 /*
4339                  * Never try inline submit of IOSQE_ASYNC is set, go straight
4340                  * to async execution.
4341                  */
4342                 req->work.flags |= IO_WQ_WORK_CONCURRENT;
4343                 io_queue_async_work(req);
4344         } else {
4345                 __io_queue_sqe(req, sqe);
4346         }
4347 }
4348
4349 static inline void io_queue_link_head(struct io_kiocb *req)
4350 {
4351         if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
4352                 io_cqring_add_event(req, -ECANCELED);
4353                 io_double_put_req(req);
4354         } else
4355                 io_queue_sqe(req, NULL);
4356 }
4357
4358 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
4359                                 IOSQE_IO_HARDLINK | IOSQE_ASYNC)
4360
4361 static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
4362                           struct io_submit_state *state, struct io_kiocb **link)
4363 {
4364         struct io_ring_ctx *ctx = req->ctx;
4365         unsigned int sqe_flags;
4366         int ret;
4367
4368         sqe_flags = READ_ONCE(sqe->flags);
4369
4370         /* enforce forwards compatibility on users */
4371         if (unlikely(sqe_flags & ~SQE_VALID_FLAGS)) {
4372                 ret = -EINVAL;
4373                 goto err_req;
4374         }
4375         if (sqe_flags & IOSQE_ASYNC)
4376                 req->flags |= REQ_F_FORCE_ASYNC;
4377
4378         ret = io_req_set_file(state, req, sqe);
4379         if (unlikely(ret)) {
4380 err_req:
4381                 io_cqring_add_event(req, ret);
4382                 io_double_put_req(req);
4383                 return false;
4384         }
4385
4386         /*
4387          * If we already have a head request, queue this one for async
4388          * submittal once the head completes. If we don't have a head but
4389          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
4390          * submitted sync once the chain is complete. If none of those
4391          * conditions are true (normal request), then just queue it.
4392          */
4393         if (*link) {
4394                 struct io_kiocb *head = *link;
4395
4396                 if (sqe_flags & IOSQE_IO_DRAIN)
4397                         head->flags |= REQ_F_DRAIN_LINK | REQ_F_IO_DRAIN;
4398
4399                 if (sqe_flags & IOSQE_IO_HARDLINK)
4400                         req->flags |= REQ_F_HARDLINK;
4401
4402                 if (io_alloc_async_ctx(req)) {
4403                         ret = -EAGAIN;
4404                         goto err_req;
4405                 }
4406
4407                 ret = io_req_defer_prep(req, sqe);
4408                 if (ret) {
4409                         /* fail even hard links since we don't submit */
4410                         head->flags |= REQ_F_FAIL_LINK;
4411                         goto err_req;
4412                 }
4413                 trace_io_uring_link(ctx, req, head);
4414                 list_add_tail(&req->link_list, &head->link_list);
4415
4416                 /* last request of a link, enqueue the link */
4417                 if (!(sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK))) {
4418                         io_queue_link_head(head);
4419                         *link = NULL;
4420                 }
4421         } else if (sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) {
4422                 req->flags |= REQ_F_LINK;
4423                 if (sqe_flags & IOSQE_IO_HARDLINK)
4424                         req->flags |= REQ_F_HARDLINK;
4425
4426                 INIT_LIST_HEAD(&req->link_list);
4427                 ret = io_req_defer_prep(req, sqe);
4428                 if (ret)
4429                         req->flags |= REQ_F_FAIL_LINK;
4430                 *link = req;
4431         } else {
4432                 io_queue_sqe(req, sqe);
4433         }
4434
4435         return true;
4436 }
4437
4438 /*
4439  * Batched submission is done, ensure local IO is flushed out.
4440  */
4441 static void io_submit_state_end(struct io_submit_state *state)
4442 {
4443         blk_finish_plug(&state->plug);
4444         io_file_put(state);
4445         if (state->free_reqs)
4446                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
4447                                         &state->reqs[state->cur_req]);
4448 }
4449
4450 /*
4451  * Start submission side cache.
4452  */
4453 static void io_submit_state_start(struct io_submit_state *state,
4454                                   unsigned int max_ios)
4455 {
4456         blk_start_plug(&state->plug);
4457         state->free_reqs = 0;
4458         state->file = NULL;
4459         state->ios_left = max_ios;
4460 }
4461
4462 static void io_commit_sqring(struct io_ring_ctx *ctx)
4463 {
4464         struct io_rings *rings = ctx->rings;
4465
4466         if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
4467                 /*
4468                  * Ensure any loads from the SQEs are done at this point,
4469                  * since once we write the new head, the application could
4470                  * write new data to them.
4471                  */
4472                 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
4473         }
4474 }
4475
4476 /*
4477  * Fetch an sqe, if one is available. Note that sqe_ptr will point to memory
4478  * that is mapped by userspace. This means that care needs to be taken to
4479  * ensure that reads are stable, as we cannot rely on userspace always
4480  * being a good citizen. If members of the sqe are validated and then later
4481  * used, it's important that those reads are done through READ_ONCE() to
4482  * prevent a re-load down the line.
4483  */
4484 static bool io_get_sqring(struct io_ring_ctx *ctx, struct io_kiocb *req,
4485                           const struct io_uring_sqe **sqe_ptr)
4486 {
4487         struct io_rings *rings = ctx->rings;
4488         u32 *sq_array = ctx->sq_array;
4489         unsigned head;
4490
4491         /*
4492          * The cached sq head (or cq tail) serves two purposes:
4493          *
4494          * 1) allows us to batch the cost of updating the user visible
4495          *    head updates.
4496          * 2) allows the kernel side to track the head on its own, even
4497          *    though the application is the one updating it.
4498          */
4499         head = ctx->cached_sq_head;
4500         /* make sure SQ entry isn't read before tail */
4501         if (unlikely(head == smp_load_acquire(&rings->sq.tail)))
4502                 return false;
4503
4504         head = READ_ONCE(sq_array[head & ctx->sq_mask]);
4505         if (likely(head < ctx->sq_entries)) {
4506                 /*
4507                  * All io need record the previous position, if LINK vs DARIN,
4508                  * it can be used to mark the position of the first IO in the
4509                  * link list.
4510                  */
4511                 req->sequence = ctx->cached_sq_head;
4512                 *sqe_ptr = &ctx->sq_sqes[head];
4513                 req->opcode = READ_ONCE((*sqe_ptr)->opcode);
4514                 req->user_data = READ_ONCE((*sqe_ptr)->user_data);
4515                 ctx->cached_sq_head++;
4516                 return true;
4517         }
4518
4519         /* drop invalid entries */
4520         ctx->cached_sq_head++;
4521         ctx->cached_sq_dropped++;
4522         WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
4523         return false;
4524 }
4525
4526 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
4527                           struct file *ring_file, int ring_fd,
4528                           struct mm_struct **mm, bool async)
4529 {
4530         struct io_submit_state state, *statep = NULL;
4531         struct io_kiocb *link = NULL;
4532         int i, submitted = 0;
4533         bool mm_fault = false;
4534
4535         /* if we have a backlog and couldn't flush it all, return BUSY */
4536         if (test_bit(0, &ctx->sq_check_overflow)) {
4537                 if (!list_empty(&ctx->cq_overflow_list) &&
4538                     !io_cqring_overflow_flush(ctx, false))
4539                         return -EBUSY;
4540         }
4541
4542         if (!percpu_ref_tryget_many(&ctx->refs, nr))
4543                 return -EAGAIN;
4544
4545         if (nr > IO_PLUG_THRESHOLD) {
4546                 io_submit_state_start(&state, nr);
4547                 statep = &state;
4548         }
4549
4550         for (i = 0; i < nr; i++) {
4551                 const struct io_uring_sqe *sqe;
4552                 struct io_kiocb *req;
4553
4554                 req = io_get_req(ctx, statep);
4555                 if (unlikely(!req)) {
4556                         if (!submitted)
4557                                 submitted = -EAGAIN;
4558                         break;
4559                 }
4560                 if (!io_get_sqring(ctx, req, &sqe)) {
4561                         __io_req_do_free(req);
4562                         break;
4563                 }
4564
4565                 /* will complete beyond this point, count as submitted */
4566                 submitted++;
4567
4568                 if (unlikely(req->opcode >= IORING_OP_LAST)) {
4569                         io_cqring_add_event(req, -EINVAL);
4570                         io_double_put_req(req);
4571                         break;
4572                 }
4573
4574                 if (io_op_defs[req->opcode].needs_mm && !*mm) {
4575                         mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
4576                         if (!mm_fault) {
4577                                 use_mm(ctx->sqo_mm);
4578                                 *mm = ctx->sqo_mm;
4579                         }
4580                 }
4581
4582                 req->ring_file = ring_file;
4583                 req->ring_fd = ring_fd;
4584                 req->has_user = *mm != NULL;
4585                 req->in_async = async;
4586                 req->needs_fixed_file = async;
4587                 trace_io_uring_submit_sqe(ctx, req->user_data, true, async);
4588                 if (!io_submit_sqe(req, sqe, statep, &link))
4589                         break;
4590         }
4591
4592         if (submitted != nr)
4593                 percpu_ref_put_many(&ctx->refs, nr - submitted);
4594         if (link)
4595                 io_queue_link_head(link);
4596         if (statep)
4597                 io_submit_state_end(&state);
4598
4599          /* Commit SQ ring head once we've consumed and submitted all SQEs */
4600         io_commit_sqring(ctx);
4601
4602         return submitted;
4603 }
4604
4605 static int io_sq_thread(void *data)
4606 {
4607         struct io_ring_ctx *ctx = data;
4608         struct mm_struct *cur_mm = NULL;
4609         const struct cred *old_cred;
4610         mm_segment_t old_fs;
4611         DEFINE_WAIT(wait);
4612         unsigned inflight;
4613         unsigned long timeout;
4614         int ret;
4615
4616         complete(&ctx->completions[1]);
4617
4618         old_fs = get_fs();
4619         set_fs(USER_DS);
4620         old_cred = override_creds(ctx->creds);
4621
4622         ret = timeout = inflight = 0;
4623         while (!kthread_should_park()) {
4624                 unsigned int to_submit;
4625
4626                 if (inflight) {
4627                         unsigned nr_events = 0;
4628
4629                         if (ctx->flags & IORING_SETUP_IOPOLL) {
4630                                 /*
4631                                  * inflight is the count of the maximum possible
4632                                  * entries we submitted, but it can be smaller
4633                                  * if we dropped some of them. If we don't have
4634                                  * poll entries available, then we know that we
4635                                  * have nothing left to poll for. Reset the
4636                                  * inflight count to zero in that case.
4637                                  */
4638                                 mutex_lock(&ctx->uring_lock);
4639                                 if (!list_empty(&ctx->poll_list))
4640                                         __io_iopoll_check(ctx, &nr_events, 0);
4641                                 else
4642                                         inflight = 0;
4643                                 mutex_unlock(&ctx->uring_lock);
4644                         } else {
4645                                 /*
4646                                  * Normal IO, just pretend everything completed.
4647                                  * We don't have to poll completions for that.
4648                                  */
4649                                 nr_events = inflight;
4650                         }
4651
4652                         inflight -= nr_events;
4653                         if (!inflight)
4654                                 timeout = jiffies + ctx->sq_thread_idle;
4655                 }
4656
4657                 to_submit = io_sqring_entries(ctx);
4658
4659                 /*
4660                  * If submit got -EBUSY, flag us as needing the application
4661                  * to enter the kernel to reap and flush events.
4662                  */
4663                 if (!to_submit || ret == -EBUSY) {
4664                         /*
4665                          * We're polling. If we're within the defined idle
4666                          * period, then let us spin without work before going
4667                          * to sleep. The exception is if we got EBUSY doing
4668                          * more IO, we should wait for the application to
4669                          * reap events and wake us up.
4670                          */
4671                         if (inflight ||
4672                             (!time_after(jiffies, timeout) && ret != -EBUSY)) {
4673                                 cond_resched();
4674                                 continue;
4675                         }
4676
4677                         /*
4678                          * Drop cur_mm before scheduling, we can't hold it for
4679                          * long periods (or over schedule()). Do this before
4680                          * adding ourselves to the waitqueue, as the unuse/drop
4681                          * may sleep.
4682                          */
4683                         if (cur_mm) {
4684                                 unuse_mm(cur_mm);
4685                                 mmput(cur_mm);
4686                                 cur_mm = NULL;
4687                         }
4688
4689                         prepare_to_wait(&ctx->sqo_wait, &wait,
4690                                                 TASK_INTERRUPTIBLE);
4691
4692                         /* Tell userspace we may need a wakeup call */
4693                         ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
4694                         /* make sure to read SQ tail after writing flags */
4695                         smp_mb();
4696
4697                         to_submit = io_sqring_entries(ctx);
4698                         if (!to_submit || ret == -EBUSY) {
4699                                 if (kthread_should_park()) {
4700                                         finish_wait(&ctx->sqo_wait, &wait);
4701                                         break;
4702                                 }
4703                                 if (signal_pending(current))
4704                                         flush_signals(current);
4705                                 schedule();
4706                                 finish_wait(&ctx->sqo_wait, &wait);
4707
4708                                 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4709                                 continue;
4710                         }
4711                         finish_wait(&ctx->sqo_wait, &wait);
4712
4713                         ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4714                 }
4715
4716                 to_submit = min(to_submit, ctx->sq_entries);
4717                 mutex_lock(&ctx->uring_lock);
4718                 ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
4719                 mutex_unlock(&ctx->uring_lock);
4720                 if (ret > 0)
4721                         inflight += ret;
4722         }
4723
4724         set_fs(old_fs);
4725         if (cur_mm) {
4726                 unuse_mm(cur_mm);
4727                 mmput(cur_mm);
4728         }
4729         revert_creds(old_cred);
4730
4731         kthread_parkme();
4732
4733         return 0;
4734 }
4735
4736 struct io_wait_queue {
4737         struct wait_queue_entry wq;
4738         struct io_ring_ctx *ctx;
4739         unsigned to_wait;
4740         unsigned nr_timeouts;
4741 };
4742
4743 static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
4744 {
4745         struct io_ring_ctx *ctx = iowq->ctx;
4746
4747         /*
4748          * Wake up if we have enough events, or if a timeout occurred since we
4749          * started waiting. For timeouts, we always want to return to userspace,
4750          * regardless of event count.
4751          */
4752         return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
4753                         atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
4754 }
4755
4756 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
4757                             int wake_flags, void *key)
4758 {
4759         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
4760                                                         wq);
4761
4762         /* use noflush == true, as we can't safely rely on locking context */
4763         if (!io_should_wake(iowq, true))
4764                 return -1;
4765
4766         return autoremove_wake_function(curr, mode, wake_flags, key);
4767 }
4768
4769 /*
4770  * Wait until events become available, if we don't already have some. The
4771  * application must reap them itself, as they reside on the shared cq ring.
4772  */
4773 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
4774                           const sigset_t __user *sig, size_t sigsz)
4775 {
4776         struct io_wait_queue iowq = {
4777                 .wq = {
4778                         .private        = current,
4779                         .func           = io_wake_function,
4780                         .entry          = LIST_HEAD_INIT(iowq.wq.entry),
4781                 },
4782                 .ctx            = ctx,
4783                 .to_wait        = min_events,
4784         };
4785         struct io_rings *rings = ctx->rings;
4786         int ret = 0;
4787
4788         if (io_cqring_events(ctx, false) >= min_events)
4789                 return 0;
4790
4791         if (sig) {
4792 #ifdef CONFIG_COMPAT
4793                 if (in_compat_syscall())
4794                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
4795                                                       sigsz);
4796                 else
4797 #endif
4798                         ret = set_user_sigmask(sig, sigsz);
4799
4800                 if (ret)
4801                         return ret;
4802         }
4803
4804         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
4805         trace_io_uring_cqring_wait(ctx, min_events);
4806         do {
4807                 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
4808                                                 TASK_INTERRUPTIBLE);
4809                 if (io_should_wake(&iowq, false))
4810                         break;
4811                 schedule();
4812                 if (signal_pending(current)) {
4813                         ret = -EINTR;
4814                         break;
4815                 }
4816         } while (1);
4817         finish_wait(&ctx->wait, &iowq.wq);
4818
4819         restore_saved_sigmask_unless(ret == -EINTR);
4820
4821         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
4822 }
4823
4824 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
4825 {
4826 #if defined(CONFIG_UNIX)
4827         if (ctx->ring_sock) {
4828                 struct sock *sock = ctx->ring_sock->sk;
4829                 struct sk_buff *skb;
4830
4831                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
4832                         kfree_skb(skb);
4833         }
4834 #else
4835         int i;
4836
4837         for (i = 0; i < ctx->nr_user_files; i++) {
4838                 struct file *file;
4839
4840                 file = io_file_from_index(ctx, i);
4841                 if (file)
4842                         fput(file);
4843         }
4844 #endif
4845 }
4846
4847 static void io_file_ref_kill(struct percpu_ref *ref)
4848 {
4849         struct fixed_file_data *data;
4850
4851         data = container_of(ref, struct fixed_file_data, refs);
4852         complete(&data->done);
4853 }
4854
4855 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
4856 {
4857         struct fixed_file_data *data = ctx->file_data;
4858         unsigned nr_tables, i;
4859
4860         if (!data)
4861                 return -ENXIO;
4862
4863         /* protect against inflight atomic switch, which drops the ref */
4864         flush_work(&data->ref_work);
4865         percpu_ref_get(&data->refs);
4866         percpu_ref_kill_and_confirm(&data->refs, io_file_ref_kill);
4867         wait_for_completion(&data->done);
4868         percpu_ref_put(&data->refs);
4869         percpu_ref_exit(&data->refs);
4870
4871         __io_sqe_files_unregister(ctx);
4872         nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
4873         for (i = 0; i < nr_tables; i++)
4874                 kfree(data->table[i].files);
4875         kfree(data->table);
4876         kfree(data);
4877         ctx->file_data = NULL;
4878         ctx->nr_user_files = 0;
4879         return 0;
4880 }
4881
4882 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
4883 {
4884         if (ctx->sqo_thread) {
4885                 wait_for_completion(&ctx->completions[1]);
4886                 /*
4887                  * The park is a bit of a work-around, without it we get
4888                  * warning spews on shutdown with SQPOLL set and affinity
4889                  * set to a single CPU.
4890                  */
4891                 kthread_park(ctx->sqo_thread);
4892                 kthread_stop(ctx->sqo_thread);
4893                 ctx->sqo_thread = NULL;
4894         }
4895 }
4896
4897 static void io_finish_async(struct io_ring_ctx *ctx)
4898 {
4899         io_sq_thread_stop(ctx);
4900
4901         if (ctx->io_wq) {
4902                 io_wq_destroy(ctx->io_wq);
4903                 ctx->io_wq = NULL;
4904         }
4905 }
4906
4907 #if defined(CONFIG_UNIX)
4908 /*
4909  * Ensure the UNIX gc is aware of our file set, so we are certain that
4910  * the io_uring can be safely unregistered on process exit, even if we have
4911  * loops in the file referencing.
4912  */
4913 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
4914 {
4915         struct sock *sk = ctx->ring_sock->sk;
4916         struct scm_fp_list *fpl;
4917         struct sk_buff *skb;
4918         int i, nr_files;
4919
4920         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
4921                 unsigned long inflight = ctx->user->unix_inflight + nr;
4922
4923                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
4924                         return -EMFILE;
4925         }
4926
4927         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
4928         if (!fpl)
4929                 return -ENOMEM;
4930
4931         skb = alloc_skb(0, GFP_KERNEL);
4932         if (!skb) {
4933                 kfree(fpl);
4934                 return -ENOMEM;
4935         }
4936
4937         skb->sk = sk;
4938
4939         nr_files = 0;
4940         fpl->user = get_uid(ctx->user);
4941         for (i = 0; i < nr; i++) {
4942                 struct file *file = io_file_from_index(ctx, i + offset);
4943
4944                 if (!file)
4945                         continue;
4946                 fpl->fp[nr_files] = get_file(file);
4947                 unix_inflight(fpl->user, fpl->fp[nr_files]);
4948                 nr_files++;
4949         }
4950
4951         if (nr_files) {
4952                 fpl->max = SCM_MAX_FD;
4953                 fpl->count = nr_files;
4954                 UNIXCB(skb).fp = fpl;
4955                 skb->destructor = unix_destruct_scm;
4956                 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
4957                 skb_queue_head(&sk->sk_receive_queue, skb);
4958
4959                 for (i = 0; i < nr_files; i++)
4960                         fput(fpl->fp[i]);
4961         } else {
4962                 kfree_skb(skb);
4963                 kfree(fpl);
4964         }
4965
4966         return 0;
4967 }
4968
4969 /*
4970  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
4971  * causes regular reference counting to break down. We rely on the UNIX
4972  * garbage collection to take care of this problem for us.
4973  */
4974 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
4975 {
4976         unsigned left, total;
4977         int ret = 0;
4978
4979         total = 0;
4980         left = ctx->nr_user_files;
4981         while (left) {
4982                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
4983
4984                 ret = __io_sqe_files_scm(ctx, this_files, total);
4985                 if (ret)
4986                         break;
4987                 left -= this_files;
4988                 total += this_files;
4989         }
4990
4991         if (!ret)
4992                 return 0;
4993
4994         while (total < ctx->nr_user_files) {
4995                 struct file *file = io_file_from_index(ctx, total);
4996
4997                 if (file)
4998                         fput(file);
4999                 total++;
5000         }
5001
5002         return ret;
5003 }
5004 #else
5005 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
5006 {
5007         return 0;
5008 }
5009 #endif
5010
5011 static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
5012                                     unsigned nr_files)
5013 {
5014         int i;
5015
5016         for (i = 0; i < nr_tables; i++) {
5017                 struct fixed_file_table *table = &ctx->file_data->table[i];
5018                 unsigned this_files;
5019
5020                 this_files = min(nr_files, IORING_MAX_FILES_TABLE);
5021                 table->files = kcalloc(this_files, sizeof(struct file *),
5022                                         GFP_KERNEL);
5023                 if (!table->files)
5024                         break;
5025                 nr_files -= this_files;
5026         }
5027
5028         if (i == nr_tables)
5029                 return 0;
5030
5031         for (i = 0; i < nr_tables; i++) {
5032                 struct fixed_file_table *table = &ctx->file_data->table[i];
5033                 kfree(table->files);
5034         }
5035         return 1;
5036 }
5037
5038 static void io_ring_file_put(struct io_ring_ctx *ctx, struct file *file)
5039 {
5040 #if defined(CONFIG_UNIX)
5041         struct sock *sock = ctx->ring_sock->sk;
5042         struct sk_buff_head list, *head = &sock->sk_receive_queue;
5043         struct sk_buff *skb;
5044         int i;
5045
5046         __skb_queue_head_init(&list);
5047
5048         /*
5049          * Find the skb that holds this file in its SCM_RIGHTS. When found,
5050          * remove this entry and rearrange the file array.
5051          */
5052         skb = skb_dequeue(head);
5053         while (skb) {
5054                 struct scm_fp_list *fp;
5055
5056                 fp = UNIXCB(skb).fp;
5057                 for (i = 0; i < fp->count; i++) {
5058                         int left;
5059
5060                         if (fp->fp[i] != file)
5061                                 continue;
5062
5063                         unix_notinflight(fp->user, fp->fp[i]);
5064                         left = fp->count - 1 - i;
5065                         if (left) {
5066                                 memmove(&fp->fp[i], &fp->fp[i + 1],
5067                                                 left * sizeof(struct file *));
5068                         }
5069                         fp->count--;
5070                         if (!fp->count) {
5071                                 kfree_skb(skb);
5072                                 skb = NULL;
5073                         } else {
5074                                 __skb_queue_tail(&list, skb);
5075                         }
5076                         fput(file);
5077                         file = NULL;
5078                         break;
5079                 }
5080
5081                 if (!file)
5082                         break;
5083
5084                 __skb_queue_tail(&list, skb);
5085
5086                 skb = skb_dequeue(head);
5087         }
5088
5089         if (skb_peek(&list)) {
5090                 spin_lock_irq(&head->lock);
5091                 while ((skb = __skb_dequeue(&list)) != NULL)
5092                         __skb_queue_tail(head, skb);
5093                 spin_unlock_irq(&head->lock);
5094         }
5095 #else
5096         fput(file);
5097 #endif
5098 }
5099
5100 struct io_file_put {
5101         struct llist_node llist;
5102         struct file *file;
5103         struct completion *done;
5104 };
5105
5106 static void io_ring_file_ref_switch(struct work_struct *work)
5107 {
5108         struct io_file_put *pfile, *tmp;
5109         struct fixed_file_data *data;
5110         struct llist_node *node;
5111
5112         data = container_of(work, struct fixed_file_data, ref_work);
5113
5114         while ((node = llist_del_all(&data->put_llist)) != NULL) {
5115                 llist_for_each_entry_safe(pfile, tmp, node, llist) {
5116                         io_ring_file_put(data->ctx, pfile->file);
5117                         if (pfile->done)
5118                                 complete(pfile->done);
5119                         else
5120                                 kfree(pfile);
5121                 }
5122         }
5123
5124         percpu_ref_get(&data->refs);
5125         percpu_ref_switch_to_percpu(&data->refs);
5126 }
5127
5128 static void io_file_data_ref_zero(struct percpu_ref *ref)
5129 {
5130         struct fixed_file_data *data;
5131
5132         data = container_of(ref, struct fixed_file_data, refs);
5133
5134         /* we can't safely switch from inside this context, punt to wq */
5135         queue_work(system_wq, &data->ref_work);
5136 }
5137
5138 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
5139                                  unsigned nr_args)
5140 {
5141         __s32 __user *fds = (__s32 __user *) arg;
5142         unsigned nr_tables;
5143         struct file *file;
5144         int fd, ret = 0;
5145         unsigned i;
5146
5147         if (ctx->file_data)
5148                 return -EBUSY;
5149         if (!nr_args)
5150                 return -EINVAL;
5151         if (nr_args > IORING_MAX_FIXED_FILES)
5152                 return -EMFILE;
5153
5154         ctx->file_data = kzalloc(sizeof(*ctx->file_data), GFP_KERNEL);
5155         if (!ctx->file_data)
5156                 return -ENOMEM;
5157         ctx->file_data->ctx = ctx;
5158         init_completion(&ctx->file_data->done);
5159
5160         nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
5161         ctx->file_data->table = kcalloc(nr_tables,
5162                                         sizeof(struct fixed_file_table),
5163                                         GFP_KERNEL);
5164         if (!ctx->file_data->table) {
5165                 kfree(ctx->file_data);
5166                 ctx->file_data = NULL;
5167                 return -ENOMEM;
5168         }
5169
5170         if (percpu_ref_init(&ctx->file_data->refs, io_file_data_ref_zero,
5171                                 PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
5172                 kfree(ctx->file_data->table);
5173                 kfree(ctx->file_data);
5174                 ctx->file_data = NULL;
5175                 return -ENOMEM;
5176         }
5177         ctx->file_data->put_llist.first = NULL;
5178         INIT_WORK(&ctx->file_data->ref_work, io_ring_file_ref_switch);
5179
5180         if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
5181                 percpu_ref_exit(&ctx->file_data->refs);
5182                 kfree(ctx->file_data->table);
5183                 kfree(ctx->file_data);
5184                 ctx->file_data = NULL;
5185                 return -ENOMEM;
5186         }
5187
5188         for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
5189                 struct fixed_file_table *table;
5190                 unsigned index;
5191
5192                 ret = -EFAULT;
5193                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
5194                         break;
5195                 /* allow sparse sets */
5196                 if (fd == -1) {
5197                         ret = 0;
5198                         continue;
5199                 }
5200
5201                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
5202                 index = i & IORING_FILE_TABLE_MASK;
5203                 file = fget(fd);
5204
5205                 ret = -EBADF;
5206                 if (!file)
5207                         break;
5208
5209                 /*
5210                  * Don't allow io_uring instances to be registered. If UNIX
5211                  * isn't enabled, then this causes a reference cycle and this
5212                  * instance can never get freed. If UNIX is enabled we'll
5213                  * handle it just fine, but there's still no point in allowing
5214                  * a ring fd as it doesn't support regular read/write anyway.
5215                  */
5216                 if (file->f_op == &io_uring_fops) {
5217                         fput(file);
5218                         break;
5219                 }
5220                 ret = 0;
5221                 table->files[index] = file;
5222         }
5223
5224         if (ret) {
5225                 for (i = 0; i < ctx->nr_user_files; i++) {
5226                         file = io_file_from_index(ctx, i);
5227                         if (file)
5228                                 fput(file);
5229                 }
5230                 for (i = 0; i < nr_tables; i++)
5231                         kfree(ctx->file_data->table[i].files);
5232
5233                 kfree(ctx->file_data->table);
5234                 kfree(ctx->file_data);
5235                 ctx->file_data = NULL;
5236                 ctx->nr_user_files = 0;
5237                 return ret;
5238         }
5239
5240         ret = io_sqe_files_scm(ctx);
5241         if (ret)
5242                 io_sqe_files_unregister(ctx);
5243
5244         return ret;
5245 }
5246
5247 static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
5248                                 int index)
5249 {
5250 #if defined(CONFIG_UNIX)
5251         struct sock *sock = ctx->ring_sock->sk;
5252         struct sk_buff_head *head = &sock->sk_receive_queue;
5253         struct sk_buff *skb;
5254
5255         /*
5256          * See if we can merge this file into an existing skb SCM_RIGHTS
5257          * file set. If there's no room, fall back to allocating a new skb
5258          * and filling it in.
5259          */
5260         spin_lock_irq(&head->lock);
5261         skb = skb_peek(head);
5262         if (skb) {
5263                 struct scm_fp_list *fpl = UNIXCB(skb).fp;
5264
5265                 if (fpl->count < SCM_MAX_FD) {
5266                         __skb_unlink(skb, head);
5267                         spin_unlock_irq(&head->lock);
5268                         fpl->fp[fpl->count] = get_file(file);
5269                         unix_inflight(fpl->user, fpl->fp[fpl->count]);
5270                         fpl->count++;
5271                         spin_lock_irq(&head->lock);
5272                         __skb_queue_head(head, skb);
5273                 } else {
5274                         skb = NULL;
5275                 }
5276         }
5277         spin_unlock_irq(&head->lock);
5278
5279         if (skb) {
5280                 fput(file);
5281                 return 0;
5282         }
5283
5284         return __io_sqe_files_scm(ctx, 1, index);
5285 #else
5286         return 0;
5287 #endif
5288 }
5289
5290 static void io_atomic_switch(struct percpu_ref *ref)
5291 {
5292         struct fixed_file_data *data;
5293
5294         data = container_of(ref, struct fixed_file_data, refs);
5295         clear_bit(FFD_F_ATOMIC, &data->state);
5296 }
5297
5298 static bool io_queue_file_removal(struct fixed_file_data *data,
5299                                   struct file *file)
5300 {
5301         struct io_file_put *pfile, pfile_stack;
5302         DECLARE_COMPLETION_ONSTACK(done);
5303
5304         /*
5305          * If we fail allocating the struct we need for doing async reomval
5306          * of this file, just punt to sync and wait for it.
5307          */
5308         pfile = kzalloc(sizeof(*pfile), GFP_KERNEL);
5309         if (!pfile) {
5310                 pfile = &pfile_stack;
5311                 pfile->done = &done;
5312         }
5313
5314         pfile->file = file;
5315         llist_add(&pfile->llist, &data->put_llist);
5316
5317         if (pfile == &pfile_stack) {
5318                 if (!test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
5319                         percpu_ref_put(&data->refs);
5320                         percpu_ref_switch_to_atomic(&data->refs,
5321                                                         io_atomic_switch);
5322                 }
5323                 wait_for_completion(&done);
5324                 flush_work(&data->ref_work);
5325                 return false;
5326         }
5327
5328         return true;
5329 }
5330
5331 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
5332                                  struct io_uring_files_update *up,
5333                                  unsigned nr_args)
5334 {
5335         struct fixed_file_data *data = ctx->file_data;
5336         bool ref_switch = false;
5337         struct file *file;
5338         __s32 __user *fds;
5339         int fd, i, err;
5340         __u32 done;
5341
5342         if (check_add_overflow(up->offset, nr_args, &done))
5343                 return -EOVERFLOW;
5344         if (done > ctx->nr_user_files)
5345                 return -EINVAL;
5346
5347         done = 0;
5348         fds = u64_to_user_ptr(up->fds);
5349         while (nr_args) {
5350                 struct fixed_file_table *table;
5351                 unsigned index;
5352
5353                 err = 0;
5354                 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
5355                         err = -EFAULT;
5356                         break;
5357                 }
5358                 i = array_index_nospec(up->offset, ctx->nr_user_files);
5359                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
5360                 index = i & IORING_FILE_TABLE_MASK;
5361                 if (table->files[index]) {
5362                         file = io_file_from_index(ctx, index);
5363                         table->files[index] = NULL;
5364                         if (io_queue_file_removal(data, file))
5365                                 ref_switch = true;
5366                 }
5367                 if (fd != -1) {
5368                         file = fget(fd);
5369                         if (!file) {
5370                                 err = -EBADF;
5371                                 break;
5372                         }
5373                         /*
5374                          * Don't allow io_uring instances to be registered. If
5375                          * UNIX isn't enabled, then this causes a reference
5376                          * cycle and this instance can never get freed. If UNIX
5377                          * is enabled we'll handle it just fine, but there's
5378                          * still no point in allowing a ring fd as it doesn't
5379                          * support regular read/write anyway.
5380                          */
5381                         if (file->f_op == &io_uring_fops) {
5382                                 fput(file);
5383                                 err = -EBADF;
5384                                 break;
5385                         }
5386                         table->files[index] = file;
5387                         err = io_sqe_file_register(ctx, file, i);
5388                         if (err)
5389                                 break;
5390                 }
5391                 nr_args--;
5392                 done++;
5393                 up->offset++;
5394         }
5395
5396         if (ref_switch && !test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
5397                 percpu_ref_put(&data->refs);
5398                 percpu_ref_switch_to_atomic(&data->refs, io_atomic_switch);
5399         }
5400
5401         return done ? done : err;
5402 }
5403 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
5404                                unsigned nr_args)
5405 {
5406         struct io_uring_files_update up;
5407
5408         if (!ctx->file_data)
5409                 return -ENXIO;
5410         if (!nr_args)
5411                 return -EINVAL;
5412         if (copy_from_user(&up, arg, sizeof(up)))
5413                 return -EFAULT;
5414         if (up.resv)
5415                 return -EINVAL;
5416
5417         return __io_sqe_files_update(ctx, &up, nr_args);
5418 }
5419
5420 static void io_put_work(struct io_wq_work *work)
5421 {
5422         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5423
5424         io_put_req(req);
5425 }
5426
5427 static void io_get_work(struct io_wq_work *work)
5428 {
5429         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5430
5431         refcount_inc(&req->refs);
5432 }
5433
5434 static int io_sq_offload_start(struct io_ring_ctx *ctx,
5435                                struct io_uring_params *p)
5436 {
5437         struct io_wq_data data;
5438         unsigned concurrency;
5439         int ret;
5440
5441         init_waitqueue_head(&ctx->sqo_wait);
5442         mmgrab(current->mm);
5443         ctx->sqo_mm = current->mm;
5444
5445         if (ctx->flags & IORING_SETUP_SQPOLL) {
5446                 ret = -EPERM;
5447                 if (!capable(CAP_SYS_ADMIN))
5448                         goto err;
5449
5450                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
5451                 if (!ctx->sq_thread_idle)
5452                         ctx->sq_thread_idle = HZ;
5453
5454                 if (p->flags & IORING_SETUP_SQ_AFF) {
5455                         int cpu = p->sq_thread_cpu;
5456
5457                         ret = -EINVAL;
5458                         if (cpu >= nr_cpu_ids)
5459                                 goto err;
5460                         if (!cpu_online(cpu))
5461                                 goto err;
5462
5463                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
5464                                                         ctx, cpu,
5465                                                         "io_uring-sq");
5466                 } else {
5467                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
5468                                                         "io_uring-sq");
5469                 }
5470                 if (IS_ERR(ctx->sqo_thread)) {
5471                         ret = PTR_ERR(ctx->sqo_thread);
5472                         ctx->sqo_thread = NULL;
5473                         goto err;
5474                 }
5475                 wake_up_process(ctx->sqo_thread);
5476         } else if (p->flags & IORING_SETUP_SQ_AFF) {
5477                 /* Can't have SQ_AFF without SQPOLL */
5478                 ret = -EINVAL;
5479                 goto err;
5480         }
5481
5482         data.mm = ctx->sqo_mm;
5483         data.user = ctx->user;
5484         data.creds = ctx->creds;
5485         data.get_work = io_get_work;
5486         data.put_work = io_put_work;
5487
5488         /* Do QD, or 4 * CPUS, whatever is smallest */
5489         concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
5490         ctx->io_wq = io_wq_create(concurrency, &data);
5491         if (IS_ERR(ctx->io_wq)) {
5492                 ret = PTR_ERR(ctx->io_wq);
5493                 ctx->io_wq = NULL;
5494                 goto err;
5495         }
5496
5497         return 0;
5498 err:
5499         io_finish_async(ctx);
5500         mmdrop(ctx->sqo_mm);
5501         ctx->sqo_mm = NULL;
5502         return ret;
5503 }
5504
5505 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
5506 {
5507         atomic_long_sub(nr_pages, &user->locked_vm);
5508 }
5509
5510 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
5511 {
5512         unsigned long page_limit, cur_pages, new_pages;
5513
5514         /* Don't allow more pages than we can safely lock */
5515         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
5516
5517         do {
5518                 cur_pages = atomic_long_read(&user->locked_vm);
5519                 new_pages = cur_pages + nr_pages;
5520                 if (new_pages > page_limit)
5521                         return -ENOMEM;
5522         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
5523                                         new_pages) != cur_pages);
5524
5525         return 0;
5526 }
5527
5528 static void io_mem_free(void *ptr)
5529 {
5530         struct page *page;
5531
5532         if (!ptr)
5533                 return;
5534
5535         page = virt_to_head_page(ptr);
5536         if (put_page_testzero(page))
5537                 free_compound_page(page);
5538 }
5539
5540 static void *io_mem_alloc(size_t size)
5541 {
5542         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
5543                                 __GFP_NORETRY;
5544
5545         return (void *) __get_free_pages(gfp_flags, get_order(size));
5546 }
5547
5548 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
5549                                 size_t *sq_offset)
5550 {
5551         struct io_rings *rings;
5552         size_t off, sq_array_size;
5553
5554         off = struct_size(rings, cqes, cq_entries);
5555         if (off == SIZE_MAX)
5556                 return SIZE_MAX;
5557
5558 #ifdef CONFIG_SMP
5559         off = ALIGN(off, SMP_CACHE_BYTES);
5560         if (off == 0)
5561                 return SIZE_MAX;
5562 #endif
5563
5564         sq_array_size = array_size(sizeof(u32), sq_entries);
5565         if (sq_array_size == SIZE_MAX)
5566                 return SIZE_MAX;
5567
5568         if (check_add_overflow(off, sq_array_size, &off))
5569                 return SIZE_MAX;
5570
5571         if (sq_offset)
5572                 *sq_offset = off;
5573
5574         return off;
5575 }
5576
5577 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
5578 {
5579         size_t pages;
5580
5581         pages = (size_t)1 << get_order(
5582                 rings_size(sq_entries, cq_entries, NULL));
5583         pages += (size_t)1 << get_order(
5584                 array_size(sizeof(struct io_uring_sqe), sq_entries));
5585
5586         return pages;
5587 }
5588
5589 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
5590 {
5591         int i, j;
5592
5593         if (!ctx->user_bufs)
5594                 return -ENXIO;
5595
5596         for (i = 0; i < ctx->nr_user_bufs; i++) {
5597                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5598
5599                 for (j = 0; j < imu->nr_bvecs; j++)
5600                         put_user_page(imu->bvec[j].bv_page);
5601
5602                 if (ctx->account_mem)
5603                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
5604                 kvfree(imu->bvec);
5605                 imu->nr_bvecs = 0;
5606         }
5607
5608         kfree(ctx->user_bufs);
5609         ctx->user_bufs = NULL;
5610         ctx->nr_user_bufs = 0;
5611         return 0;
5612 }
5613
5614 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
5615                        void __user *arg, unsigned index)
5616 {
5617         struct iovec __user *src;
5618
5619 #ifdef CONFIG_COMPAT
5620         if (ctx->compat) {
5621                 struct compat_iovec __user *ciovs;
5622                 struct compat_iovec ciov;
5623
5624                 ciovs = (struct compat_iovec __user *) arg;
5625                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
5626                         return -EFAULT;
5627
5628                 dst->iov_base = u64_to_user_ptr((u64)ciov.iov_base);
5629                 dst->iov_len = ciov.iov_len;
5630                 return 0;
5631         }
5632 #endif
5633         src = (struct iovec __user *) arg;
5634         if (copy_from_user(dst, &src[index], sizeof(*dst)))
5635                 return -EFAULT;
5636         return 0;
5637 }
5638
5639 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
5640                                   unsigned nr_args)
5641 {
5642         struct vm_area_struct **vmas = NULL;
5643         struct page **pages = NULL;
5644         int i, j, got_pages = 0;
5645         int ret = -EINVAL;
5646
5647         if (ctx->user_bufs)
5648                 return -EBUSY;
5649         if (!nr_args || nr_args > UIO_MAXIOV)
5650                 return -EINVAL;
5651
5652         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
5653                                         GFP_KERNEL);
5654         if (!ctx->user_bufs)
5655                 return -ENOMEM;
5656
5657         for (i = 0; i < nr_args; i++) {
5658                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5659                 unsigned long off, start, end, ubuf;
5660                 int pret, nr_pages;
5661                 struct iovec iov;
5662                 size_t size;
5663
5664                 ret = io_copy_iov(ctx, &iov, arg, i);
5665                 if (ret)
5666                         goto err;
5667
5668                 /*
5669                  * Don't impose further limits on the size and buffer
5670                  * constraints here, we'll -EINVAL later when IO is
5671                  * submitted if they are wrong.
5672                  */
5673                 ret = -EFAULT;
5674                 if (!iov.iov_base || !iov.iov_len)
5675                         goto err;
5676
5677                 /* arbitrary limit, but we need something */
5678                 if (iov.iov_len > SZ_1G)
5679                         goto err;
5680
5681                 ubuf = (unsigned long) iov.iov_base;
5682                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
5683                 start = ubuf >> PAGE_SHIFT;
5684                 nr_pages = end - start;
5685
5686                 if (ctx->account_mem) {
5687                         ret = io_account_mem(ctx->user, nr_pages);
5688                         if (ret)
5689                                 goto err;
5690                 }
5691
5692                 ret = 0;
5693                 if (!pages || nr_pages > got_pages) {
5694                         kfree(vmas);
5695                         kfree(pages);
5696                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
5697                                                 GFP_KERNEL);
5698                         vmas = kvmalloc_array(nr_pages,
5699                                         sizeof(struct vm_area_struct *),
5700                                         GFP_KERNEL);
5701                         if (!pages || !vmas) {
5702                                 ret = -ENOMEM;
5703                                 if (ctx->account_mem)
5704                                         io_unaccount_mem(ctx->user, nr_pages);
5705                                 goto err;
5706                         }
5707                         got_pages = nr_pages;
5708                 }
5709
5710                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
5711                                                 GFP_KERNEL);
5712                 ret = -ENOMEM;
5713                 if (!imu->bvec) {
5714                         if (ctx->account_mem)
5715                                 io_unaccount_mem(ctx->user, nr_pages);
5716                         goto err;
5717                 }
5718
5719                 ret = 0;
5720                 down_read(&current->mm->mmap_sem);
5721                 pret = get_user_pages(ubuf, nr_pages,
5722                                       FOLL_WRITE | FOLL_LONGTERM,
5723                                       pages, vmas);
5724                 if (pret == nr_pages) {
5725                         /* don't support file backed memory */
5726                         for (j = 0; j < nr_pages; j++) {
5727                                 struct vm_area_struct *vma = vmas[j];
5728
5729                                 if (vma->vm_file &&
5730                                     !is_file_hugepages(vma->vm_file)) {
5731                                         ret = -EOPNOTSUPP;
5732                                         break;
5733                                 }
5734                         }
5735                 } else {
5736                         ret = pret < 0 ? pret : -EFAULT;
5737                 }
5738                 up_read(&current->mm->mmap_sem);
5739                 if (ret) {
5740                         /*
5741                          * if we did partial map, or found file backed vmas,
5742                          * release any pages we did get
5743                          */
5744                         if (pret > 0)
5745                                 put_user_pages(pages, pret);
5746                         if (ctx->account_mem)
5747                                 io_unaccount_mem(ctx->user, nr_pages);
5748                         kvfree(imu->bvec);
5749                         goto err;
5750                 }
5751
5752                 off = ubuf & ~PAGE_MASK;
5753                 size = iov.iov_len;
5754                 for (j = 0; j < nr_pages; j++) {
5755                         size_t vec_len;
5756
5757                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
5758                         imu->bvec[j].bv_page = pages[j];
5759                         imu->bvec[j].bv_len = vec_len;
5760                         imu->bvec[j].bv_offset = off;
5761                         off = 0;
5762                         size -= vec_len;
5763                 }
5764                 /* store original address for later verification */
5765                 imu->ubuf = ubuf;
5766                 imu->len = iov.iov_len;
5767                 imu->nr_bvecs = nr_pages;
5768
5769                 ctx->nr_user_bufs++;
5770         }
5771         kvfree(pages);
5772         kvfree(vmas);
5773         return 0;
5774 err:
5775         kvfree(pages);
5776         kvfree(vmas);
5777         io_sqe_buffer_unregister(ctx);
5778         return ret;
5779 }
5780
5781 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
5782 {
5783         __s32 __user *fds = arg;
5784         int fd;
5785
5786         if (ctx->cq_ev_fd)
5787                 return -EBUSY;
5788
5789         if (copy_from_user(&fd, fds, sizeof(*fds)))
5790                 return -EFAULT;
5791
5792         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
5793         if (IS_ERR(ctx->cq_ev_fd)) {
5794                 int ret = PTR_ERR(ctx->cq_ev_fd);
5795                 ctx->cq_ev_fd = NULL;
5796                 return ret;
5797         }
5798
5799         return 0;
5800 }
5801
5802 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
5803 {
5804         if (ctx->cq_ev_fd) {
5805                 eventfd_ctx_put(ctx->cq_ev_fd);
5806                 ctx->cq_ev_fd = NULL;
5807                 return 0;
5808         }
5809
5810         return -ENXIO;
5811 }
5812
5813 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
5814 {
5815         io_finish_async(ctx);
5816         if (ctx->sqo_mm)
5817                 mmdrop(ctx->sqo_mm);
5818
5819         io_iopoll_reap_events(ctx);
5820         io_sqe_buffer_unregister(ctx);
5821         io_sqe_files_unregister(ctx);
5822         io_eventfd_unregister(ctx);
5823
5824 #if defined(CONFIG_UNIX)
5825         if (ctx->ring_sock) {
5826                 ctx->ring_sock->file = NULL; /* so that iput() is called */
5827                 sock_release(ctx->ring_sock);
5828         }
5829 #endif
5830
5831         io_mem_free(ctx->rings);
5832         io_mem_free(ctx->sq_sqes);
5833
5834         percpu_ref_exit(&ctx->refs);
5835         if (ctx->account_mem)
5836                 io_unaccount_mem(ctx->user,
5837                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
5838         free_uid(ctx->user);
5839         put_cred(ctx->creds);
5840         kfree(ctx->completions);
5841         kfree(ctx->cancel_hash);
5842         kmem_cache_free(req_cachep, ctx->fallback_req);
5843         kfree(ctx);
5844 }
5845
5846 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
5847 {
5848         struct io_ring_ctx *ctx = file->private_data;
5849         __poll_t mask = 0;
5850
5851         poll_wait(file, &ctx->cq_wait, wait);
5852         /*
5853          * synchronizes with barrier from wq_has_sleeper call in
5854          * io_commit_cqring
5855          */
5856         smp_rmb();
5857         if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
5858             ctx->rings->sq_ring_entries)
5859                 mask |= EPOLLOUT | EPOLLWRNORM;
5860         if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
5861                 mask |= EPOLLIN | EPOLLRDNORM;
5862
5863         return mask;
5864 }
5865
5866 static int io_uring_fasync(int fd, struct file *file, int on)
5867 {
5868         struct io_ring_ctx *ctx = file->private_data;
5869
5870         return fasync_helper(fd, file, on, &ctx->cq_fasync);
5871 }
5872
5873 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
5874 {
5875         mutex_lock(&ctx->uring_lock);
5876         percpu_ref_kill(&ctx->refs);
5877         mutex_unlock(&ctx->uring_lock);
5878
5879         io_kill_timeouts(ctx);
5880         io_poll_remove_all(ctx);
5881
5882         if (ctx->io_wq)
5883                 io_wq_cancel_all(ctx->io_wq);
5884
5885         io_iopoll_reap_events(ctx);
5886         /* if we failed setting up the ctx, we might not have any rings */
5887         if (ctx->rings)
5888                 io_cqring_overflow_flush(ctx, true);
5889         wait_for_completion(&ctx->completions[0]);
5890         io_ring_ctx_free(ctx);
5891 }
5892
5893 static int io_uring_release(struct inode *inode, struct file *file)
5894 {
5895         struct io_ring_ctx *ctx = file->private_data;
5896
5897         file->private_data = NULL;
5898         io_ring_ctx_wait_and_kill(ctx);
5899         return 0;
5900 }
5901
5902 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
5903                                   struct files_struct *files)
5904 {
5905         struct io_kiocb *req;
5906         DEFINE_WAIT(wait);
5907
5908         while (!list_empty_careful(&ctx->inflight_list)) {
5909                 struct io_kiocb *cancel_req = NULL;
5910
5911                 spin_lock_irq(&ctx->inflight_lock);
5912                 list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
5913                         if (req->work.files != files)
5914                                 continue;
5915                         /* req is being completed, ignore */
5916                         if (!refcount_inc_not_zero(&req->refs))
5917                                 continue;
5918                         cancel_req = req;
5919                         break;
5920                 }
5921                 if (cancel_req)
5922                         prepare_to_wait(&ctx->inflight_wait, &wait,
5923                                                 TASK_UNINTERRUPTIBLE);
5924                 spin_unlock_irq(&ctx->inflight_lock);
5925
5926                 /* We need to keep going until we don't find a matching req */
5927                 if (!cancel_req)
5928                         break;
5929
5930                 io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
5931                 io_put_req(cancel_req);
5932                 schedule();
5933         }
5934         finish_wait(&ctx->inflight_wait, &wait);
5935 }
5936
5937 static int io_uring_flush(struct file *file, void *data)
5938 {
5939         struct io_ring_ctx *ctx = file->private_data;
5940
5941         io_uring_cancel_files(ctx, data);
5942         if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
5943                 io_cqring_overflow_flush(ctx, true);
5944                 io_wq_cancel_all(ctx->io_wq);
5945         }
5946         return 0;
5947 }
5948
5949 static void *io_uring_validate_mmap_request(struct file *file,
5950                                             loff_t pgoff, size_t sz)
5951 {
5952         struct io_ring_ctx *ctx = file->private_data;
5953         loff_t offset = pgoff << PAGE_SHIFT;
5954         struct page *page;
5955         void *ptr;
5956
5957         switch (offset) {
5958         case IORING_OFF_SQ_RING:
5959         case IORING_OFF_CQ_RING:
5960                 ptr = ctx->rings;
5961                 break;
5962         case IORING_OFF_SQES:
5963                 ptr = ctx->sq_sqes;
5964                 break;
5965         default:
5966                 return ERR_PTR(-EINVAL);
5967         }
5968
5969         page = virt_to_head_page(ptr);
5970         if (sz > page_size(page))
5971                 return ERR_PTR(-EINVAL);
5972
5973         return ptr;
5974 }
5975
5976 #ifdef CONFIG_MMU
5977
5978 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
5979 {
5980         size_t sz = vma->vm_end - vma->vm_start;
5981         unsigned long pfn;
5982         void *ptr;
5983
5984         ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
5985         if (IS_ERR(ptr))
5986                 return PTR_ERR(ptr);
5987
5988         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
5989         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
5990 }
5991
5992 #else /* !CONFIG_MMU */
5993
5994 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
5995 {
5996         return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
5997 }
5998
5999 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
6000 {
6001         return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
6002 }
6003
6004 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
6005         unsigned long addr, unsigned long len,
6006         unsigned long pgoff, unsigned long flags)
6007 {
6008         void *ptr;
6009
6010         ptr = io_uring_validate_mmap_request(file, pgoff, len);
6011         if (IS_ERR(ptr))
6012                 return PTR_ERR(ptr);
6013
6014         return (unsigned long) ptr;
6015 }
6016
6017 #endif /* !CONFIG_MMU */
6018
6019 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
6020                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
6021                 size_t, sigsz)
6022 {
6023         struct io_ring_ctx *ctx;
6024         long ret = -EBADF;
6025         int submitted = 0;
6026         struct fd f;
6027
6028         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
6029                 return -EINVAL;
6030
6031         f = fdget(fd);
6032         if (!f.file)
6033                 return -EBADF;
6034
6035         ret = -EOPNOTSUPP;
6036         if (f.file->f_op != &io_uring_fops)
6037                 goto out_fput;
6038
6039         ret = -ENXIO;
6040         ctx = f.file->private_data;
6041         if (!percpu_ref_tryget(&ctx->refs))
6042                 goto out_fput;
6043
6044         /*
6045          * For SQ polling, the thread will do all submissions and completions.
6046          * Just return the requested submit count, and wake the thread if
6047          * we were asked to.
6048          */
6049         ret = 0;
6050         if (ctx->flags & IORING_SETUP_SQPOLL) {
6051                 if (!list_empty_careful(&ctx->cq_overflow_list))
6052                         io_cqring_overflow_flush(ctx, false);
6053                 if (flags & IORING_ENTER_SQ_WAKEUP)
6054                         wake_up(&ctx->sqo_wait);
6055                 submitted = to_submit;
6056         } else if (to_submit) {
6057                 struct mm_struct *cur_mm;
6058
6059                 if (current->mm != ctx->sqo_mm ||
6060                     current_cred() != ctx->creds) {
6061                         ret = -EPERM;
6062                         goto out;
6063                 }
6064
6065                 to_submit = min(to_submit, ctx->sq_entries);
6066                 mutex_lock(&ctx->uring_lock);
6067                 /* already have mm, so io_submit_sqes() won't try to grab it */
6068                 cur_mm = ctx->sqo_mm;
6069                 submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
6070                                            &cur_mm, false);
6071                 mutex_unlock(&ctx->uring_lock);
6072
6073                 if (submitted != to_submit)
6074                         goto out;
6075         }
6076         if (flags & IORING_ENTER_GETEVENTS) {
6077                 unsigned nr_events = 0;
6078
6079                 min_complete = min(min_complete, ctx->cq_entries);
6080
6081                 if (ctx->flags & IORING_SETUP_IOPOLL) {
6082                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
6083                 } else {
6084                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
6085                 }
6086         }
6087
6088 out:
6089         percpu_ref_put(&ctx->refs);
6090 out_fput:
6091         fdput(f);
6092         return submitted ? submitted : ret;
6093 }
6094
6095 static const struct file_operations io_uring_fops = {
6096         .release        = io_uring_release,
6097         .flush          = io_uring_flush,
6098         .mmap           = io_uring_mmap,
6099 #ifndef CONFIG_MMU
6100         .get_unmapped_area = io_uring_nommu_get_unmapped_area,
6101         .mmap_capabilities = io_uring_nommu_mmap_capabilities,
6102 #endif
6103         .poll           = io_uring_poll,
6104         .fasync         = io_uring_fasync,
6105 };
6106
6107 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
6108                                   struct io_uring_params *p)
6109 {
6110         struct io_rings *rings;
6111         size_t size, sq_array_offset;
6112
6113         size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
6114         if (size == SIZE_MAX)
6115                 return -EOVERFLOW;
6116
6117         rings = io_mem_alloc(size);
6118         if (!rings)
6119                 return -ENOMEM;
6120
6121         ctx->rings = rings;
6122         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
6123         rings->sq_ring_mask = p->sq_entries - 1;
6124         rings->cq_ring_mask = p->cq_entries - 1;
6125         rings->sq_ring_entries = p->sq_entries;
6126         rings->cq_ring_entries = p->cq_entries;
6127         ctx->sq_mask = rings->sq_ring_mask;
6128         ctx->cq_mask = rings->cq_ring_mask;
6129         ctx->sq_entries = rings->sq_ring_entries;
6130         ctx->cq_entries = rings->cq_ring_entries;
6131
6132         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
6133         if (size == SIZE_MAX) {
6134                 io_mem_free(ctx->rings);
6135                 ctx->rings = NULL;
6136                 return -EOVERFLOW;
6137         }
6138
6139         ctx->sq_sqes = io_mem_alloc(size);
6140         if (!ctx->sq_sqes) {
6141                 io_mem_free(ctx->rings);
6142                 ctx->rings = NULL;
6143                 return -ENOMEM;
6144         }
6145
6146         return 0;
6147 }
6148
6149 /*
6150  * Allocate an anonymous fd, this is what constitutes the application
6151  * visible backing of an io_uring instance. The application mmaps this
6152  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
6153  * we have to tie this fd to a socket for file garbage collection purposes.
6154  */
6155 static int io_uring_get_fd(struct io_ring_ctx *ctx)
6156 {
6157         struct file *file;
6158         int ret;
6159
6160 #if defined(CONFIG_UNIX)
6161         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
6162                                 &ctx->ring_sock);
6163         if (ret)
6164                 return ret;
6165 #endif
6166
6167         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
6168         if (ret < 0)
6169                 goto err;
6170
6171         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
6172                                         O_RDWR | O_CLOEXEC);
6173         if (IS_ERR(file)) {
6174                 put_unused_fd(ret);
6175                 ret = PTR_ERR(file);
6176                 goto err;
6177         }
6178
6179 #if defined(CONFIG_UNIX)
6180         ctx->ring_sock->file = file;
6181 #endif
6182         fd_install(ret, file);
6183         return ret;
6184 err:
6185 #if defined(CONFIG_UNIX)
6186         sock_release(ctx->ring_sock);
6187         ctx->ring_sock = NULL;
6188 #endif
6189         return ret;
6190 }
6191
6192 static int io_uring_create(unsigned entries, struct io_uring_params *p)
6193 {
6194         struct user_struct *user = NULL;
6195         struct io_ring_ctx *ctx;
6196         bool account_mem;
6197         int ret;
6198
6199         if (!entries || entries > IORING_MAX_ENTRIES)
6200                 return -EINVAL;
6201
6202         /*
6203          * Use twice as many entries for the CQ ring. It's possible for the
6204          * application to drive a higher depth than the size of the SQ ring,
6205          * since the sqes are only used at submission time. This allows for
6206          * some flexibility in overcommitting a bit. If the application has
6207          * set IORING_SETUP_CQSIZE, it will have passed in the desired number
6208          * of CQ ring entries manually.
6209          */
6210         p->sq_entries = roundup_pow_of_two(entries);
6211         if (p->flags & IORING_SETUP_CQSIZE) {
6212                 /*
6213                  * If IORING_SETUP_CQSIZE is set, we do the same roundup
6214                  * to a power-of-two, if it isn't already. We do NOT impose
6215                  * any cq vs sq ring sizing.
6216                  */
6217                 if (p->cq_entries < p->sq_entries || p->cq_entries > IORING_MAX_CQ_ENTRIES)
6218                         return -EINVAL;
6219                 p->cq_entries = roundup_pow_of_two(p->cq_entries);
6220         } else {
6221                 p->cq_entries = 2 * p->sq_entries;
6222         }
6223
6224         user = get_uid(current_user());
6225         account_mem = !capable(CAP_IPC_LOCK);
6226
6227         if (account_mem) {
6228                 ret = io_account_mem(user,
6229                                 ring_pages(p->sq_entries, p->cq_entries));
6230                 if (ret) {
6231                         free_uid(user);
6232                         return ret;
6233                 }
6234         }
6235
6236         ctx = io_ring_ctx_alloc(p);
6237         if (!ctx) {
6238                 if (account_mem)
6239                         io_unaccount_mem(user, ring_pages(p->sq_entries,
6240                                                                 p->cq_entries));
6241                 free_uid(user);
6242                 return -ENOMEM;
6243         }
6244         ctx->compat = in_compat_syscall();
6245         ctx->account_mem = account_mem;
6246         ctx->user = user;
6247         ctx->creds = get_current_cred();
6248
6249         ret = io_allocate_scq_urings(ctx, p);
6250         if (ret)
6251                 goto err;
6252
6253         ret = io_sq_offload_start(ctx, p);
6254         if (ret)
6255                 goto err;
6256
6257         memset(&p->sq_off, 0, sizeof(p->sq_off));
6258         p->sq_off.head = offsetof(struct io_rings, sq.head);
6259         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
6260         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
6261         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
6262         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
6263         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
6264         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
6265
6266         memset(&p->cq_off, 0, sizeof(p->cq_off));
6267         p->cq_off.head = offsetof(struct io_rings, cq.head);
6268         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
6269         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
6270         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
6271         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
6272         p->cq_off.cqes = offsetof(struct io_rings, cqes);
6273
6274         /*
6275          * Install ring fd as the very last thing, so we don't risk someone
6276          * having closed it before we finish setup
6277          */
6278         ret = io_uring_get_fd(ctx);
6279         if (ret < 0)
6280                 goto err;
6281
6282         p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
6283                         IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS;
6284         trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
6285         return ret;
6286 err:
6287         io_ring_ctx_wait_and_kill(ctx);
6288         return ret;
6289 }
6290
6291 /*
6292  * Sets up an aio uring context, and returns the fd. Applications asks for a
6293  * ring size, we return the actual sq/cq ring sizes (among other things) in the
6294  * params structure passed in.
6295  */
6296 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
6297 {
6298         struct io_uring_params p;
6299         long ret;
6300         int i;
6301
6302         if (copy_from_user(&p, params, sizeof(p)))
6303                 return -EFAULT;
6304         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
6305                 if (p.resv[i])
6306                         return -EINVAL;
6307         }
6308
6309         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
6310                         IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
6311                 return -EINVAL;
6312
6313         ret = io_uring_create(entries, &p);
6314         if (ret < 0)
6315                 return ret;
6316
6317         if (copy_to_user(params, &p, sizeof(p)))
6318                 return -EFAULT;
6319
6320         return ret;
6321 }
6322
6323 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
6324                 struct io_uring_params __user *, params)
6325 {
6326         return io_uring_setup(entries, params);
6327 }
6328
6329 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
6330                                void __user *arg, unsigned nr_args)
6331         __releases(ctx->uring_lock)
6332         __acquires(ctx->uring_lock)
6333 {
6334         int ret;
6335
6336         /*
6337          * We're inside the ring mutex, if the ref is already dying, then
6338          * someone else killed the ctx or is already going through
6339          * io_uring_register().
6340          */
6341         if (percpu_ref_is_dying(&ctx->refs))
6342                 return -ENXIO;
6343
6344         if (opcode != IORING_UNREGISTER_FILES &&
6345             opcode != IORING_REGISTER_FILES_UPDATE) {
6346                 percpu_ref_kill(&ctx->refs);
6347
6348                 /*
6349                  * Drop uring mutex before waiting for references to exit. If
6350                  * another thread is currently inside io_uring_enter() it might
6351                  * need to grab the uring_lock to make progress. If we hold it
6352                  * here across the drain wait, then we can deadlock. It's safe
6353                  * to drop the mutex here, since no new references will come in
6354                  * after we've killed the percpu ref.
6355                  */
6356                 mutex_unlock(&ctx->uring_lock);
6357                 wait_for_completion(&ctx->completions[0]);
6358                 mutex_lock(&ctx->uring_lock);
6359         }
6360
6361         switch (opcode) {
6362         case IORING_REGISTER_BUFFERS:
6363                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
6364                 break;
6365         case IORING_UNREGISTER_BUFFERS:
6366                 ret = -EINVAL;
6367                 if (arg || nr_args)
6368                         break;
6369                 ret = io_sqe_buffer_unregister(ctx);
6370                 break;
6371         case IORING_REGISTER_FILES:
6372                 ret = io_sqe_files_register(ctx, arg, nr_args);
6373                 break;
6374         case IORING_UNREGISTER_FILES:
6375                 ret = -EINVAL;
6376                 if (arg || nr_args)
6377                         break;
6378                 ret = io_sqe_files_unregister(ctx);
6379                 break;
6380         case IORING_REGISTER_FILES_UPDATE:
6381                 ret = io_sqe_files_update(ctx, arg, nr_args);
6382                 break;
6383         case IORING_REGISTER_EVENTFD:
6384                 ret = -EINVAL;
6385                 if (nr_args != 1)
6386                         break;
6387                 ret = io_eventfd_register(ctx, arg);
6388                 break;
6389         case IORING_UNREGISTER_EVENTFD:
6390                 ret = -EINVAL;
6391                 if (arg || nr_args)
6392                         break;
6393                 ret = io_eventfd_unregister(ctx);
6394                 break;
6395         default:
6396                 ret = -EINVAL;
6397                 break;
6398         }
6399
6400
6401         if (opcode != IORING_UNREGISTER_FILES &&
6402             opcode != IORING_REGISTER_FILES_UPDATE) {
6403                 /* bring the ctx back to life */
6404                 reinit_completion(&ctx->completions[0]);
6405                 percpu_ref_reinit(&ctx->refs);
6406         }
6407         return ret;
6408 }
6409
6410 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
6411                 void __user *, arg, unsigned int, nr_args)
6412 {
6413         struct io_ring_ctx *ctx;
6414         long ret = -EBADF;
6415         struct fd f;
6416
6417         f = fdget(fd);
6418         if (!f.file)
6419                 return -EBADF;
6420
6421         ret = -EOPNOTSUPP;
6422         if (f.file->f_op != &io_uring_fops)
6423                 goto out_fput;
6424
6425         ctx = f.file->private_data;
6426
6427         mutex_lock(&ctx->uring_lock);
6428         ret = __io_uring_register(ctx, opcode, arg, nr_args);
6429         mutex_unlock(&ctx->uring_lock);
6430         trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
6431                                                         ctx->cq_ev_fd != NULL, ret);
6432 out_fput:
6433         fdput(f);
6434         return ret;
6435 }
6436
6437 static int __init io_uring_init(void)
6438 {
6439         BUILD_BUG_ON(ARRAY_SIZE(io_op_defs) != IORING_OP_LAST);
6440         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
6441         return 0;
6442 };
6443 __initcall(io_uring_init);