#define IO_COMPL_BATCH 32
#define IO_REQ_ALLOC_BATCH 8
-#define IO_LOCAL_TW_DEFAULT_MAX 20
+#define IO_LOCAL_TW_DEFAULT_MAX 2000
struct io_defer_entry {
struct list_head list;
tctx_task_work_run(tctx);
}
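+/*
+ * ->work_list is updated as a single {first, last} pointer pair, which
+ * requires a double-pointer-wide cmpxchg: 128-bit on 64-bit kernels and
+ * 64-bit on 32-bit ones. If neither is available, wq_has_cmpxchg() is 0
+ * and the spinlock fallback in the helpers below is used instead.
+ */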
+#ifdef CONFIG_64BIT
+#ifdef system_has_cmpxchg128
+#define wq_has_cmpxchg() system_has_cmpxchg128()
+#define wq_cmpxchg_list try_cmpxchg128
+#endif
+#else
+#ifdef system_has_cmpxchg64
+#define wq_has_cmpxchg() system_has_cmpxchg64()
+#define wq_cmpxchg_list try_cmpxchg64
+#endif
+#endif
+
+#ifndef wq_has_cmpxchg
+#define wq_has_cmpxchg() (0)
+/* never taken at runtime, only needed so the lockless helpers compile */
+#define wq_cmpxchg_list(lptr, oldp, new) (false)
+#endif
+
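+/*
+ * Lockless append: atomically swing the {first, last} pair so @n becomes the
+ * new tail, then fix up the old tail's ->next link. A reader may briefly see
+ * the new tail before the previous tail's ->next link is visible.
+ */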
+static void wq_ll_add_tail(struct io_wq_work_node *n, struct io_wq_work_list *l)
+{
+ n->next = NULL;
+ do {
+ struct io_wq_work_list old = {
+ .first = READ_ONCE(l->first),
+ .last = READ_ONCE(l->last)
+ };
+ /* build the replacement pair from the same 'old' snapshot */
+ struct io_wq_work_list new = {
+ .first = old.first ?: n,
+ .last = n
+ };
+
+ if (wq_cmpxchg_list(&l->list_ptr, &old.list_ptr, new.list_ptr)) {
+ /* @n is published as the tail; link the previous tail to it */
+ if (old.last)
+ WRITE_ONCE(old.last->next, n);
+ break;
+ }
+ } while (1);
+}
+
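+/*
+ * Queue @req on ->work_list, lockless if a wide enough cmpxchg is available,
+ * otherwise under ->work_lock. Returns the number of items that were already
+ * queued; the lockless path does not maintain ->work_items and reports 0.
+ */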
+static int wq_list_add_tail_ll(struct io_ring_ctx *ctx, struct io_kiocb *req)
+{
+ unsigned int nr_tw_prev;
+ unsigned long flags;
+
+ if (wq_has_cmpxchg()) {
+ /* lockless path: ->work_items isn't tracked, report 0 previous items */
+ wq_ll_add_tail(&req->io_task_work.node, &ctx->work_list);
+ return 0;
+ }
+
+ spin_lock_irqsave(&ctx->work_lock, flags);
+ wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
+ nr_tw_prev = ctx->work_items++;
+ spin_unlock_irqrestore(&ctx->work_lock, flags);
+ /*
+ * Pairs with set_current_state() in io_cqring_wait(): either we see
+ * the updated ->cq_wait_nr, or the waiter going to sleep sees the
+ * work we just queued. The cmpxchg path above is fully ordered and
+ * does not need this.
+ */
+ smp_mb();
+ return nr_tw_prev;
+}
+
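+/*
+ * Lockless variant of "splice and reinit": snapshot the {first, last} pair
+ * into @dst and atomically replace it with an empty list.
+ */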
+static void wq_ll_delete_all(struct io_wq_work_list *l,
+ struct io_wq_work_list *dst)
+{
+ struct io_wq_work_list new = { };
+
+ guard(rcu)();
+ do {
+ dst->first = READ_ONCE(l->first);
+ dst->last = READ_ONCE(l->last);
+
+ /* swap the snapshot out and leave an empty list behind */
+ if (!wq_cmpxchg_list(&l->list_ptr, &dst->list_ptr, new.list_ptr))
+ continue;
+ /* nothing was queued */
+ if (!dst->last)
+ break;
+ /* the captured tail must not have gained a successor */
+ if (!READ_ONCE(dst->last->next))
+ break;
+ } while (1);
+}
+
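+/*
+ * Move all queued work from ->work_list to @dst, using the lockless swap when
+ * available and falling back to ->work_lock otherwise.
+ */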
+static void wq_list_delete_all_ll(struct io_ring_ctx *ctx,
+ struct io_wq_work_list *dst)
+{
+ if (wq_has_cmpxchg()) {
+ WRITE_ONCE(ctx->work_items, 0);
+ wq_ll_delete_all(&ctx->work_list, dst);
+ /* the cmpxchg in wq_ll_delete_all() is fully ordered, no extra barrier needed */
+ return;
+ }
+
+ spin_lock_irq(&ctx->work_lock);
+ *dst = ctx->work_list;
+ INIT_WQ_LIST(&ctx->work_list);
+ ctx->work_items = 0;
+ spin_unlock_irq(&ctx->work_lock);
+
+ /*
+ * We need a barrier after unlock, which pairs with the barrier
+ * in set_current_state() on the io_cqring_wait() side. It's used
+ * to ensure that either we see updated ->cq_wait_nr, or waiters
+ * going to sleep will observe the work added to the list, which
+ * is similar to the wait/wake task state sync.
+ */
+ smp_mb();
+}
+
static inline void io_req_local_work_add(struct io_kiocb *req,
struct io_ring_ctx *ctx,
unsigned tw_flags)
{
unsigned nr_tw, nr_tw_prev, nr_wait;
- unsigned long flags;
/* See comment above IO_CQ_WAKE_INIT */
BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
- spin_lock_irqsave(&ctx->work_lock, flags);
- wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
- nr_tw_prev = ctx->work_items++;
- spin_unlock_irqrestore(&ctx->work_lock, flags);
+ guard(rcu)();
+
+ nr_tw_prev = wq_list_add_tail_ll(ctx, req);
nr_tw = nr_tw_prev + 1;
if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
io_eventfd_signal(ctx);
}
- guard(rcu)();
-
- /*
- * We need a barrier after unlock, which pairs with the barrier
- * in set_current_state() on the io_cqring_wait() side. It's used
- * to ensure that either we see updated ->cq_wait_nr, or waiters
- * going to sleep will observe the work added to the list, which
- * is similar to the wait/wake task state sync.
- */
- smp_mb();
nr_wait = atomic_read(&ctx->cq_wait_nr);
/* not enough or no one is waiting */
if (nr_tw < nr_wait)
static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
int min_events)
{
- struct io_wq_work_node *node, *tail;
+ struct io_wq_work_node *node;
+ struct io_wq_work_list list;
int ret, limit, nitems;
unsigned int loops = 0;
ret = 0;
limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
again:
- spin_lock_irq(&ctx->work_lock);
- node = ctx->work_list.first;
- tail = ctx->work_list.last;
- nitems = ctx->work_items;
- INIT_WQ_LIST(&ctx->work_list);
- ctx->work_items = 0;
- spin_unlock_irq(&ctx->work_lock);
-
+ wq_list_delete_all_ll(ctx, &list);
+ node = list.first;
+ nitems = 0;
while (node) {
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
req, ts);
- nitems--;
- if (++ret >= limit)
+ if (++nitems >= limit)
break;
}
+ ret += nitems;
if (unlikely(node)) {
+ WARN_ON_ONCE(1);
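+ /*
+ * The old requeue of the unprocessed remainder is disabled below: it
+ * still refers to the removed 'tail' variable and predates the
+ * lockless list, which has no cheap head-insert. With the raised
+ * IO_LOCAL_TW_DEFAULT_MAX the limit is rarely hit, so for now the
+ * leftover entries are only flagged, not requeued.
+ */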
+#if 0
spin_lock_irq(&ctx->work_lock);
tail->next = ctx->work_list.first;
ctx->work_list.first = node;
ctx->work_list.last = tail;
ctx->work_items += nitems;
spin_unlock_irq(&ctx->work_lock);
+#endif
goto retry_done;
}