io_uring: batch io_kiocb allocation
[linux-2.6-block.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side. When the application reads the CQ ring
8  * tail, it must use an appropriate smp_rmb() to order with the smp_wmb()
9  * the kernel uses after writing the tail. Failure to do so could cause a
10  * delay in when the application notices that completion events available.
11  * This isn't a fatal condition. Likewise, the application must use an
12  * appropriate smp_wmb() both before writing the SQ tail, and after writing
13  * the SQ tail. The first one orders the sqe writes with the tail write, and
14  * the latter is paired with the smp_rmb() the kernel will issue before
15  * reading the SQ tail on submission.
16  *
17  * Also see the examples in the liburing library:
18  *
19  *      git://git.kernel.dk/liburing
20  *
21  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
22  * from data shared between the kernel and application. This is done both
23  * for ordering purposes, but also to ensure that once a value is loaded from
24  * data that the application could potentially modify, it remains stable.
25  *
26  * Copyright (C) 2018-2019 Jens Axboe
27  * Copyright (c) 2018-2019 Christoph Hellwig
28  */
29 #include <linux/kernel.h>
30 #include <linux/init.h>
31 #include <linux/errno.h>
32 #include <linux/syscalls.h>
33 #include <linux/compat.h>
34 #include <linux/refcount.h>
35 #include <linux/uio.h>
36
37 #include <linux/sched/signal.h>
38 #include <linux/fs.h>
39 #include <linux/file.h>
40 #include <linux/fdtable.h>
41 #include <linux/mm.h>
42 #include <linux/mman.h>
43 #include <linux/mmu_context.h>
44 #include <linux/percpu.h>
45 #include <linux/slab.h>
46 #include <linux/workqueue.h>
47 #include <linux/blkdev.h>
48 #include <linux/net.h>
49 #include <net/sock.h>
50 #include <net/af_unix.h>
51 #include <linux/anon_inodes.h>
52 #include <linux/sched/mm.h>
53 #include <linux/uaccess.h>
54 #include <linux/nospec.h>
55
56 #include <uapi/linux/io_uring.h>
57
58 #include "internal.h"
59
60 #define IORING_MAX_ENTRIES      4096
61
62 struct io_uring {
63         u32 head ____cacheline_aligned_in_smp;
64         u32 tail ____cacheline_aligned_in_smp;
65 };
66
67 struct io_sq_ring {
68         struct io_uring         r;
69         u32                     ring_mask;
70         u32                     ring_entries;
71         u32                     dropped;
72         u32                     flags;
73         u32                     array[];
74 };
75
76 struct io_cq_ring {
77         struct io_uring         r;
78         u32                     ring_mask;
79         u32                     ring_entries;
80         u32                     overflow;
81         struct io_uring_cqe     cqes[];
82 };
83
84 struct io_ring_ctx {
85         struct {
86                 struct percpu_ref       refs;
87         } ____cacheline_aligned_in_smp;
88
89         struct {
90                 unsigned int            flags;
91                 bool                    compat;
92                 bool                    account_mem;
93
94                 /* SQ ring */
95                 struct io_sq_ring       *sq_ring;
96                 unsigned                cached_sq_head;
97                 unsigned                sq_entries;
98                 unsigned                sq_mask;
99                 struct io_uring_sqe     *sq_sqes;
100         } ____cacheline_aligned_in_smp;
101
102         /* IO offload */
103         struct workqueue_struct *sqo_wq;
104         struct mm_struct        *sqo_mm;
105
106         struct {
107                 /* CQ ring */
108                 struct io_cq_ring       *cq_ring;
109                 unsigned                cached_cq_tail;
110                 unsigned                cq_entries;
111                 unsigned                cq_mask;
112                 struct wait_queue_head  cq_wait;
113                 struct fasync_struct    *cq_fasync;
114         } ____cacheline_aligned_in_smp;
115
116         struct user_struct      *user;
117
118         struct completion       ctx_done;
119
120         struct {
121                 struct mutex            uring_lock;
122                 wait_queue_head_t       wait;
123         } ____cacheline_aligned_in_smp;
124
125         struct {
126                 spinlock_t              completion_lock;
127                 bool                    poll_multi_file;
128                 /*
129                  * ->poll_list is protected by the ctx->uring_lock for
130                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
131                  * For SQPOLL, only the single threaded io_sq_thread() will
132                  * manipulate the list, hence no extra locking is needed there.
133                  */
134                 struct list_head        poll_list;
135         } ____cacheline_aligned_in_smp;
136
137 #if defined(CONFIG_UNIX)
138         struct socket           *ring_sock;
139 #endif
140 };
141
142 struct sqe_submit {
143         const struct io_uring_sqe       *sqe;
144         unsigned short                  index;
145         bool                            has_user;
146         bool                            needs_lock;
147 };
148
149 struct io_kiocb {
150         struct kiocb            rw;
151
152         struct sqe_submit       submit;
153
154         struct io_ring_ctx      *ctx;
155         struct list_head        list;
156         unsigned int            flags;
157 #define REQ_F_FORCE_NONBLOCK    1       /* inline submission attempt */
158 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
159         u64                     user_data;
160         u64                     error;
161
162         struct work_struct      work;
163 };
164
165 #define IO_PLUG_THRESHOLD               2
166 #define IO_IOPOLL_BATCH                 8
167
168 struct io_submit_state {
169         struct blk_plug         plug;
170
171         /*
172          * io_kiocb alloc cache
173          */
174         void                    *reqs[IO_IOPOLL_BATCH];
175         unsigned                int free_reqs;
176         unsigned                int cur_req;
177
178         /*
179          * File reference cache
180          */
181         struct file             *file;
182         unsigned int            fd;
183         unsigned int            has_refs;
184         unsigned int            used_refs;
185         unsigned int            ios_left;
186 };
187
188 static struct kmem_cache *req_cachep;
189
190 static const struct file_operations io_uring_fops;
191
192 struct sock *io_uring_get_socket(struct file *file)
193 {
194 #if defined(CONFIG_UNIX)
195         if (file->f_op == &io_uring_fops) {
196                 struct io_ring_ctx *ctx = file->private_data;
197
198                 return ctx->ring_sock->sk;
199         }
200 #endif
201         return NULL;
202 }
203 EXPORT_SYMBOL(io_uring_get_socket);
204
205 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
206 {
207         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
208
209         complete(&ctx->ctx_done);
210 }
211
212 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
213 {
214         struct io_ring_ctx *ctx;
215
216         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
217         if (!ctx)
218                 return NULL;
219
220         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
221                 kfree(ctx);
222                 return NULL;
223         }
224
225         ctx->flags = p->flags;
226         init_waitqueue_head(&ctx->cq_wait);
227         init_completion(&ctx->ctx_done);
228         mutex_init(&ctx->uring_lock);
229         init_waitqueue_head(&ctx->wait);
230         spin_lock_init(&ctx->completion_lock);
231         INIT_LIST_HEAD(&ctx->poll_list);
232         return ctx;
233 }
234
235 static void io_commit_cqring(struct io_ring_ctx *ctx)
236 {
237         struct io_cq_ring *ring = ctx->cq_ring;
238
239         if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
240                 /* order cqe stores with ring update */
241                 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
242
243                 /*
244                  * Write sider barrier of tail update, app has read side. See
245                  * comment at the top of this file.
246                  */
247                 smp_wmb();
248
249                 if (wq_has_sleeper(&ctx->cq_wait)) {
250                         wake_up_interruptible(&ctx->cq_wait);
251                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
252                 }
253         }
254 }
255
256 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
257 {
258         struct io_cq_ring *ring = ctx->cq_ring;
259         unsigned tail;
260
261         tail = ctx->cached_cq_tail;
262         /* See comment at the top of the file */
263         smp_rmb();
264         if (tail + 1 == READ_ONCE(ring->r.head))
265                 return NULL;
266
267         ctx->cached_cq_tail++;
268         return &ring->cqes[tail & ctx->cq_mask];
269 }
270
271 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
272                                  long res, unsigned ev_flags)
273 {
274         struct io_uring_cqe *cqe;
275
276         /*
277          * If we can't get a cq entry, userspace overflowed the
278          * submission (by quite a lot). Increment the overflow count in
279          * the ring.
280          */
281         cqe = io_get_cqring(ctx);
282         if (cqe) {
283                 WRITE_ONCE(cqe->user_data, ki_user_data);
284                 WRITE_ONCE(cqe->res, res);
285                 WRITE_ONCE(cqe->flags, ev_flags);
286         } else {
287                 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
288
289                 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
290         }
291 }
292
293 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data,
294                                 long res, unsigned ev_flags)
295 {
296         unsigned long flags;
297
298         spin_lock_irqsave(&ctx->completion_lock, flags);
299         io_cqring_fill_event(ctx, ki_user_data, res, ev_flags);
300         io_commit_cqring(ctx);
301         spin_unlock_irqrestore(&ctx->completion_lock, flags);
302
303         if (waitqueue_active(&ctx->wait))
304                 wake_up(&ctx->wait);
305 }
306
307 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
308 {
309         percpu_ref_put_many(&ctx->refs, refs);
310
311         if (waitqueue_active(&ctx->wait))
312                 wake_up(&ctx->wait);
313 }
314
315 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
316                                    struct io_submit_state *state)
317 {
318         struct io_kiocb *req;
319
320         if (!percpu_ref_tryget(&ctx->refs))
321                 return NULL;
322
323         if (!state) {
324                 req = kmem_cache_alloc(req_cachep, __GFP_NOWARN);
325                 if (unlikely(!req))
326                         goto out;
327         } else if (!state->free_reqs) {
328                 size_t sz;
329                 int ret;
330
331                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
332                 ret = kmem_cache_alloc_bulk(req_cachep, __GFP_NOWARN, sz,
333                                                 state->reqs);
334                 if (unlikely(ret <= 0))
335                         goto out;
336                 state->free_reqs = ret - 1;
337                 state->cur_req = 1;
338                 req = state->reqs[0];
339         } else {
340                 req = state->reqs[state->cur_req];
341                 state->free_reqs--;
342                 state->cur_req++;
343         }
344
345         req->ctx = ctx;
346         req->flags = 0;
347         return req;
348 out:
349         io_ring_drop_ctx_refs(ctx, 1);
350         return NULL;
351 }
352
353 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
354 {
355         if (*nr) {
356                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
357                 io_ring_drop_ctx_refs(ctx, *nr);
358                 *nr = 0;
359         }
360 }
361
362 static void io_free_req(struct io_kiocb *req)
363 {
364         io_ring_drop_ctx_refs(req->ctx, 1);
365         kmem_cache_free(req_cachep, req);
366 }
367
368 /*
369  * Find and free completed poll iocbs
370  */
371 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
372                                struct list_head *done)
373 {
374         void *reqs[IO_IOPOLL_BATCH];
375         int file_count, to_free;
376         struct file *file = NULL;
377         struct io_kiocb *req;
378
379         file_count = to_free = 0;
380         while (!list_empty(done)) {
381                 req = list_first_entry(done, struct io_kiocb, list);
382                 list_del(&req->list);
383
384                 io_cqring_fill_event(ctx, req->user_data, req->error, 0);
385
386                 reqs[to_free++] = req;
387                 (*nr_events)++;
388
389                 /*
390                  * Batched puts of the same file, to avoid dirtying the
391                  * file usage count multiple times, if avoidable.
392                  */
393                 if (!file) {
394                         file = req->rw.ki_filp;
395                         file_count = 1;
396                 } else if (file == req->rw.ki_filp) {
397                         file_count++;
398                 } else {
399                         fput_many(file, file_count);
400                         file = req->rw.ki_filp;
401                         file_count = 1;
402                 }
403
404                 if (to_free == ARRAY_SIZE(reqs))
405                         io_free_req_many(ctx, reqs, &to_free);
406         }
407         io_commit_cqring(ctx);
408
409         if (file)
410                 fput_many(file, file_count);
411         io_free_req_many(ctx, reqs, &to_free);
412 }
413
414 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
415                         long min)
416 {
417         struct io_kiocb *req, *tmp;
418         LIST_HEAD(done);
419         bool spin;
420         int ret;
421
422         /*
423          * Only spin for completions if we don't have multiple devices hanging
424          * off our complete list, and we're under the requested amount.
425          */
426         spin = !ctx->poll_multi_file && *nr_events < min;
427
428         ret = 0;
429         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
430                 struct kiocb *kiocb = &req->rw;
431
432                 /*
433                  * Move completed entries to our local list. If we find a
434                  * request that requires polling, break out and complete
435                  * the done list first, if we have entries there.
436                  */
437                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
438                         list_move_tail(&req->list, &done);
439                         continue;
440                 }
441                 if (!list_empty(&done))
442                         break;
443
444                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
445                 if (ret < 0)
446                         break;
447
448                 if (ret && spin)
449                         spin = false;
450                 ret = 0;
451         }
452
453         if (!list_empty(&done))
454                 io_iopoll_complete(ctx, nr_events, &done);
455
456         return ret;
457 }
458
459 /*
460  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
461  * non-spinning poll check - we'll still enter the driver poll loop, but only
462  * as a non-spinning completion check.
463  */
464 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
465                                 long min)
466 {
467         while (!list_empty(&ctx->poll_list)) {
468                 int ret;
469
470                 ret = io_do_iopoll(ctx, nr_events, min);
471                 if (ret < 0)
472                         return ret;
473                 if (!min || *nr_events >= min)
474                         return 0;
475         }
476
477         return 1;
478 }
479
480 /*
481  * We can't just wait for polled events to come to us, we have to actively
482  * find and complete them.
483  */
484 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
485 {
486         if (!(ctx->flags & IORING_SETUP_IOPOLL))
487                 return;
488
489         mutex_lock(&ctx->uring_lock);
490         while (!list_empty(&ctx->poll_list)) {
491                 unsigned int nr_events = 0;
492
493                 io_iopoll_getevents(ctx, &nr_events, 1);
494         }
495         mutex_unlock(&ctx->uring_lock);
496 }
497
498 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
499                            long min)
500 {
501         int ret = 0;
502
503         do {
504                 int tmin = 0;
505
506                 if (*nr_events < min)
507                         tmin = min - *nr_events;
508
509                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
510                 if (ret <= 0)
511                         break;
512                 ret = 0;
513         } while (min && !*nr_events && !need_resched());
514
515         return ret;
516 }
517
518 static void kiocb_end_write(struct kiocb *kiocb)
519 {
520         if (kiocb->ki_flags & IOCB_WRITE) {
521                 struct inode *inode = file_inode(kiocb->ki_filp);
522
523                 /*
524                  * Tell lockdep we inherited freeze protection from submission
525                  * thread.
526                  */
527                 if (S_ISREG(inode->i_mode))
528                         __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
529                 file_end_write(kiocb->ki_filp);
530         }
531 }
532
533 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
534 {
535         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
536
537         kiocb_end_write(kiocb);
538
539         fput(kiocb->ki_filp);
540         io_cqring_add_event(req->ctx, req->user_data, res, 0);
541         io_free_req(req);
542 }
543
544 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
545 {
546         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
547
548         kiocb_end_write(kiocb);
549
550         req->error = res;
551         if (res != -EAGAIN)
552                 req->flags |= REQ_F_IOPOLL_COMPLETED;
553 }
554
555 /*
556  * After the iocb has been issued, it's safe to be found on the poll list.
557  * Adding the kiocb to the list AFTER submission ensures that we don't
558  * find it from a io_iopoll_getevents() thread before the issuer is done
559  * accessing the kiocb cookie.
560  */
561 static void io_iopoll_req_issued(struct io_kiocb *req)
562 {
563         struct io_ring_ctx *ctx = req->ctx;
564
565         /*
566          * Track whether we have multiple files in our lists. This will impact
567          * how we do polling eventually, not spinning if we're on potentially
568          * different devices.
569          */
570         if (list_empty(&ctx->poll_list)) {
571                 ctx->poll_multi_file = false;
572         } else if (!ctx->poll_multi_file) {
573                 struct io_kiocb *list_req;
574
575                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
576                                                 list);
577                 if (list_req->rw.ki_filp != req->rw.ki_filp)
578                         ctx->poll_multi_file = true;
579         }
580
581         /*
582          * For fast devices, IO may have already completed. If it has, add
583          * it to the front so we find it first.
584          */
585         if (req->flags & REQ_F_IOPOLL_COMPLETED)
586                 list_add(&req->list, &ctx->poll_list);
587         else
588                 list_add_tail(&req->list, &ctx->poll_list);
589 }
590
591 static void io_file_put(struct io_submit_state *state, struct file *file)
592 {
593         if (!state) {
594                 fput(file);
595         } else if (state->file) {
596                 int diff = state->has_refs - state->used_refs;
597
598                 if (diff)
599                         fput_many(state->file, diff);
600                 state->file = NULL;
601         }
602 }
603
604 /*
605  * Get as many references to a file as we have IOs left in this submission,
606  * assuming most submissions are for one file, or at least that each file
607  * has more than one submission.
608  */
609 static struct file *io_file_get(struct io_submit_state *state, int fd)
610 {
611         if (!state)
612                 return fget(fd);
613
614         if (state->file) {
615                 if (state->fd == fd) {
616                         state->used_refs++;
617                         state->ios_left--;
618                         return state->file;
619                 }
620                 io_file_put(state, NULL);
621         }
622         state->file = fget_many(fd, state->ios_left);
623         if (!state->file)
624                 return NULL;
625
626         state->fd = fd;
627         state->has_refs = state->ios_left;
628         state->used_refs = 1;
629         state->ios_left--;
630         return state->file;
631 }
632
633 /*
634  * If we tracked the file through the SCM inflight mechanism, we could support
635  * any file. For now, just ensure that anything potentially problematic is done
636  * inline.
637  */
638 static bool io_file_supports_async(struct file *file)
639 {
640         umode_t mode = file_inode(file)->i_mode;
641
642         if (S_ISBLK(mode) || S_ISCHR(mode))
643                 return true;
644         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
645                 return true;
646
647         return false;
648 }
649
650 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
651                       bool force_nonblock, struct io_submit_state *state)
652 {
653         struct io_ring_ctx *ctx = req->ctx;
654         struct kiocb *kiocb = &req->rw;
655         unsigned ioprio;
656         int fd, ret;
657
658         /* For -EAGAIN retry, everything is already prepped */
659         if (kiocb->ki_filp)
660                 return 0;
661
662         fd = READ_ONCE(sqe->fd);
663         kiocb->ki_filp = io_file_get(state, fd);
664         if (unlikely(!kiocb->ki_filp))
665                 return -EBADF;
666         if (force_nonblock && !io_file_supports_async(kiocb->ki_filp))
667                 force_nonblock = false;
668         kiocb->ki_pos = READ_ONCE(sqe->off);
669         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
670         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
671
672         ioprio = READ_ONCE(sqe->ioprio);
673         if (ioprio) {
674                 ret = ioprio_check_cap(ioprio);
675                 if (ret)
676                         goto out_fput;
677
678                 kiocb->ki_ioprio = ioprio;
679         } else
680                 kiocb->ki_ioprio = get_current_ioprio();
681
682         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
683         if (unlikely(ret))
684                 goto out_fput;
685         if (force_nonblock) {
686                 kiocb->ki_flags |= IOCB_NOWAIT;
687                 req->flags |= REQ_F_FORCE_NONBLOCK;
688         }
689         if (ctx->flags & IORING_SETUP_IOPOLL) {
690                 ret = -EOPNOTSUPP;
691                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
692                     !kiocb->ki_filp->f_op->iopoll)
693                         goto out_fput;
694
695                 req->error = 0;
696                 kiocb->ki_flags |= IOCB_HIPRI;
697                 kiocb->ki_complete = io_complete_rw_iopoll;
698         } else {
699                 if (kiocb->ki_flags & IOCB_HIPRI) {
700                         ret = -EINVAL;
701                         goto out_fput;
702                 }
703                 kiocb->ki_complete = io_complete_rw;
704         }
705         return 0;
706 out_fput:
707         /* in case of error, we didn't use this file reference. drop it. */
708         if (state)
709                 state->used_refs--;
710         io_file_put(state, kiocb->ki_filp);
711         return ret;
712 }
713
714 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
715 {
716         switch (ret) {
717         case -EIOCBQUEUED:
718                 break;
719         case -ERESTARTSYS:
720         case -ERESTARTNOINTR:
721         case -ERESTARTNOHAND:
722         case -ERESTART_RESTARTBLOCK:
723                 /*
724                  * We can't just restart the syscall, since previously
725                  * submitted sqes may already be in progress. Just fail this
726                  * IO with EINTR.
727                  */
728                 ret = -EINTR;
729                 /* fall through */
730         default:
731                 kiocb->ki_complete(kiocb, ret, 0);
732         }
733 }
734
735 static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
736                            const struct sqe_submit *s, struct iovec **iovec,
737                            struct iov_iter *iter)
738 {
739         const struct io_uring_sqe *sqe = s->sqe;
740         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
741         size_t sqe_len = READ_ONCE(sqe->len);
742
743         if (!s->has_user)
744                 return -EFAULT;
745
746 #ifdef CONFIG_COMPAT
747         if (ctx->compat)
748                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
749                                                 iovec, iter);
750 #endif
751
752         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
753 }
754
755 static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
756                        bool force_nonblock, struct io_submit_state *state)
757 {
758         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
759         struct kiocb *kiocb = &req->rw;
760         struct iov_iter iter;
761         struct file *file;
762         ssize_t ret;
763
764         ret = io_prep_rw(req, s->sqe, force_nonblock, state);
765         if (ret)
766                 return ret;
767         file = kiocb->ki_filp;
768
769         ret = -EBADF;
770         if (unlikely(!(file->f_mode & FMODE_READ)))
771                 goto out_fput;
772         ret = -EINVAL;
773         if (unlikely(!file->f_op->read_iter))
774                 goto out_fput;
775
776         ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
777         if (ret)
778                 goto out_fput;
779
780         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_iter_count(&iter));
781         if (!ret) {
782                 ssize_t ret2;
783
784                 /* Catch -EAGAIN return for forced non-blocking submission */
785                 ret2 = call_read_iter(file, kiocb, &iter);
786                 if (!force_nonblock || ret2 != -EAGAIN)
787                         io_rw_done(kiocb, ret2);
788                 else
789                         ret = -EAGAIN;
790         }
791         kfree(iovec);
792 out_fput:
793         /* Hold on to the file for -EAGAIN */
794         if (unlikely(ret && ret != -EAGAIN))
795                 fput(file);
796         return ret;
797 }
798
799 static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
800                         bool force_nonblock, struct io_submit_state *state)
801 {
802         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
803         struct kiocb *kiocb = &req->rw;
804         struct iov_iter iter;
805         struct file *file;
806         ssize_t ret;
807
808         ret = io_prep_rw(req, s->sqe, force_nonblock, state);
809         if (ret)
810                 return ret;
811         /* Hold on to the file for -EAGAIN */
812         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT))
813                 return -EAGAIN;
814
815         ret = -EBADF;
816         file = kiocb->ki_filp;
817         if (unlikely(!(file->f_mode & FMODE_WRITE)))
818                 goto out_fput;
819         ret = -EINVAL;
820         if (unlikely(!file->f_op->write_iter))
821                 goto out_fput;
822
823         ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
824         if (ret)
825                 goto out_fput;
826
827         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos,
828                                 iov_iter_count(&iter));
829         if (!ret) {
830                 /*
831                  * Open-code file_start_write here to grab freeze protection,
832                  * which will be released by another thread in
833                  * io_complete_rw().  Fool lockdep by telling it the lock got
834                  * released so that it doesn't complain about the held lock when
835                  * we return to userspace.
836                  */
837                 if (S_ISREG(file_inode(file)->i_mode)) {
838                         __sb_start_write(file_inode(file)->i_sb,
839                                                 SB_FREEZE_WRITE, true);
840                         __sb_writers_release(file_inode(file)->i_sb,
841                                                 SB_FREEZE_WRITE);
842                 }
843                 kiocb->ki_flags |= IOCB_WRITE;
844                 io_rw_done(kiocb, call_write_iter(file, kiocb, &iter));
845         }
846         kfree(iovec);
847 out_fput:
848         if (unlikely(ret))
849                 fput(file);
850         return ret;
851 }
852
853 /*
854  * IORING_OP_NOP just posts a completion event, nothing else.
855  */
856 static int io_nop(struct io_kiocb *req, u64 user_data)
857 {
858         struct io_ring_ctx *ctx = req->ctx;
859         long err = 0;
860
861         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
862                 return -EINVAL;
863
864         /*
865          * Twilight zone - it's possible that someone issued an opcode that
866          * has a file attached, then got -EAGAIN on submission, and changed
867          * the sqe before we retried it from async context. Avoid dropping
868          * a file reference for this malicious case, and flag the error.
869          */
870         if (req->rw.ki_filp) {
871                 err = -EBADF;
872                 fput(req->rw.ki_filp);
873         }
874         io_cqring_add_event(ctx, user_data, err, 0);
875         io_free_req(req);
876         return 0;
877 }
878
879 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
880 {
881         int fd;
882
883         /* Prep already done */
884         if (req->rw.ki_filp)
885                 return 0;
886
887         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
888                 return -EINVAL;
889         if (unlikely(sqe->addr || sqe->ioprio))
890                 return -EINVAL;
891
892         fd = READ_ONCE(sqe->fd);
893         req->rw.ki_filp = fget(fd);
894         if (unlikely(!req->rw.ki_filp))
895                 return -EBADF;
896
897         return 0;
898 }
899
900 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
901                     bool force_nonblock)
902 {
903         loff_t sqe_off = READ_ONCE(sqe->off);
904         loff_t sqe_len = READ_ONCE(sqe->len);
905         loff_t end = sqe_off + sqe_len;
906         unsigned fsync_flags;
907         int ret;
908
909         fsync_flags = READ_ONCE(sqe->fsync_flags);
910         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
911                 return -EINVAL;
912
913         ret = io_prep_fsync(req, sqe);
914         if (ret)
915                 return ret;
916
917         /* fsync always requires a blocking context */
918         if (force_nonblock)
919                 return -EAGAIN;
920
921         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
922                                 end > 0 ? end : LLONG_MAX,
923                                 fsync_flags & IORING_FSYNC_DATASYNC);
924
925         fput(req->rw.ki_filp);
926         io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
927         io_free_req(req);
928         return 0;
929 }
930
931 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
932                            const struct sqe_submit *s, bool force_nonblock,
933                            struct io_submit_state *state)
934 {
935         ssize_t ret;
936         int opcode;
937
938         if (unlikely(s->index >= ctx->sq_entries))
939                 return -EINVAL;
940         req->user_data = READ_ONCE(s->sqe->user_data);
941
942         opcode = READ_ONCE(s->sqe->opcode);
943         switch (opcode) {
944         case IORING_OP_NOP:
945                 ret = io_nop(req, req->user_data);
946                 break;
947         case IORING_OP_READV:
948                 ret = io_read(req, s, force_nonblock, state);
949                 break;
950         case IORING_OP_WRITEV:
951                 ret = io_write(req, s, force_nonblock, state);
952                 break;
953         case IORING_OP_FSYNC:
954                 ret = io_fsync(req, s->sqe, force_nonblock);
955                 break;
956         default:
957                 ret = -EINVAL;
958                 break;
959         }
960
961         if (ret)
962                 return ret;
963
964         if (ctx->flags & IORING_SETUP_IOPOLL) {
965                 if (req->error == -EAGAIN)
966                         return -EAGAIN;
967
968                 /* workqueue context doesn't hold uring_lock, grab it now */
969                 if (s->needs_lock)
970                         mutex_lock(&ctx->uring_lock);
971                 io_iopoll_req_issued(req);
972                 if (s->needs_lock)
973                         mutex_unlock(&ctx->uring_lock);
974         }
975
976         return 0;
977 }
978
979 static void io_sq_wq_submit_work(struct work_struct *work)
980 {
981         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
982         struct sqe_submit *s = &req->submit;
983         const struct io_uring_sqe *sqe = s->sqe;
984         struct io_ring_ctx *ctx = req->ctx;
985         mm_segment_t old_fs = get_fs();
986         int ret;
987
988          /* Ensure we clear previously set forced non-block flag */
989         req->flags &= ~REQ_F_FORCE_NONBLOCK;
990         req->rw.ki_flags &= ~IOCB_NOWAIT;
991
992         if (!mmget_not_zero(ctx->sqo_mm)) {
993                 ret = -EFAULT;
994                 goto err;
995         }
996
997         use_mm(ctx->sqo_mm);
998         set_fs(USER_DS);
999         s->has_user = true;
1000         s->needs_lock = true;
1001
1002         do {
1003                 ret = __io_submit_sqe(ctx, req, s, false, NULL);
1004                 /*
1005                  * We can get EAGAIN for polled IO even though we're forcing
1006                  * a sync submission from here, since we can't wait for
1007                  * request slots on the block side.
1008                  */
1009                 if (ret != -EAGAIN)
1010                         break;
1011                 cond_resched();
1012         } while (1);
1013
1014         set_fs(old_fs);
1015         unuse_mm(ctx->sqo_mm);
1016         mmput(ctx->sqo_mm);
1017 err:
1018         if (ret) {
1019                 io_cqring_add_event(ctx, sqe->user_data, ret, 0);
1020                 io_free_req(req);
1021         }
1022
1023         /* async context always use a copy of the sqe */
1024         kfree(sqe);
1025 }
1026
1027 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1028                          struct io_submit_state *state)
1029 {
1030         struct io_kiocb *req;
1031         ssize_t ret;
1032
1033         /* enforce forwards compatibility on users */
1034         if (unlikely(s->sqe->flags))
1035                 return -EINVAL;
1036
1037         req = io_get_req(ctx, state);
1038         if (unlikely(!req))
1039                 return -EAGAIN;
1040
1041         req->rw.ki_filp = NULL;
1042
1043         ret = __io_submit_sqe(ctx, req, s, true, state);
1044         if (ret == -EAGAIN) {
1045                 struct io_uring_sqe *sqe_copy;
1046
1047                 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1048                 if (sqe_copy) {
1049                         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1050                         s->sqe = sqe_copy;
1051
1052                         memcpy(&req->submit, s, sizeof(*s));
1053                         INIT_WORK(&req->work, io_sq_wq_submit_work);
1054                         queue_work(ctx->sqo_wq, &req->work);
1055                         ret = 0;
1056                 }
1057         }
1058         if (ret)
1059                 io_free_req(req);
1060
1061         return ret;
1062 }
1063
1064 /*
1065  * Batched submission is done, ensure local IO is flushed out.
1066  */
1067 static void io_submit_state_end(struct io_submit_state *state)
1068 {
1069         blk_finish_plug(&state->plug);
1070         io_file_put(state, NULL);
1071         if (state->free_reqs)
1072                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
1073                                         &state->reqs[state->cur_req]);
1074 }
1075
1076 /*
1077  * Start submission side cache.
1078  */
1079 static void io_submit_state_start(struct io_submit_state *state,
1080                                   struct io_ring_ctx *ctx, unsigned max_ios)
1081 {
1082         blk_start_plug(&state->plug);
1083         state->free_reqs = 0;
1084         state->file = NULL;
1085         state->ios_left = max_ios;
1086 }
1087
1088 static void io_commit_sqring(struct io_ring_ctx *ctx)
1089 {
1090         struct io_sq_ring *ring = ctx->sq_ring;
1091
1092         if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1093                 /*
1094                  * Ensure any loads from the SQEs are done at this point,
1095                  * since once we write the new head, the application could
1096                  * write new data to them.
1097                  */
1098                 smp_store_release(&ring->r.head, ctx->cached_sq_head);
1099
1100                 /*
1101                  * write side barrier of head update, app has read side. See
1102                  * comment at the top of this file
1103                  */
1104                 smp_wmb();
1105         }
1106 }
1107
1108 /*
1109  * Undo last io_get_sqring()
1110  */
1111 static void io_drop_sqring(struct io_ring_ctx *ctx)
1112 {
1113         ctx->cached_sq_head--;
1114 }
1115
1116 /*
1117  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1118  * that is mapped by userspace. This means that care needs to be taken to
1119  * ensure that reads are stable, as we cannot rely on userspace always
1120  * being a good citizen. If members of the sqe are validated and then later
1121  * used, it's important that those reads are done through READ_ONCE() to
1122  * prevent a re-load down the line.
1123  */
1124 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1125 {
1126         struct io_sq_ring *ring = ctx->sq_ring;
1127         unsigned head;
1128
1129         /*
1130          * The cached sq head (or cq tail) serves two purposes:
1131          *
1132          * 1) allows us to batch the cost of updating the user visible
1133          *    head updates.
1134          * 2) allows the kernel side to track the head on its own, even
1135          *    though the application is the one updating it.
1136          */
1137         head = ctx->cached_sq_head;
1138         /* See comment at the top of this file */
1139         smp_rmb();
1140         if (head == READ_ONCE(ring->r.tail))
1141                 return false;
1142
1143         head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1144         if (head < ctx->sq_entries) {
1145                 s->index = head;
1146                 s->sqe = &ctx->sq_sqes[head];
1147                 ctx->cached_sq_head++;
1148                 return true;
1149         }
1150
1151         /* drop invalid entries */
1152         ctx->cached_sq_head++;
1153         ring->dropped++;
1154         /* See comment at the top of this file */
1155         smp_wmb();
1156         return false;
1157 }
1158
1159 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
1160 {
1161         struct io_submit_state state, *statep = NULL;
1162         int i, ret = 0, submit = 0;
1163
1164         if (to_submit > IO_PLUG_THRESHOLD) {
1165                 io_submit_state_start(&state, ctx, to_submit);
1166                 statep = &state;
1167         }
1168
1169         for (i = 0; i < to_submit; i++) {
1170                 struct sqe_submit s;
1171
1172                 if (!io_get_sqring(ctx, &s))
1173                         break;
1174
1175                 s.has_user = true;
1176                 s.needs_lock = false;
1177
1178                 ret = io_submit_sqe(ctx, &s, statep);
1179                 if (ret) {
1180                         io_drop_sqring(ctx);
1181                         break;
1182                 }
1183
1184                 submit++;
1185         }
1186         io_commit_sqring(ctx);
1187
1188         if (statep)
1189                 io_submit_state_end(statep);
1190
1191         return submit ? submit : ret;
1192 }
1193
1194 static unsigned io_cqring_events(struct io_cq_ring *ring)
1195 {
1196         return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
1197 }
1198
1199 /*
1200  * Wait until events become available, if we don't already have some. The
1201  * application must reap them itself, as they reside on the shared cq ring.
1202  */
1203 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
1204                           const sigset_t __user *sig, size_t sigsz)
1205 {
1206         struct io_cq_ring *ring = ctx->cq_ring;
1207         sigset_t ksigmask, sigsaved;
1208         DEFINE_WAIT(wait);
1209         int ret;
1210
1211         /* See comment at the top of this file */
1212         smp_rmb();
1213         if (io_cqring_events(ring) >= min_events)
1214                 return 0;
1215
1216         if (sig) {
1217                 ret = set_user_sigmask(sig, &ksigmask, &sigsaved, sigsz);
1218                 if (ret)
1219                         return ret;
1220         }
1221
1222         do {
1223                 prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
1224
1225                 ret = 0;
1226                 /* See comment at the top of this file */
1227                 smp_rmb();
1228                 if (io_cqring_events(ring) >= min_events)
1229                         break;
1230
1231                 schedule();
1232
1233                 ret = -EINTR;
1234                 if (signal_pending(current))
1235                         break;
1236         } while (1);
1237
1238         finish_wait(&ctx->wait, &wait);
1239
1240         if (sig)
1241                 restore_user_sigmask(sig, &sigsaved);
1242
1243         return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
1244 }
1245
1246 static int io_sq_offload_start(struct io_ring_ctx *ctx)
1247 {
1248         int ret;
1249
1250         mmgrab(current->mm);
1251         ctx->sqo_mm = current->mm;
1252
1253         /* Do QD, or 2 * CPUS, whatever is smallest */
1254         ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
1255                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
1256         if (!ctx->sqo_wq) {
1257                 ret = -ENOMEM;
1258                 goto err;
1259         }
1260
1261         return 0;
1262 err:
1263         mmdrop(ctx->sqo_mm);
1264         ctx->sqo_mm = NULL;
1265         return ret;
1266 }
1267
1268 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
1269 {
1270         atomic_long_sub(nr_pages, &user->locked_vm);
1271 }
1272
1273 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
1274 {
1275         unsigned long page_limit, cur_pages, new_pages;
1276
1277         /* Don't allow more pages than we can safely lock */
1278         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
1279
1280         do {
1281                 cur_pages = atomic_long_read(&user->locked_vm);
1282                 new_pages = cur_pages + nr_pages;
1283                 if (new_pages > page_limit)
1284                         return -ENOMEM;
1285         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
1286                                         new_pages) != cur_pages);
1287
1288         return 0;
1289 }
1290
1291 static void io_mem_free(void *ptr)
1292 {
1293         struct page *page = virt_to_head_page(ptr);
1294
1295         if (put_page_testzero(page))
1296                 free_compound_page(page);
1297 }
1298
1299 static void *io_mem_alloc(size_t size)
1300 {
1301         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
1302                                 __GFP_NORETRY;
1303
1304         return (void *) __get_free_pages(gfp_flags, get_order(size));
1305 }
1306
1307 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
1308 {
1309         struct io_sq_ring *sq_ring;
1310         struct io_cq_ring *cq_ring;
1311         size_t bytes;
1312
1313         bytes = struct_size(sq_ring, array, sq_entries);
1314         bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
1315         bytes += struct_size(cq_ring, cqes, cq_entries);
1316
1317         return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
1318 }
1319
1320 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
1321 {
1322         if (ctx->sqo_wq)
1323                 destroy_workqueue(ctx->sqo_wq);
1324         if (ctx->sqo_mm)
1325                 mmdrop(ctx->sqo_mm);
1326
1327         io_iopoll_reap_events(ctx);
1328
1329 #if defined(CONFIG_UNIX)
1330         if (ctx->ring_sock)
1331                 sock_release(ctx->ring_sock);
1332 #endif
1333
1334         io_mem_free(ctx->sq_ring);
1335         io_mem_free(ctx->sq_sqes);
1336         io_mem_free(ctx->cq_ring);
1337
1338         percpu_ref_exit(&ctx->refs);
1339         if (ctx->account_mem)
1340                 io_unaccount_mem(ctx->user,
1341                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
1342         free_uid(ctx->user);
1343         kfree(ctx);
1344 }
1345
1346 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
1347 {
1348         struct io_ring_ctx *ctx = file->private_data;
1349         __poll_t mask = 0;
1350
1351         poll_wait(file, &ctx->cq_wait, wait);
1352         /* See comment at the top of this file */
1353         smp_rmb();
1354         if (READ_ONCE(ctx->sq_ring->r.tail) + 1 != ctx->cached_sq_head)
1355                 mask |= EPOLLOUT | EPOLLWRNORM;
1356         if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
1357                 mask |= EPOLLIN | EPOLLRDNORM;
1358
1359         return mask;
1360 }
1361
1362 static int io_uring_fasync(int fd, struct file *file, int on)
1363 {
1364         struct io_ring_ctx *ctx = file->private_data;
1365
1366         return fasync_helper(fd, file, on, &ctx->cq_fasync);
1367 }
1368
1369 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
1370 {
1371         mutex_lock(&ctx->uring_lock);
1372         percpu_ref_kill(&ctx->refs);
1373         mutex_unlock(&ctx->uring_lock);
1374
1375         io_iopoll_reap_events(ctx);
1376         wait_for_completion(&ctx->ctx_done);
1377         io_ring_ctx_free(ctx);
1378 }
1379
1380 static int io_uring_release(struct inode *inode, struct file *file)
1381 {
1382         struct io_ring_ctx *ctx = file->private_data;
1383
1384         file->private_data = NULL;
1385         io_ring_ctx_wait_and_kill(ctx);
1386         return 0;
1387 }
1388
1389 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
1390 {
1391         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
1392         unsigned long sz = vma->vm_end - vma->vm_start;
1393         struct io_ring_ctx *ctx = file->private_data;
1394         unsigned long pfn;
1395         struct page *page;
1396         void *ptr;
1397
1398         switch (offset) {
1399         case IORING_OFF_SQ_RING:
1400                 ptr = ctx->sq_ring;
1401                 break;
1402         case IORING_OFF_SQES:
1403                 ptr = ctx->sq_sqes;
1404                 break;
1405         case IORING_OFF_CQ_RING:
1406                 ptr = ctx->cq_ring;
1407                 break;
1408         default:
1409                 return -EINVAL;
1410         }
1411
1412         page = virt_to_head_page(ptr);
1413         if (sz > (PAGE_SIZE << compound_order(page)))
1414                 return -EINVAL;
1415
1416         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
1417         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
1418 }
1419
1420 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
1421                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
1422                 size_t, sigsz)
1423 {
1424         struct io_ring_ctx *ctx;
1425         long ret = -EBADF;
1426         int submitted = 0;
1427         struct fd f;
1428
1429         if (flags & ~IORING_ENTER_GETEVENTS)
1430                 return -EINVAL;
1431
1432         f = fdget(fd);
1433         if (!f.file)
1434                 return -EBADF;
1435
1436         ret = -EOPNOTSUPP;
1437         if (f.file->f_op != &io_uring_fops)
1438                 goto out_fput;
1439
1440         ret = -ENXIO;
1441         ctx = f.file->private_data;
1442         if (!percpu_ref_tryget(&ctx->refs))
1443                 goto out_fput;
1444
1445         ret = 0;
1446         if (to_submit) {
1447                 to_submit = min(to_submit, ctx->sq_entries);
1448
1449                 mutex_lock(&ctx->uring_lock);
1450                 submitted = io_ring_submit(ctx, to_submit);
1451                 mutex_unlock(&ctx->uring_lock);
1452
1453                 if (submitted < 0)
1454                         goto out_ctx;
1455         }
1456         if (flags & IORING_ENTER_GETEVENTS) {
1457                 unsigned nr_events = 0;
1458
1459                 min_complete = min(min_complete, ctx->cq_entries);
1460
1461                 /*
1462                  * The application could have included the 'to_submit' count
1463                  * in how many events it wanted to wait for. If we failed to
1464                  * submit the desired count, we may need to adjust the number
1465                  * of events to poll/wait for.
1466                  */
1467                 if (submitted < to_submit)
1468                         min_complete = min_t(unsigned, submitted, min_complete);
1469
1470                 if (ctx->flags & IORING_SETUP_IOPOLL) {
1471                         mutex_lock(&ctx->uring_lock);
1472                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
1473                         mutex_unlock(&ctx->uring_lock);
1474                 } else {
1475                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
1476                 }
1477         }
1478
1479 out_ctx:
1480         io_ring_drop_ctx_refs(ctx, 1);
1481 out_fput:
1482         fdput(f);
1483         return submitted ? submitted : ret;
1484 }
1485
1486 static const struct file_operations io_uring_fops = {
1487         .release        = io_uring_release,
1488         .mmap           = io_uring_mmap,
1489         .poll           = io_uring_poll,
1490         .fasync         = io_uring_fasync,
1491 };
1492
1493 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
1494                                   struct io_uring_params *p)
1495 {
1496         struct io_sq_ring *sq_ring;
1497         struct io_cq_ring *cq_ring;
1498         size_t size;
1499
1500         sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
1501         if (!sq_ring)
1502                 return -ENOMEM;
1503
1504         ctx->sq_ring = sq_ring;
1505         sq_ring->ring_mask = p->sq_entries - 1;
1506         sq_ring->ring_entries = p->sq_entries;
1507         ctx->sq_mask = sq_ring->ring_mask;
1508         ctx->sq_entries = sq_ring->ring_entries;
1509
1510         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
1511         if (size == SIZE_MAX)
1512                 return -EOVERFLOW;
1513
1514         ctx->sq_sqes = io_mem_alloc(size);
1515         if (!ctx->sq_sqes) {
1516                 io_mem_free(ctx->sq_ring);
1517                 return -ENOMEM;
1518         }
1519
1520         cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
1521         if (!cq_ring) {
1522                 io_mem_free(ctx->sq_ring);
1523                 io_mem_free(ctx->sq_sqes);
1524                 return -ENOMEM;
1525         }
1526
1527         ctx->cq_ring = cq_ring;
1528         cq_ring->ring_mask = p->cq_entries - 1;
1529         cq_ring->ring_entries = p->cq_entries;
1530         ctx->cq_mask = cq_ring->ring_mask;
1531         ctx->cq_entries = cq_ring->ring_entries;
1532         return 0;
1533 }
1534
1535 /*
1536  * Allocate an anonymous fd, this is what constitutes the application
1537  * visible backing of an io_uring instance. The application mmaps this
1538  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
1539  * we have to tie this fd to a socket for file garbage collection purposes.
1540  */
1541 static int io_uring_get_fd(struct io_ring_ctx *ctx)
1542 {
1543         struct file *file;
1544         int ret;
1545
1546 #if defined(CONFIG_UNIX)
1547         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
1548                                 &ctx->ring_sock);
1549         if (ret)
1550                 return ret;
1551 #endif
1552
1553         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
1554         if (ret < 0)
1555                 goto err;
1556
1557         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
1558                                         O_RDWR | O_CLOEXEC);
1559         if (IS_ERR(file)) {
1560                 put_unused_fd(ret);
1561                 ret = PTR_ERR(file);
1562                 goto err;
1563         }
1564
1565 #if defined(CONFIG_UNIX)
1566         ctx->ring_sock->file = file;
1567 #endif
1568         fd_install(ret, file);
1569         return ret;
1570 err:
1571 #if defined(CONFIG_UNIX)
1572         sock_release(ctx->ring_sock);
1573         ctx->ring_sock = NULL;
1574 #endif
1575         return ret;
1576 }
1577
1578 static int io_uring_create(unsigned entries, struct io_uring_params *p)
1579 {
1580         struct user_struct *user = NULL;
1581         struct io_ring_ctx *ctx;
1582         bool account_mem;
1583         int ret;
1584
1585         if (!entries || entries > IORING_MAX_ENTRIES)
1586                 return -EINVAL;
1587
1588         /*
1589          * Use twice as many entries for the CQ ring. It's possible for the
1590          * application to drive a higher depth than the size of the SQ ring,
1591          * since the sqes are only used at submission time. This allows for
1592          * some flexibility in overcommitting a bit.
1593          */
1594         p->sq_entries = roundup_pow_of_two(entries);
1595         p->cq_entries = 2 * p->sq_entries;
1596
1597         user = get_uid(current_user());
1598         account_mem = !capable(CAP_IPC_LOCK);
1599
1600         if (account_mem) {
1601                 ret = io_account_mem(user,
1602                                 ring_pages(p->sq_entries, p->cq_entries));
1603                 if (ret) {
1604                         free_uid(user);
1605                         return ret;
1606                 }
1607         }
1608
1609         ctx = io_ring_ctx_alloc(p);
1610         if (!ctx) {
1611                 if (account_mem)
1612                         io_unaccount_mem(user, ring_pages(p->sq_entries,
1613                                                                 p->cq_entries));
1614                 free_uid(user);
1615                 return -ENOMEM;
1616         }
1617         ctx->compat = in_compat_syscall();
1618         ctx->account_mem = account_mem;
1619         ctx->user = user;
1620
1621         ret = io_allocate_scq_urings(ctx, p);
1622         if (ret)
1623                 goto err;
1624
1625         ret = io_sq_offload_start(ctx);
1626         if (ret)
1627                 goto err;
1628
1629         ret = io_uring_get_fd(ctx);
1630         if (ret < 0)
1631                 goto err;
1632
1633         memset(&p->sq_off, 0, sizeof(p->sq_off));
1634         p->sq_off.head = offsetof(struct io_sq_ring, r.head);
1635         p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
1636         p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
1637         p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
1638         p->sq_off.flags = offsetof(struct io_sq_ring, flags);
1639         p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
1640         p->sq_off.array = offsetof(struct io_sq_ring, array);
1641
1642         memset(&p->cq_off, 0, sizeof(p->cq_off));
1643         p->cq_off.head = offsetof(struct io_cq_ring, r.head);
1644         p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
1645         p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
1646         p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
1647         p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
1648         p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
1649         return ret;
1650 err:
1651         io_ring_ctx_wait_and_kill(ctx);
1652         return ret;
1653 }
1654
1655 /*
1656  * Sets up an aio uring context, and returns the fd. Applications asks for a
1657  * ring size, we return the actual sq/cq ring sizes (among other things) in the
1658  * params structure passed in.
1659  */
1660 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
1661 {
1662         struct io_uring_params p;
1663         long ret;
1664         int i;
1665
1666         if (copy_from_user(&p, params, sizeof(p)))
1667                 return -EFAULT;
1668         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
1669                 if (p.resv[i])
1670                         return -EINVAL;
1671         }
1672
1673         if (p.flags & ~IORING_SETUP_IOPOLL)
1674                 return -EINVAL;
1675
1676         ret = io_uring_create(entries, &p);
1677         if (ret < 0)
1678                 return ret;
1679
1680         if (copy_to_user(params, &p, sizeof(p)))
1681                 return -EFAULT;
1682
1683         return ret;
1684 }
1685
1686 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
1687                 struct io_uring_params __user *, params)
1688 {
1689         return io_uring_setup(entries, params);
1690 }
1691
1692 static int __init io_uring_init(void)
1693 {
1694         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
1695         return 0;
1696 };
1697 __initcall(io_uring_init);