From 8aa80d684650fb54177f9a9b91a4ed80d567fb1e Mon Sep 17 00:00:00 2001
From: Jens Axboe <axboe@kernel.dk>
Date: Sun, 3 Nov 2024 16:59:33 -0700
Subject: [PATCH] io_uring/rsrc: move node refs == 0 checking outside of hot path

One thing that is slightly suboptimal in the current resource node
handling is that putting a node necessitates decrementing the ref count
AND checking if it hit zero. While this isn't an atomic operation (as
we're under the ctx lock), it does mean the decrement cannot be
pipelined.

Move the dead node checking to unregistration time. The holder of a
node will still decrement the reference count, but it need not check if
it hit zero. It will, by definition, never hit zero until it gets
unregistered, as the table itself holds a reference. At unregistration
time, if the node ref count isn't zero when the table reference is
dropped, then it's known that someone else holds a reference to it and
will put it at some point. For this situation, store an entry for later
reaping in ctx->rsrc_free. Whenever a batch of nodes is flushed, check
whether ctx->rsrc_free is non-empty and free any previously released
nodes.

With that, the overhead of putting a node from the hot path is reduced
quite a bit, as everything pipelines nicely and doesn't stall on
bringing in the node cacheline.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 include/linux/io_uring_types.h |  7 ++++
 io_uring/io_uring.c            |  8 ++++
 io_uring/rsrc.c                | 73 +++++++++++++++++++++++++++++-----
 io_uring/rsrc.h                | 18 +++++++--
 4 files changed, 94 insertions(+), 12 deletions(-)

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index ad5001102c86..e458ebed3ba0 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -320,6 +320,13 @@ struct io_ring_ctx {
 		unsigned		cq_entries;
 		struct io_ev_fd	__rcu	*io_ev_fd;
 		unsigned		cq_extra;
+
+		/*
+		 * Deferred free of io_rsrc_node entries that were busy
+		 * at the time of unregistration.
+		 */
+		unsigned int		rsrc_free_nr;
+		struct xarray		rsrc_free;
 	} ____cacheline_aligned_in_smp;
 
 	/*
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index f34fa1ead2cf..efbc3b5956b9 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -293,6 +293,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 		return NULL;
 
 	xa_init(&ctx->io_bl_xa);
+	xa_init(&ctx->rsrc_free);
 
 	/*
 	 * Use 5 bits less than the max cq entries, that should give us around
@@ -366,6 +367,7 @@ err:
 	io_futex_cache_free(ctx);
 	kvfree(ctx->cancel_table.hbs);
 	xa_destroy(&ctx->io_bl_xa);
+	xa_destroy(&ctx->rsrc_free);
 	kfree(ctx);
 	return NULL;
 }
@@ -1562,6 +1564,8 @@ static void io_free_batch_list(struct io_ring_ctx *ctx,
 		node = req->comp_list.next;
 		io_req_add_to_cache(req, ctx);
 	} while (node);
+
+	io_reap_rsrc_nodes(ctx);
 }
 
 void __io_submit_flush_completions(struct io_ring_ctx *ctx)
@@ -3009,6 +3013,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	io_futex_cache_free(ctx);
 	io_destroy_buffers(ctx);
 	io_unregister_cqwait_reg(ctx);
+	io_reap_rsrc_nodes(ctx);
 	mutex_unlock(&ctx->uring_lock);
 	if (ctx->sq_creds)
 		put_cred(ctx->sq_creds);
@@ -3034,6 +3039,9 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	io_napi_free(ctx);
 	kvfree(ctx->cancel_table.hbs);
 	xa_destroy(&ctx->io_bl_xa);
+	WARN_ON_ONCE(!xa_empty(&ctx->rsrc_free));
+	WARN_ON_ONCE(ctx->rsrc_free_nr);
+	xa_destroy(&ctx->rsrc_free);
 	kfree(ctx);
 }
 
diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c
index 2fb1791d7255..2effdd3a7d35 100644
--- a/io_uring/rsrc.c
+++ b/io_uring/rsrc.c
@@ -118,14 +118,67 @@ static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_rsrc_node *node)
 	}
 }
 
+void __io_reap_rsrc_nodes(struct io_ring_ctx *ctx)
+{
+	struct io_rsrc_node *node;
+	unsigned long index;
+
+	/* Find and reap nodes that hit zero refs */
+	xa_for_each(&ctx->rsrc_free, index, node) {
+		if (!node->refs)
+			io_free_rsrc_node(node);
+	}
+}
+
+void io_release_rsrc_node(struct io_rsrc_node *node)
+{
+	struct io_ring_ctx *ctx = io_rsrc_node_ctx(node);
+	unsigned long index = (unsigned long) node;
+
+	/*
+	 * If ref dropped to zero, nobody else has a reference to it and we
+	 * can safely free it. If it's non-zero, then someone else has a
+	 * reference and will put it at some point. As the put side is the
+	 * fast path, no checking is done there for the ref dropping to zero.
+	 * It just means that nobody else has it, and also that nobody else
+	 * can find it as it's been removed from the lookup table prior to
+	 * that. A ref dropping to zero as part of fast path node put cannot
+	 * happen without the unregister side having already stored the node
+	 * in ctx->rsrc_free.
+	 */
+	if (!--node->refs) {
+		io_free_rsrc_node(node);
+		return;
+	}
+
+	/*
+	 * If the free list already has an entry for this node, then it was
+	 * already stashed for cleanup. Just let the normal cleanup reap it.
+	 */
+	if (xa_load(&ctx->rsrc_free, index))
+		return;
+
+	/* Slot was reserved at registration time */
+	ctx->rsrc_free_nr++;
+	if (xa_store(&ctx->rsrc_free, index, node, GFP_NOWAIT))
+		WARN_ON_ONCE(1);
+}
+
 struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx, int type)
 {
 	struct io_rsrc_node *node;
 
 	node = kzalloc(sizeof(*node), GFP_KERNEL);
 	if (node) {
-		node->ctx_ptr = (unsigned long) ctx | type;
-		node->refs = 1;
+		unsigned long index = (unsigned long) node;
+
+		if (!xa_reserve(&ctx->rsrc_free, index, GFP_KERNEL)) {
+			node->ctx_ptr = (unsigned long) ctx | type;
+			node->refs = 1;
+			return node;
+		}
+		kfree(node);
+		node = NULL;
 	}
 	return node;
 }
@@ -134,10 +187,8 @@ __cold void io_rsrc_data_free(struct io_rsrc_data *data)
 {
 	if (!data->nr)
 		return;
-	while (data->nr--) {
-		if (data->nodes[data->nr])
-			io_put_rsrc_node(data->nodes[data->nr]);
-	}
+	while (data->nr--)
+		io_reset_rsrc_node(data, data->nr);
 	kvfree(data->nodes);
 	data->nodes = NULL;
 	data->nr = 0;
@@ -465,6 +516,8 @@ void io_free_rsrc_node(struct io_rsrc_node *node)
 		break;
 	}
 
+	if (xa_erase(&ctx->rsrc_free, (unsigned long) node))
+		ctx->rsrc_free_nr--;
 	kfree(node);
 }
 
@@ -475,6 +528,7 @@ int io_sqe_files_unregister(struct io_ring_ctx *ctx)
 
 	io_free_file_tables(&ctx->file_table);
 	io_file_table_set_alloc_range(ctx, 0, 0);
+	io_reap_rsrc_nodes(ctx);
 	return 0;
 }
 
@@ -552,6 +606,7 @@ int io_sqe_buffers_unregister(struct io_ring_ctx *ctx)
 	if (!ctx->buf_table.nr)
 		return -ENXIO;
 	io_rsrc_data_free(&ctx->buf_table);
+	io_reap_rsrc_nodes(ctx);
 	return 0;
 }
 
@@ -788,7 +843,7 @@ done:
 	if (ret) {
 		kvfree(imu);
 		if (node)
-			io_put_rsrc_node(node);
+			io_release_rsrc_node(node);
 		node = ERR_PTR(ret);
 	}
 	kvfree(pages);
@@ -1038,8 +1093,8 @@ static int io_clone_buffers(struct io_ring_ctx *ctx, struct io_ring_ctx *src_ctx
 out_put_free:
 	i = data.nr;
 	while (i--) {
-		io_buffer_unmap(src_ctx, data.nodes[i]);
-		kfree(data.nodes[i]);
+		if (data.nodes[i])
+			io_free_rsrc_node(data.nodes[i]);
 	}
 out_unlock:
 	io_rsrc_data_free(&data);
diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h
index bc3a863b14bb..10e06e227270 100644
--- a/io_uring/rsrc.h
+++ b/io_uring/rsrc.h
@@ -45,6 +45,8 @@ struct io_imu_folio_data {
 };
 
 struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx, int type);
+void io_release_rsrc_node(struct io_rsrc_node *node);
+void __io_reap_rsrc_nodes(struct io_ring_ctx *ctx);
 void io_free_rsrc_node(struct io_rsrc_node *node);
 void io_rsrc_data_free(struct io_rsrc_data *data);
 int io_rsrc_data_alloc(struct io_rsrc_data *data, unsigned nr);
@@ -76,10 +78,20 @@ static inline struct io_rsrc_node *io_rsrc_node_lookup(struct io_rsrc_data *data
 	return NULL;
 }
 
+static inline void io_reap_rsrc_nodes(struct io_ring_ctx *ctx)
+{
+	if (ctx->rsrc_free_nr)
+		__io_reap_rsrc_nodes(ctx);
+}
+
+/*
+ * Reaping done at unregistration/removal time, hence no checking needed
+ * here - just drop the held reference.
+ */
 static inline void io_put_rsrc_node(struct io_rsrc_node *node)
 {
-	if (node && !--node->refs)
-		io_free_rsrc_node(node);
+	if (node)
+		node->refs--;
 }
 
 static inline bool io_reset_rsrc_node(struct io_rsrc_data *data, int index)
@@ -88,7 +100,7 @@ static inline bool io_reset_rsrc_node(struct io_rsrc_data *data, int index)
 	if (!node)
 		return false;
 
-	io_put_rsrc_node(node);
+	io_release_rsrc_node(node);
 	data->nodes[index] = NULL;
 	return true;
 }
-- 
2.25.1
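
For readers following the scheme described in the commit message, the short
user-space program below is a minimal sketch of the deferred-reap idea, not
the kernel implementation. The names (node, free_list, node_put,
node_unregister, reap_nodes) are made up for illustration, a singly linked
list stands in for the xarray, and the ctx lock that serializes everything in
the real code is assumed rather than taken. The point it demonstrates is the
same split as the patch: the hot-path put only decrements, and the zero-ref
check happens at unregister time or in a later reap pass.

/*
 * Minimal sketch of deferred node reaping. Hypothetical user-space names;
 * serialization by an external lock is assumed and not shown.
 */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

struct node {
	int refs;		/* protected by an external lock in real use */
	struct node *next;	/* linkage on the deferred free list */
};

static struct node *free_list;	/* nodes unregistered while still referenced */

/* Hot path: just drop the reference, no zero check. */
static void node_put(struct node *n)
{
	if (n)
		n->refs--;
}

/* Unregister: drop the table's reference; free now or stash for later. */
static void node_unregister(struct node *n)
{
	if (!--n->refs) {
		free(n);
		return;
	}
	n->next = free_list;
	free_list = n;
}

/* Reap pass, run when a batch completes: free nodes that hit zero refs. */
static void reap_nodes(void)
{
	struct node **pp = &free_list;

	while (*pp) {
		struct node *n = *pp;

		if (!n->refs) {
			*pp = n->next;
			free(n);
		} else {
			pp = &n->next;
		}
	}
}

int main(void)
{
	struct node *n = calloc(1, sizeof(*n));

	n->refs = 1;		/* table reference taken at registration */
	n->refs++;		/* an in-flight request grabs a reference */

	node_unregister(n);	/* still referenced: goes on free_list */
	node_put(n);		/* hot path: decrement only, no check */
	reap_nodes();		/* refs hit zero, so it is freed here */

	assert(!free_list);
	printf("deferred reap done\n");
	return 0;
}

Built with a plain cc invocation, this should print "deferred reap done" once
the stashed node has been reaped, mirroring how a node put in the fast path
never frees anything itself and the deferred list is drained in batches.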