io_uring/net: support bundles for recv
author Jens Axboe <axboe@kernel.dk>
Tue, 5 Mar 2024 23:22:04 +0000 (16:22 -0700)
committer Jens Axboe <axboe@kernel.dk>
Mon, 22 Apr 2024 17:26:11 +0000 (11:26 -0600)
If IORING_OP_RECV is used with provided buffers, the caller may also set
IORING_RECVSEND_BUNDLE to turn it into a multi-buffer recv. This grabs
available buffers and receives into them, posting a single completion for
all of it.

This can be used with multishot receive as well, or without it.
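
As a rough userspace sketch of arming such a request (liburing-style; the
ring setup, 'sockfd', and the registered provided-buffer group 0 are assumed
here, not part of this patch):

  struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);

  /* recv with no buffer given up front; buffers are picked from the
   * provided buffer group at receive time, several at once when bundled
   */
  io_uring_prep_recv(sqe, sockfd, NULL, 0, 0);
  sqe->flags |= IOSQE_BUFFER_SELECT;
  sqe->buf_group = 0;                       /* assumed buffer group ID */
  sqe->ioprio |= IORING_RECVSEND_BUNDLE;    /* bundle into multiple buffers */
  /* optionally also multishot: sqe->ioprio |= IORING_RECV_MULTISHOT; */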

Now that both send and receive support bundles, add a feature flag for
it as well. If IORING_FEAT_RECVSEND_BUNDLE is set after registering the
ring, then the kernel supports bundles for recv and send.
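
A minimal way to probe for it from userspace (sketch; assumes liburing's
io_uring_queue_init_params(), with 'use_bundles' as a placeholder flag):

  struct io_uring_params p = { };
  struct io_uring ring;

  if (io_uring_queue_init_params(8, &ring, &p) < 0)
          return;                 /* ring setup failed */
  if (p.features & IORING_FEAT_RECVSEND_BUNDLE)
          use_bundles = true;     /* kernel supports send/recv bundles */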

Signed-off-by: Jens Axboe <axboe@kernel.dk>
include/uapi/linux/io_uring.h
io_uring/io_uring.c
io_uring/net.c

index 7f583927c908d364d2f7423b433a7fe693d8b704..f093cb2300d96ebde02073431d09c118aaafd415 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -352,13 +352,13 @@ enum io_uring_op {
  *                             IORING_NOTIF_USAGE_ZC_COPIED if data was copied
  *                             (at least partially).
  *
- * IORING_RECVSEND_BUNDLE      Used with IOSQE_BUFFER_SELECT. If set, send will
- *                             grab as many buffers from the buffer group ID
- *                             given and send them all. The completion result
- *                             will be the number of buffers send, with the
- *                             starting buffer ID in cqe->flags as per usual
- *                             for provided buffer usage. The buffers will be
- *                             contigious from the starting buffer ID.
+ * IORING_RECVSEND_BUNDLE      Used with IOSQE_BUFFER_SELECT. If set, send or
+ *                             recv will grab as many buffers from the buffer
+ *                             group ID given and send them all. The completion
+ *                             result will be the number of buffers sent, with
+ *                             the starting buffer ID in cqe->flags as per
+ *                             usual for provided buffer usage. The buffers
+ *                             will be contiguous from the starting buffer ID.
  */
 #define IORING_RECVSEND_POLL_FIRST     (1U << 0)
 #define IORING_RECV_MULTISHOT          (1U << 1)
@@ -529,6 +529,7 @@ struct io_uring_params {
 #define IORING_FEAT_CQE_SKIP           (1U << 11)
 #define IORING_FEAT_LINKED_FILE                (1U << 12)
 #define IORING_FEAT_REG_REG_RING       (1U << 13)
+#define IORING_FEAT_RECVSEND_BUNDLE    (1U << 14)
 
 /*
  * io_uring_register(2) opcodes and arguments
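
To illustrate the completion format documented above, a hedged sketch of
reaping a bundle completion with liburing ('ring' assumed set up elsewhere):

  struct io_uring_cqe *cqe;
  unsigned int bid = 0;

  if (!io_uring_wait_cqe(&ring, &cqe) && cqe->res > 0) {
          if (cqe->flags & IORING_CQE_F_BUFFER)
                  /* starting provided-buffer ID of the bundle */
                  bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
          /* the single CQE covers the whole bundle; buffers are consumed
           * contiguously starting from 'bid'
           */
          io_uring_cqe_seen(&ring, cqe);
  }
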
index c67ae6e36c4f3e25f792d9fb3e933f92c4d06072..64845634d89f307ca9515ea434c5d3dc26b55c75 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -3583,7 +3583,8 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
                        IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
                        IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
                        IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
-                       IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING;
+                       IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING |
+                       IORING_FEAT_RECVSEND_BUNDLE;
 
        if (copy_to_user(params, p, sizeof(*p))) {
                ret = -EFAULT;
index 3e326576254be710f753767fbf320d5e8ad86399..51c41d771c50ee8676c1c3ad4525aaa0cb2a5d3c 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -747,7 +747,8 @@ static int io_recvmsg_prep_setup(struct io_kiocb *req)
        return ret;
 }
 
-#define RECVMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECV_MULTISHOT)
+#define RECVMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECV_MULTISHOT | \
+                       IORING_RECVSEND_BUNDLE)
 
 int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
@@ -761,21 +762,14 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
        sr->umsg = u64_to_user_ptr(READ_ONCE(sqe->addr));
        sr->len = READ_ONCE(sqe->len);
        sr->flags = READ_ONCE(sqe->ioprio);
-       if (sr->flags & ~(RECVMSG_FLAGS))
+       if (sr->flags & ~RECVMSG_FLAGS)
                return -EINVAL;
        sr->msg_flags = READ_ONCE(sqe->msg_flags);
        if (sr->msg_flags & MSG_DONTWAIT)
                req->flags |= REQ_F_NOWAIT;
        if (sr->msg_flags & MSG_ERRQUEUE)
                req->flags |= REQ_F_CLEAR_POLLIN;
-       if (sr->flags & IORING_RECV_MULTISHOT) {
-               if (!(req->flags & REQ_F_BUFFER_SELECT))
-                       return -EINVAL;
-               if (sr->msg_flags & MSG_WAITALL)
-                       return -EINVAL;
-               if (req->opcode == IORING_OP_RECV && sr->len)
-                       return -EINVAL;
-               req->flags |= REQ_F_APOLL_MULTISHOT;
+       if (req->flags & REQ_F_BUFFER_SELECT) {
                /*
                 * Store the buffer group for this multishot receive separately,
                 * as if we end up doing an io-wq based issue that selects a
@@ -785,6 +779,20 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
                 * restore it.
                 */
                sr->buf_group = req->buf_index;
+               req->buf_list = NULL;
+       }
+       if (sr->flags & IORING_RECV_MULTISHOT) {
+               if (!(req->flags & REQ_F_BUFFER_SELECT))
+                       return -EINVAL;
+               if (sr->msg_flags & MSG_WAITALL)
+                       return -EINVAL;
+               if (req->opcode == IORING_OP_RECV && sr->len)
+                       return -EINVAL;
+               req->flags |= REQ_F_APOLL_MULTISHOT;
+       }
+       if (sr->flags & IORING_RECVSEND_BUNDLE) {
+               if (req->opcode == IORING_OP_RECVMSG)
+                       return -EINVAL;
        }
 
 #ifdef CONFIG_COMPAT
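
(As an aside illustrating the prep check above: bundles are only accepted
for IORING_OP_RECV, so a recvmsg prepared like the sketch below, with
'ring', 'sockfd', and 'msg' assumed, is expected to complete with -EINVAL.)

  struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);

  io_uring_prep_recvmsg(sqe, sockfd, &msg, 0);
  sqe->ioprio |= IORING_RECVSEND_BUNDLE;  /* invalid for IORING_OP_RECVMSG */
  /* the resulting CQE carries res == -EINVAL */
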
@@ -805,19 +813,28 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
                                  struct io_async_msghdr *kmsg,
                                  bool mshot_finished, unsigned issue_flags)
 {
+       struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
        unsigned int cflags;
 
-       cflags = io_put_kbuf(req, issue_flags);
+       if (sr->flags & IORING_RECVSEND_BUNDLE)
+               cflags = io_put_kbufs(req, io_bundle_nbufs(kmsg, *ret),
+                                     issue_flags);
+       else
+               cflags = io_put_kbuf(req, issue_flags);
+
        if (kmsg->msg.msg_inq > 0)
                cflags |= IORING_CQE_F_SOCK_NONEMPTY;
 
+       /* bundle with no more immediate buffers, we're done */
+       if (sr->flags & IORING_RECVSEND_BUNDLE && req->flags & REQ_F_BL_EMPTY)
+               goto finish;
+
        /*
         * Fill CQE for this receive and see if we should keep trying to
         * receive from this socket.
         */
        if ((req->flags & REQ_F_APOLL_MULTISHOT) && !mshot_finished &&
            io_req_post_cqe(req, *ret, cflags | IORING_CQE_F_MORE)) {
-               struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
                int mshot_retry_ret = IOU_ISSUE_SKIP_COMPLETE;
 
                io_mshot_prep_retry(req, kmsg);
@@ -837,6 +854,7 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
        }
 
        /* Finish the request / stop multishot. */
+finish:
        io_req_set_res(req, *ret, cflags);
 
        if (issue_flags & IO_URING_F_MULTISHOT)
@@ -1020,6 +1038,69 @@ retry_multishot:
        return ret;
 }
 
+static int io_recv_buf_select(struct io_kiocb *req, struct io_async_msghdr *kmsg,
+                             size_t *len, unsigned int issue_flags)
+{
+       struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
+       int ret;
+
+       /*
+        * If the ring isn't locked, then don't use the peek interface
+        * to grab multiple buffers as we will lock/unlock between
+        * this selection and posting the buffers.
+        */
+       if (!(issue_flags & IO_URING_F_UNLOCKED) &&
+           sr->flags & IORING_RECVSEND_BUNDLE) {
+               struct buf_sel_arg arg = {
+                       .iovs = &kmsg->fast_iov,
+                       .nr_iovs = 1,
+                       .mode = KBUF_MODE_EXPAND,
+               };
+
+               if (kmsg->free_iov) {
+                       arg.nr_iovs = kmsg->free_iov_nr;
+                       arg.iovs = kmsg->free_iov;
+                       arg.mode |= KBUF_MODE_FREE;
+               }
+
+               if (kmsg->msg.msg_inq > 0)
+                       arg.max_len = min_not_zero(sr->len, kmsg->msg.msg_inq);
+
+               ret = io_buffers_peek(req, &arg);
+               if (unlikely(ret < 0))
+                       return ret;
+
+               /* special case 1 vec, can be a fast path */
+               if (ret == 1) {
+                       sr->buf = arg.iovs[0].iov_base;
+                       sr->len = arg.iovs[0].iov_len;
+                       goto map_ubuf;
+               }
+               iov_iter_init(&kmsg->msg.msg_iter, ITER_DEST, arg.iovs, ret,
+                               arg.out_len);
+               if (arg.iovs != &kmsg->fast_iov && arg.iovs != kmsg->free_iov) {
+                       kmsg->free_iov_nr = ret;
+                       kmsg->free_iov = arg.iovs;
+               }
+       } else {
+               void __user *buf;
+
+               *len = sr->len;
+               buf = io_buffer_select(req, len, issue_flags);
+               if (!buf)
+                       return -ENOBUFS;
+               sr->buf = buf;
+               sr->len = *len;
+map_ubuf:
+               ret = import_ubuf(ITER_DEST, sr->buf, sr->len,
+                                 &kmsg->msg.msg_iter);
+               if (unlikely(ret))
+                       return ret;
+       }
+
+       return 0;
+}
+
 int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 {
        struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
@@ -1044,17 +1125,10 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 
 retry_multishot:
        if (io_do_buffer_select(req)) {
-               void __user *buf;
-
-               buf = io_buffer_select(req, &len, issue_flags);
-               if (!buf)
-                       return -ENOBUFS;
-               sr->buf = buf;
-               sr->len = len;
-               ret = import_ubuf(ITER_DEST, sr->buf, sr->len,
-                                 &kmsg->msg.msg_iter);
+               ret = io_recv_buf_select(req, kmsg, &len, issue_flags);
                if (unlikely(ret))
                        goto out_free;
+               sr->buf = NULL;
        }
 
        kmsg->msg.msg_inq = -1;