io_uring/rsrc: move node refs == 0 checking outside of hot path io_uring-rsrc.4
authorJens Axboe <axboe@kernel.dk>
Sun, 3 Nov 2024 23:59:33 +0000 (16:59 -0700)
committerJens Axboe <axboe@kernel.dk>
Mon, 4 Nov 2024 14:12:13 +0000 (07:12 -0700)
One thing that is slightly suboptimal on the current resource node
handling, is that putting a node neccesitates decrementing the
ref count AND checking if it hit zero. While this isn't an atomic
operation (as we're under the ctx lock), it does mean the decrement
cannot be pipelined.

Move the dead node checking to unregistration time. The holder of a
node will still decrement the reference count, but it need not check
if it hit zero. It will, by definition, never hit zero until it gets
unregistered, as the table itself holds a reference.

At unregistration time, if the node ref count isn't zero when the
table reference is dropped, then its known that someone else holds a
reference to it and will put it at some point. For this situation,
store an entry for later reaping in ctx->rsrc_free. Whenever a batch
of nodes is flushed, whether ctx->rsrc_free is empty or not is
checked, and previously released nodes will be freed.

With that, the overhead of putting a node from the hot path is reduced
quite a bit, as everything pipelines nicely and doesn't stall on
bringing in the node cacheline.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
include/linux/io_uring_types.h
io_uring/io_uring.c
io_uring/rsrc.c
io_uring/rsrc.h

index ad5001102c868404f4009a0010f1fe5f96b80c05..e458ebed3ba03235e72afa30554293ef85d20f26 100644 (file)
@@ -320,6 +320,13 @@ struct io_ring_ctx {
                unsigned                cq_entries;
                struct io_ev_fd __rcu   *io_ev_fd;
                unsigned                cq_extra;
+
+               /*
+                * Deferred free of io_rsrc_node entries that were busy
+                * at the time of unregistration.
+                */
+               unsigned int            rsrc_free_nr;
+               struct xarray           rsrc_free;
        } ____cacheline_aligned_in_smp;
 
        /*
index f34fa1ead2cf3bdaaf8919e80747cbedbc4a724f..efbc3b5956b94b1548c810ffc02e3f1b1bf0044a 100644 (file)
@@ -293,6 +293,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
                return NULL;
 
        xa_init(&ctx->io_bl_xa);
+       xa_init(&ctx->rsrc_free);
 
        /*
         * Use 5 bits less than the max cq entries, that should give us around
@@ -366,6 +367,7 @@ err:
        io_futex_cache_free(ctx);
        kvfree(ctx->cancel_table.hbs);
        xa_destroy(&ctx->io_bl_xa);
+       xa_destroy(&ctx->rsrc_free);
        kfree(ctx);
        return NULL;
 }
@@ -1562,6 +1564,8 @@ static void io_free_batch_list(struct io_ring_ctx *ctx,
                node = req->comp_list.next;
                io_req_add_to_cache(req, ctx);
        } while (node);
+
+       io_reap_rsrc_nodes(ctx);
 }
 
 void __io_submit_flush_completions(struct io_ring_ctx *ctx)
@@ -3009,6 +3013,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
        io_futex_cache_free(ctx);
        io_destroy_buffers(ctx);
        io_unregister_cqwait_reg(ctx);
+       io_reap_rsrc_nodes(ctx);
        mutex_unlock(&ctx->uring_lock);
        if (ctx->sq_creds)
                put_cred(ctx->sq_creds);
@@ -3034,6 +3039,9 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
        io_napi_free(ctx);
        kvfree(ctx->cancel_table.hbs);
        xa_destroy(&ctx->io_bl_xa);
+       WARN_ON_ONCE(!xa_empty(&ctx->rsrc_free));
+       WARN_ON_ONCE(ctx->rsrc_free_nr);
+       xa_destroy(&ctx->rsrc_free);
        kfree(ctx);
 }
 
index 2fb1791d7255b4d337b9e471f8241b1987d5aad4..2effdd3a7d35f197f7640a51d73d82d144aa739f 100644 (file)
@@ -118,14 +118,67 @@ static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_rsrc_node *node)
        }
 }
 
+void __io_reap_rsrc_nodes(struct io_ring_ctx *ctx)
+{
+       struct io_rsrc_node *node;
+       unsigned long index;
+
+       /* Find and reap nodes that hit zero refs */
+       xa_for_each(&ctx->rsrc_free, index, node) {
+               if (!node->refs)
+                       io_free_rsrc_node(node);
+       }
+}
+
+void io_release_rsrc_node(struct io_rsrc_node *node)
+{
+       struct io_ring_ctx *ctx = io_rsrc_node_ctx(node);
+       unsigned long index = (unsigned long) node;
+
+       /*
+        * If ref dropped to zero, nobody else has a reference to it and we
+        * can safely free it. If it's non-zero, then someone else has a
+        * reference and will put it at some point. As the put side is the
+        * fast path, no checking is done there for the ref dropping to zero.
+        * It just means that nobody else has it, and also that nobody else
+        * can find it as it's been removed from the lookup table prior to
+        * that. A ref dropping to zero as part of fast path node put cannot
+        * happen without the unregister side having already stored the node
+        * in ctx->rsrc_free.
+        */
+       if (!--node->refs) {
+               io_free_rsrc_node(node);
+               return;
+       }
+
+       /*
+        * If the free list already has an entry for this node, then it was
+        * already stashed for cleanup. Just let the normal cleanup reap it.
+        */
+       if (xa_load(&ctx->rsrc_free, index))
+               return;
+
+       /* Slot was reserved at registration time */
+       ctx->rsrc_free_nr++;
+       if (xa_store(&ctx->rsrc_free, index, node, GFP_NOWAIT))
+               WARN_ON_ONCE(1);
+}
+
 struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx, int type)
 {
        struct io_rsrc_node *node;
 
        node = kzalloc(sizeof(*node), GFP_KERNEL);
        if (node) {
-               node->ctx_ptr = (unsigned long) ctx | type;
-               node->refs = 1;
+               unsigned long index = (unsigned long) node;
+
+               if (!xa_reserve(&ctx->rsrc_free, index, GFP_KERNEL)) {
+                       node->ctx_ptr = (unsigned long) ctx | type;
+                       node->refs = 1;
+                       return node;
+               }
+               kfree(node);
+               node = NULL;
        }
        return node;
 }
