static __cold void io_fallback_req_func(struct work_struct *work)
{
struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
- fallback_work.work);
- struct llist_node *node = llist_del_all(&ctx->fallback_llist);
- struct io_kiocb *req, *tmp;
+ fallback_work);
+ struct io_wq_work_node *node;
struct io_tw_state ts = {};
+ struct io_wq_work_list list;
+
+ spin_lock_irq(&ctx->fallback_lock);
+ list = ctx->fallback_list;
+ INIT_WQ_LIST(&ctx->fallback_list);
+ spin_unlock_irq(&ctx->fallback_lock);
percpu_ref_get(&ctx->refs);
mutex_lock(&ctx->uring_lock);
- llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
+ node = list.first;
+ while (node) {
+ struct io_kiocb *req;
+
+ req = container_of(node, struct io_kiocb, io_task_work.node);
+ node = node->next;
req->io_task_work.func(req, ts);
+ }
io_submit_flush_completions(ctx);
mutex_unlock(&ctx->uring_lock);
percpu_ref_put(&ctx->refs);
#ifdef CONFIG_FUTEX
INIT_HLIST_HEAD(&ctx->futex_list);
#endif
- INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
+ INIT_WORK(&ctx->fallback_work, io_fallback_req_func);
+ INIT_WQ_LIST(&ctx->fallback_list);
+ spin_lock_init(&ctx->fallback_lock);
INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd);
io_napi_init(ctx);
* If more entries than max_entries are available, stop processing once this
* is reached and return the rest of the list.
*/
-struct llist_node *io_handle_tw_list(struct llist_node *node,
- unsigned int *count,
- unsigned int max_entries)
+struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
+ unsigned int *count,
+ unsigned int max_entries)
{
struct io_ring_ctx *ctx = NULL;
struct io_tw_state ts = { };
do {
- struct llist_node *next = node->next;
+ struct io_wq_work_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
return node;
}
-static __cold void ____io_fallback_tw(struct io_kiocb *req, bool sync,
- struct io_ring_ctx **last_ctx)
+static __cold void __io_fallback_schedule(struct io_ring_ctx *ctx,
+ struct io_wq_work_list *list,
+ bool sync)
{
- if (*last_ctx != req->ctx) {
- if (*last_ctx) {
- if (sync)
- flush_delayed_work(&(*last_ctx)->fallback_work);
- percpu_ref_put(&(*last_ctx)->refs);
- }
- *last_ctx = req->ctx;
- percpu_ref_get(&(*last_ctx)->refs);
- }
- if (llist_add(&req->io_task_work.node, &(*last_ctx)->fallback_llist))
- schedule_delayed_work(&(*last_ctx)->fallback_work, 1);
+ unsigned long flags;
+ bool kick_work;
+
+ spin_lock_irqsave(&ctx->fallback_lock, flags);
+ kick_work = !wq_list_splice_list(list, &ctx->fallback_list);
+ spin_unlock_irqrestore(&ctx->fallback_lock, flags);
+ if (kick_work)
+ schedule_work(&ctx->fallback_work);
+
+ if (sync)
+ flush_work(&ctx->fallback_work);
+ percpu_ref_put(&ctx->refs);
}
-static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
+static __cold void __io_fallback_tw(struct io_wq_work_list *list,
+ spinlock_t *lock, bool sync)
{
+ struct io_wq_work_list local_list, ctx_list;
struct io_ring_ctx *last_ctx = NULL;
+ struct io_wq_work_node *node;
struct io_kiocb *req;
+ unsigned long flags;
+ spin_lock_irqsave(lock, flags);
+ local_list = *list;
+ INIT_WQ_LIST(list);
+ spin_unlock_irqrestore(lock, flags);
+
+ INIT_WQ_LIST(&ctx_list);
+ node = local_list.first;
while (node) {
+ struct io_wq_work_node *next = node->next;
+
req = container_of(node, struct io_kiocb, io_task_work.node);
node = node->next;
- ____io_fallback_tw(req, sync, &last_ctx);
+ if (last_ctx != req->ctx) {
+ if (last_ctx)
+ __io_fallback_schedule(last_ctx, &ctx_list, sync);
+ last_ctx = req->ctx;
+ percpu_ref_get(&last_ctx->refs);
+ }
+ wq_list_add_tail(node, &ctx_list);
+ node = next;
}
- if (last_ctx) {
- if (sync)
- flush_delayed_work(&last_ctx->fallback_work);
- percpu_ref_put(&last_ctx->refs);
- }
+ if (last_ctx)
+ __io_fallback_schedule(last_ctx, &ctx_list, sync);
}
static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
{
- struct llist_node *node = llist_del_all(&tctx->task_list);
-
- __io_fallback_tw(node, sync);
+ __io_fallback_tw(&tctx->task_list, &tctx->task_lock, sync);
}
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
- unsigned int max_entries,
- unsigned int *count)
+struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
+ unsigned int max_entries,
+ unsigned int *count)
{
- struct llist_node *node;
+ struct io_wq_work_node *node;
if (unlikely(current->flags & PF_EXITING)) {
io_fallback_tw(tctx, true);
return NULL;
}
- node = llist_del_all(&tctx->task_list);
- if (node) {
- node = llist_reverse_order(node);
+ if (!READ_ONCE(tctx->task_list.first))
+ return NULL;
+
+ spin_lock_irq(&tctx->task_lock);
+ node = tctx->task_list.first;
+ INIT_WQ_LIST(&tctx->task_list);
+ spin_unlock_irq(&tctx->task_lock);
+
+ if (node)
node = io_handle_tw_list(node, count, max_entries);
- }
/* relaxed read is enough as only the task itself sets ->in_cancel */
if (unlikely(atomic_read(&tctx->in_cancel)))
void tctx_task_work(struct callback_head *cb)
{
struct io_uring_task *tctx;
- struct llist_node *ret;
unsigned int count = 0;
tctx = container_of(cb, struct io_uring_task, task_work);
- ret = tctx_task_work_run(tctx, UINT_MAX, &count);
- /* can't happen */
- WARN_ON_ONCE(ret);
+ if (tctx_task_work_run(tctx, UINT_MAX, &count))
+ WARN_ON_ONCE(1);
}
static void io_req_local_work_add(struct io_kiocb *req, unsigned tw_flags)
guard(rcu)();
spin_lock_irqsave(&ctx->work_lock, flags);
- wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+ wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
nr_tw_prev = ctx->work_items++;
spin_unlock_irqrestore(&ctx->work_lock, flags);
{
struct io_uring_task *tctx = req->tctx;
struct io_ring_ctx *ctx = req->ctx;
+ unsigned long flags;
+ bool was_empty;
+
+ spin_lock_irqsave(&tctx->task_lock, flags);
+ was_empty = tctx->task_list.first == NULL;
+ wq_list_add_tail(&req->io_task_work.node, &tctx->task_list);
+ spin_unlock_irqrestore(&tctx->task_lock, flags);
/* task_work already pending, we're done */
- if (!llist_add(&req->io_task_work.node, &tctx->task_list))
+ if (!was_empty)
return;
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
- struct io_ring_ctx *last_ctx = NULL;
- struct io_wq_work_node *node;
- unsigned long flags;
-
- spin_lock_irqsave(&ctx->work_lock, flags);
- node = ctx->work_list.first;
- INIT_WQ_LIST(&ctx->work_list);
- ctx->work_items = 0;
- spin_unlock_irqrestore(&ctx->work_lock, flags);
-
- while (node) {
- struct io_kiocb *req;
-
- req = container_of(node, struct io_kiocb, io_task_work.work_node);
- node = node->next;
- ____io_fallback_tw(req, false, &last_ctx);
- }
- if (last_ctx) {
- flush_delayed_work(&last_ctx->fallback_work);
- percpu_ref_put(&last_ctx->refs);
- }
+ /*
+ * __io_fallback_tw() handles lists that can have multiple
+ * rings in it, which isn't the case here. But it'll work just
+ * fine, so use it anyway rather than have a special case for
+ * just a single ctx.
+ */
+ __io_fallback_tw(&ctx->work_list, &ctx->work_lock, false);
}
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
while (node) {
struct io_kiocb *req = container_of(node, struct io_kiocb,
- io_task_work.work_node);
+ io_task_work.node);
node = node->next;
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
io_unregister_personality(ctx, index);
mutex_unlock(&ctx->uring_lock);
- flush_delayed_work(&ctx->fallback_work);
+ flush_work(&ctx->fallback_work);
INIT_WORK(&ctx->exit_work, io_ring_exit_work);
/*
if (tctx)
ret |= io_run_task_work() > 0;
else
- ret |= flush_delayed_work(&ctx->fallback_work);
+ ret |= flush_work(&ctx->fallback_work);
return ret;
}