@@ -134,10 +187,8 @@ __cold void io_rsrc_data_free(struct io_rsrc_data *data)
 {
        if (!data->nr)
                return;
-       while (data->nr--) {
-               if (data->nodes[data->nr])
-                       io_put_rsrc_node(data->nodes[data->nr]);
-       }
+       while (data->nr--)
+               io_reset_rsrc_node(data, data->nr);
        kvfree(data->nodes);
        data->nodes = NULL;
        data->nr = 0;
@@ -465,6 +516,8 @@ void io_free_rsrc_node(struct io_rsrc_node *node)
                break;
        }
 
+       if (xa_erase(&ctx->rsrc_free, (unsigned long) node))
+               ctx->rsrc_free_nr--;
        kfree(node);
 }
 
@@ -475,6 +528,7 @@ int io_sqe_files_unregister(struct io_ring_ctx *ctx)
 
        io_free_file_tables(&ctx->file_table);
        io_file_table_set_alloc_range(ctx, 0, 0);
+       io_reap_rsrc_nodes(ctx);
        return 0;
 }
 
@@ -552,6 +606,7 @@ int io_sqe_buffers_unregister(struct io_ring_ctx *ctx)
        if (!ctx->buf_table.nr)
                return -ENXIO;
        io_rsrc_data_free(&ctx->buf_table);
+       io_reap_rsrc_nodes(ctx);
        return 0;
 }
 
@@ -788,7 +843,7 @@ done:
        if (ret) {
                kvfree(imu);
                if (node)
-                       io_put_rsrc_node(node);
+                       io_release_rsrc_node(node);
                node = ERR_PTR(ret);
        }
        kvfree(pages);
@@ -1038,8 +1093,8 @@ static int io_clone_buffers(struct io_ring_ctx *ctx, struct io_ring_ctx *src_ctx
 out_put_free:
        i = data.nr;
        while (i--) {
-               io_buffer_unmap(src_ctx, data.nodes[i]);
-               kfree(data.nodes[i]);
+               if (data.nodes[i])
+                       io_free_rsrc_node(data.nodes[i]);
        }
 out_unlock:
        io_rsrc_data_free(&data);
index bc3a863b14bb97f7d6fdba5ae3b283840765b197..10e06e2272706999f944190c99c15ce3dc2abdc4 100644 (file)
@@ -45,6 +45,8 @@ struct io_imu_folio_data {
 };
 
 struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx, int type);
+void io_release_rsrc_node(struct io_rsrc_node *node);
+void __io_reap_rsrc_nodes(struct io_ring_ctx *ctx);
 void io_free_rsrc_node(struct io_rsrc_node *node);
 void io_rsrc_data_free(struct io_rsrc_data *data);
 int io_rsrc_data_alloc(struct io_rsrc_data *data, unsigned nr);
@@ -76,10 +78,20 @@ static inline struct io_rsrc_node *io_rsrc_node_lookup(struct io_rsrc_data *data
        return NULL;
 }
 
+static inline void io_reap_rsrc_nodes(struct io_ring_ctx *ctx)
+{
+       if (ctx->rsrc_free_nr)
+               __io_reap_rsrc_nodes(ctx);
+}
+
+/*
+ * Reaping done at unregistration/removal time, hence no checking needed
+ * here - just drop the held reference.
+ */
 static inline void io_put_rsrc_node(struct io_rsrc_node *node)
 {
-       if (node && !--node->refs)
-               io_free_rsrc_node(node);
+       if (node)
+               node->refs--;
 }
 
 static inline bool io_reset_rsrc_node(struct io_rsrc_data *data, int index)
@@ -88,7 +100,7 @@ static inline bool io_reset_rsrc_node(struct io_rsrc_data *data, int index)
 
        if (!node)
                return false;
-       io_put_rsrc_node(node);
+       io_release_rsrc_node(node);
        data->nodes[index] = NULL;
        return true;
 }