Revert "eventpoll: Fix priority inversion problem"
authorLinus Torvalds <torvalds@linux-foundation.org>
Sat, 12 Jul 2025 00:10:32 +0000 (17:10 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Sat, 12 Jul 2025 00:10:32 +0000 (17:10 -0700)
This reverts commit 8c44dac8add7503c345c0f6c7962e4863b88ba42.

I haven't figured out what the actual bug in this commit is, but I did
spend a lot of time chasing it down and eventually succeeded in
bisecting it down to this.

For some reason, this eventpoll commit ends up causing delays and stuck
user space processes, but it only happens on one of my machines, and
only during early boot or during the flurry of initial activity when
logging in.

I must be triggering some very subtle timing issue, but once I figured
out the behavior pattern that made it reasonably reliable to trigger, it
did bisect right to this, and reverting the commit fixes the problem.

Of course, that was only after I had failed at bisecting it several
times, and had flailed around blaming both the drm people and the
netlink people for the odd problems.  The most obvious of which happened
at the time of the first graphical login (the most common symptom being
that some gnome app aborted due to a 30s timeout, often leading to the
whole session then failing if it was some critical component like
gnome-shell or similar).

Acked-by: Nam Cao <namcao@linutronix.de>
Cc: Frederic Weisbecker <frederic@kernel.org>
Cc: Valentin Schneider <vschneid@redhat.com>
Cc: Christian Brauner <brauner@kernel.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
fs/eventpoll.c

index 895256cd2786e27e8b404f35ac0e9c374ec584f5..0fbf5dfedb24e24f1d9e82e480d3b7b8f7dac021 100644 (file)
@@ -137,7 +137,13 @@ struct epitem {
        };
 
        /* List header used to link this structure to the eventpoll ready list */
-       struct llist_node rdllink;
+       struct list_head rdllink;
+
+       /*
+        * Works together "struct eventpoll"->ovflist in keeping the
+        * single linked chain of items.
+        */
+       struct epitem *next;
 
        /* The file descriptor information this item refers to */
        struct epoll_filefd ffd;
@@ -185,15 +191,22 @@ struct eventpoll {
        /* Wait queue used by file->poll() */
        wait_queue_head_t poll_wait;
 
-       /*
-        * List of ready file descriptors. Adding to this list is lockless. Items can be removed
-        * only with eventpoll::mtx
-        */
-       struct llist_head rdllist;
+       /* List of ready file descriptors */
+       struct list_head rdllist;
+
+       /* Lock which protects rdllist and ovflist */
+       rwlock_t lock;
 
        /* RB tree root used to store monitored fd structs */
        struct rb_root_cached rbr;
 
+       /*
+        * This is a single linked list that chains all the "struct epitem" that
+        * happened while transferring ready events to userspace w/out
+        * holding ->lock.
+        */
+       struct epitem *ovflist;
+
        /* wakeup_source used when ep_send_events or __ep_eventpoll_poll is running */
        struct wakeup_source *ws;
 
@@ -348,14 +361,10 @@ static inline int ep_cmp_ffd(struct epoll_filefd *p1,
                (p1->file < p2->file ? -1 : p1->fd - p2->fd));
 }
 
-/*
- * Add the item to its container eventpoll's rdllist; do nothing if the item is already on rdllist.
- */
-static void epitem_ready(struct epitem *epi)
+/* Tells us if the item is currently linked */
+static inline int ep_is_linked(struct epitem *epi)
 {
-       if (&epi->rdllink == cmpxchg(&epi->rdllink.next, &epi->rdllink, NULL))
-               llist_add(&epi->rdllink, &epi->ep->rdllist);
-
+       return !list_empty(&epi->rdllink);
 }
 
 static inline struct eppoll_entry *ep_pwq_from_wait(wait_queue_entry_t *p)
@@ -374,26 +383,13 @@ static inline struct epitem *ep_item_from_wait(wait_queue_entry_t *p)
  *
  * @ep: Pointer to the eventpoll context.
  *
- * Return: true if ready events might be available, false otherwise.
+ * Return: a value different than %zero if ready events are available,
+ *          or %zero otherwise.
  */
-static inline bool ep_events_available(struct eventpoll *ep)
+static inline int ep_events_available(struct eventpoll *ep)
 {
-       bool available;
-       int locked;
-
-       locked = mutex_trylock(&ep->mtx);
-       if (!locked) {
-               /*
-                * The lock held and someone might have removed all items while inspecting it. The
-                * llist_empty() check in this case is futile. Assume that something is enqueued and
-                * let ep_try_send_events() figure it out.
-                */
-               return true;
-       }
-
-       available = !llist_empty(&ep->rdllist);
-       mutex_unlock(&ep->mtx);
-       return available;
+       return !list_empty_careful(&ep->rdllist) ||
+               READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
 }
 
 #ifdef CONFIG_NET_RX_BUSY_POLL
@@ -728,6 +724,77 @@ static inline void ep_pm_stay_awake_rcu(struct epitem *epi)
        rcu_read_unlock();
 }
 
+
+/*
+ * ep->mutex needs to be held because we could be hit by
+ * eventpoll_release_file() and epoll_ctl().
+ */
+static void ep_start_scan(struct eventpoll *ep, struct list_head *txlist)
+{
+       /*
+        * Steal the ready list, and re-init the original one to the
+        * empty list. Also, set ep->ovflist to NULL so that events
+        * happening while looping w/out locks, are not lost. We cannot
+        * have the poll callback to queue directly on ep->rdllist,
+        * because we want the "sproc" callback to be able to do it
+        * in a lockless way.
+        */
+       lockdep_assert_irqs_enabled();
+       write_lock_irq(&ep->lock);
+       list_splice_init(&ep->rdllist, txlist);
+       WRITE_ONCE(ep->ovflist, NULL);
+       write_unlock_irq(&ep->lock);
+}
+
+static void ep_done_scan(struct eventpoll *ep,
+                        struct list_head *txlist)
+{
+       struct epitem *epi, *nepi;
+
+       write_lock_irq(&ep->lock);
+       /*
+        * During the time we spent inside the "sproc" callback, some
+        * other events might have been queued by the poll callback.
+        * We re-insert them inside the main ready-list here.
+        */
+       for (nepi = READ_ONCE(ep->ovflist); (epi = nepi) != NULL;
+            nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
+               /*
+                * We need to check if the item is already in the list.
+                * During the "sproc" callback execution time, items are
+                * queued into ->ovflist but the "txlist" might already
+                * contain them, and the list_splice() below takes care of them.
+                */
+               if (!ep_is_linked(epi)) {
+                       /*
+                        * ->ovflist is LIFO, so we have to reverse it in order
+                        * to keep in FIFO.
+                        */
+                       list_add(&epi->rdllink, &ep->rdllist);
+                       ep_pm_stay_awake(epi);
+               }
+       }
+       /*
+        * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
+        * releasing the lock, events will be queued in the normal way inside
+        * ep->rdllist.
+        */
+       WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);
+
+       /*
+        * Quickly re-inject items left on "txlist".
+        */
+       list_splice(txlist, &ep->rdllist);
+       __pm_relax(ep->ws);
+
+       if (!list_empty(&ep->rdllist)) {
+               if (waitqueue_active(&ep->wq))
+                       wake_up(&ep->wq);
+       }
+
+       write_unlock_irq(&ep->lock);
+}
+
 static void ep_get(struct eventpoll *ep)
 {
        refcount_inc(&ep->refcount);
@@ -765,12 +832,10 @@ static void ep_free(struct eventpoll *ep)
 static bool __ep_remove(struct eventpoll *ep, struct epitem *epi, bool force)
 {
        struct file *file = epi->ffd.file;
-       struct llist_node *put_back_last;
        struct epitems_head *to_free;
        struct hlist_head *head;
-       LLIST_HEAD(put_back);
 
-       lockdep_assert_held(&ep->mtx);
+       lockdep_assert_irqs_enabled();
 
        /*
         * Removes poll wait queue hooks.
@@ -802,20 +867,10 @@ static bool __ep_remove(struct eventpoll *ep, struct epitem *epi, bool force)
 
        rb_erase_cached(&epi->rbn, &ep->rbr);
 
-       if (llist_on_list(&epi->rdllink)) {
-               put_back_last = NULL;
-               while (true) {
-                       struct llist_node *n = llist_del_first(&ep->rdllist);
-
-                       if (&epi->rdllink == n || WARN_ON(!n))
-                               break;
-                       if (!put_back_last)
-                               put_back_last = n;
-                       __llist_add(n, &put_back);
-               }
-               if (put_back_last)
-                       llist_add_batch(put_back.first, put_back_last, &ep->rdllist);
-       }
+       write_lock_irq(&ep->lock);
+       if (ep_is_linked(epi))
+               list_del_init(&epi->rdllink);
+       write_unlock_irq(&ep->lock);
 
        wakeup_source_unregister(ep_wakeup_source(epi));
        /*
@@ -917,9 +972,8 @@ static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt, int depth
 static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int depth)
 {
        struct eventpoll *ep = file->private_data;
-       struct wakeup_source *ws;
-       struct llist_node *n;
-       struct epitem *epi;
+       LIST_HEAD(txlist);
+       struct epitem *epi, *tmp;
        poll_table pt;
        __poll_t res = 0;
 
@@ -933,39 +987,22 @@ static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int dep
         * the ready list.
         */
        mutex_lock_nested(&ep->mtx, depth);
-       while (true) {
-               n = llist_del_first_init(&ep->rdllist);
-               if (!n)
-                       break;
-
-               epi = llist_entry(n, struct epitem, rdllink);
-
+       ep_start_scan(ep, &txlist);
+       list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
                if (ep_item_poll(epi, &pt, depth + 1)) {
                        res = EPOLLIN | EPOLLRDNORM;
-                       epitem_ready(epi);
                        break;
                } else {
                        /*
-                        * We need to activate ep before deactivating epi, to prevent autosuspend
-                        * just in case epi becomes active after ep_item_poll() above.
-                        *
-                        * This is similar to ep_send_events().
+                        * Item has been dropped into the ready list by the poll
+                        * callback, but it's not actually ready, as far as
+                        * caller requested events goes. We can remove it here.
                         */
-                       ws = ep_wakeup_source(epi);
-                       if (ws) {
-                               if (ws->active)
-                                       __pm_stay_awake(ep->ws);
-                               __pm_relax(ws);
-                       }
                        __pm_relax(ep_wakeup_source(epi));
-
-                       /* Just in case epi becomes active right before __pm_relax() */
-                       if (unlikely(ep_item_poll(epi, &pt, depth + 1)))
-                               ep_pm_stay_awake(epi);
-
-                       __pm_relax(ep->ws);
+                       list_del_init(&epi->rdllink);
                }
        }
+       ep_done_scan(ep, &txlist);
        mutex_unlock(&ep->mtx);
        return res;
 }
@@ -1114,10 +1151,12 @@ static int ep_alloc(struct eventpoll **pep)
                return -ENOMEM;
 
        mutex_init(&ep->mtx);
+       rwlock_init(&ep->lock);
        init_waitqueue_head(&ep->wq);
        init_waitqueue_head(&ep->poll_wait);
-       init_llist_head(&ep->rdllist);
+       INIT_LIST_HEAD(&ep->rdllist);
        ep->rbr = RB_ROOT_CACHED;
+       ep->ovflist = EP_UNACTIVE_PTR;
        ep->user = get_current_user();
        refcount_set(&ep->refcount, 1);
 
@@ -1199,11 +1238,94 @@ struct file *get_epoll_tfile_raw_ptr(struct file *file, int tfd,
 }
 #endif /* CONFIG_KCMP */
 
+/*
+ * Adds a new entry to the tail of the list in a lockless way, i.e.
+ * multiple CPUs are allowed to call this function concurrently.
+ *
+ * Beware: it is necessary to prevent any other modifications of the
+ *         existing list until all changes are completed, in other words
+ *         concurrent list_add_tail_lockless() calls should be protected
+ *         with a read lock, where write lock acts as a barrier which
+ *         makes sure all list_add_tail_lockless() calls are fully
+ *         completed.
+ *
+ *        Also an element can be locklessly added to the list only in one
+ *        direction i.e. either to the tail or to the head, otherwise
+ *        concurrent access will corrupt the list.
+ *
+ * Return: %false if element has been already added to the list, %true
+ * otherwise.
+ */
+static inline bool list_add_tail_lockless(struct list_head *new,
+                                         struct list_head *head)
+{
+       struct list_head *prev;
+
+       /*
+        * This is simple 'new->next = head' operation, but cmpxchg()
+        * is used in order to detect that same element has been just
+        * added to the list from another CPU: the winner observes
+        * new->next == new.
+        */
+       if (!try_cmpxchg(&new->next, &new, head))
+               return false;
+
+       /*
+        * Initially ->next of a new element must be updated with the head
+        * (we are inserting to the tail) and only then pointers are atomically
+        * exchanged.  XCHG guarantees memory ordering, thus ->next should be
+        * updated before pointers are actually swapped and pointers are
+        * swapped before prev->next is updated.
+        */
+
+       prev = xchg(&head->prev, new);
+
+       /*
+        * It is safe to modify prev->next and new->prev, because a new element
+        * is added only to the tail and new->next is updated before XCHG.
+        */
+
+       prev->next = new;
+       new->prev = prev;
+
+       return true;
+}
+
+/*
+ * Chains a new epi entry to the tail of the ep->ovflist in a lockless way,
+ * i.e. multiple CPUs are allowed to call this function concurrently.
+ *
+ * Return: %false if epi element has been already chained, %true otherwise.
+ */
+static inline bool chain_epi_lockless(struct epitem *epi)
+{
+       struct eventpoll *ep = epi->ep;
+
+       /* Fast preliminary check */
+       if (epi->next != EP_UNACTIVE_PTR)
+               return false;
+
+       /* Check that the same epi has not been just chained from another CPU */
+       if (cmpxchg(&epi->next, EP_UNACTIVE_PTR, NULL) != EP_UNACTIVE_PTR)
+               return false;
+
+       /* Atomically exchange tail */
+       epi->next = xchg(&ep->ovflist, epi);
+
+       return true;
+}
+
 /*
  * This is the callback that is passed to the wait queue wakeup
  * mechanism. It is called by the stored file descriptors when they
  * have events to report.
  *
+ * This callback takes a read lock in order not to contend with concurrent
+ * events from another file descriptor, thus all modifications to ->rdllist
+ * or ->ovflist are lockless.  Read lock is paired with the write lock from
+ * ep_start/done_scan(), which stops all list modifications and guarantees
+ * that lists state is seen correctly.
+ *
  * Another thing worth to mention is that ep_poll_callback() can be called
  * concurrently for the same @epi from different CPUs if poll table was inited
  * with several wait queues entries.  Plural wakeup from different CPUs of a
@@ -1213,11 +1335,15 @@ struct file *get_epoll_tfile_raw_ptr(struct file *file, int tfd,
  */
 static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
 {
+       int pwake = 0;
        struct epitem *epi = ep_item_from_wait(wait);
        struct eventpoll *ep = epi->ep;
        __poll_t pollflags = key_to_poll(key);
+       unsigned long flags;
        int ewake = 0;
 
+       read_lock_irqsave(&ep->lock, flags);
+
        ep_set_busy_poll_napi_id(epi);
 
        /*
@@ -1227,7 +1353,7 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
         * until the next EPOLL_CTL_MOD will be issued.
         */
        if (!(epi->event.events & ~EP_PRIVATE_BITS))
-               goto out;
+               goto out_unlock;
 
        /*
         * Check the events coming with the callback. At this stage, not
@@ -1236,10 +1362,22 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
         * test for "key" != NULL before the event match test.
         */
        if (pollflags && !(pollflags & epi->event.events))
-               goto out;
+               goto out_unlock;
 
-       ep_pm_stay_awake_rcu(epi);
-       epitem_ready(epi);
+       /*
+        * If we are transferring events to userspace, we can hold no locks
+        * (because we're accessing user memory, and because of linux f_op->poll()
+        * semantics). All the events that happen during that period of time are
+        * chained in ep->ovflist and requeued later on.
+        */
+       if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
+               if (chain_epi_lockless(epi))
+                       ep_pm_stay_awake_rcu(epi);
+       } else if (!ep_is_linked(epi)) {
+               /* In the usual case, add event to ready list. */
+               if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))
+                       ep_pm_stay_awake_rcu(epi);
+       }
 
        /*
         * Wake up ( if active ) both the eventpoll wait list and the ->poll()
@@ -1268,9 +1406,15 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
                        wake_up(&ep->wq);
        }
        if (waitqueue_active(&ep->poll_wait))
+               pwake++;
+
+out_unlock:
+       read_unlock_irqrestore(&ep->lock, flags);
+
+       /* We have to call this outside the lock */
+       if (pwake)
                ep_poll_safewake(ep, epi, pollflags & EPOLL_URING_WAKE);
 
-out:
        if (!(epi->event.events & EPOLLEXCLUSIVE))
                ewake = 1;
 
@@ -1515,6 +1659,8 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
        if (is_file_epoll(tfile))
                tep = tfile->private_data;
 
+       lockdep_assert_irqs_enabled();
+
        if (unlikely(percpu_counter_compare(&ep->user->epoll_watches,
                                            max_user_watches) >= 0))
                return -ENOSPC;
@@ -1526,10 +1672,11 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
        }
 
        /* Item initialization follow here ... */
-       init_llist_node(&epi->rdllink);
+       INIT_LIST_HEAD(&epi->rdllink);
        epi->ep = ep;
        ep_set_ffd(&epi->ffd, tfile, fd);
        epi->event = *event;
+       epi->next = EP_UNACTIVE_PTR;
 
        if (tep)
                mutex_lock_nested(&tep->mtx, 1);
@@ -1596,13 +1743,16 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
                return -ENOMEM;
        }
 
+       /* We have to drop the new item inside our item list to keep track of it */
+       write_lock_irq(&ep->lock);
+
        /* record NAPI ID of new item if present */
        ep_set_busy_poll_napi_id(epi);
 
        /* If the file is already "ready" we drop it inside the ready list */
-       if (revents) {
+       if (revents && !ep_is_linked(epi)) {
+               list_add_tail(&epi->rdllink, &ep->rdllist);
                ep_pm_stay_awake(epi);
-               epitem_ready(epi);
 
                /* Notify waiting tasks that events are available */
                if (waitqueue_active(&ep->wq))
@@ -1611,6 +1761,8 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
                        pwake++;
        }
 
+       write_unlock_irq(&ep->lock);
+
        /* We have to call this outside the lock */
        if (pwake)
                ep_poll_safewake(ep, NULL, 0);
@@ -1625,8 +1777,11 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
 static int ep_modify(struct eventpoll *ep, struct epitem *epi,
                     const struct epoll_event *event)
 {
+       int pwake = 0;
        poll_table pt;
 
+       lockdep_assert_irqs_enabled();
+
        init_poll_funcptr(&pt, NULL);
 
        /*
@@ -1670,16 +1825,24 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi,
         * list, push it inside.
         */
        if (ep_item_poll(epi, &pt, 1)) {
-               ep_pm_stay_awake(epi);
-               epitem_ready(epi);
+               write_lock_irq(&ep->lock);
+               if (!ep_is_linked(epi)) {
+                       list_add_tail(&epi->rdllink, &ep->rdllist);
+                       ep_pm_stay_awake(epi);
 
-               /* Notify waiting tasks that events are available */
-               if (waitqueue_active(&ep->wq))
-                       wake_up(&ep->wq);
-               if (waitqueue_active(&ep->poll_wait))
-                       ep_poll_safewake(ep, NULL, 0);
+                       /* Notify waiting tasks that events are available */
+                       if (waitqueue_active(&ep->wq))
+                               wake_up(&ep->wq);
+                       if (waitqueue_active(&ep->poll_wait))
+                               pwake++;
+               }
+               write_unlock_irq(&ep->lock);
        }
 
+       /* We have to call this outside the lock */
+       if (pwake)
+               ep_poll_safewake(ep, NULL, 0);
+
        return 0;
 }
 
@@ -1687,7 +1850,7 @@ static int ep_send_events(struct eventpoll *ep,
                          struct epoll_event __user *events, int maxevents)
 {
        struct epitem *epi, *tmp;
-       LLIST_HEAD(txlist);
+       LIST_HEAD(txlist);
        poll_table pt;
        int res = 0;
 
@@ -1702,18 +1865,19 @@ static int ep_send_events(struct eventpoll *ep,
        init_poll_funcptr(&pt, NULL);
 
        mutex_lock(&ep->mtx);
+       ep_start_scan(ep, &txlist);
 
-       while (res < maxevents) {
+       /*
+        * We can loop without lock because we are passed a task private list.
+        * Items cannot vanish during the loop we are holding ep->mtx.
+        */
+       list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
                struct wakeup_source *ws;
-               struct llist_node *n;
                __poll_t revents;
 
-               n = llist_del_first(&ep->rdllist);
-               if (!n)
+               if (res >= maxevents)
                        break;
 
-               epi = llist_entry(n, struct epitem, rdllink);
-
                /*
                 * Activate ep->ws before deactivating epi->ws to prevent
                 * triggering auto-suspend here (in case we reactive epi->ws
@@ -1730,30 +1894,21 @@ static int ep_send_events(struct eventpoll *ep,
                        __pm_relax(ws);
                }
 
+               list_del_init(&epi->rdllink);
+
                /*
                 * If the event mask intersect the caller-requested one,
                 * deliver the event to userspace. Again, we are holding ep->mtx,
                 * so no operations coming from userspace can change the item.
                 */
                revents = ep_item_poll(epi, &pt, 1);
-               if (!revents) {
-                       init_llist_node(n);
-
-                       /*
-                        * Just in case epi becomes ready after ep_item_poll() above, but before
-                        * init_llist_node(). Make sure to add it to the ready list, otherwise an
-                        * event may be lost.
-                        */
-                       if (unlikely(ep_item_poll(epi, &pt, 1))) {
-                               ep_pm_stay_awake(epi);
-                               epitem_ready(epi);
-                       }
+               if (!revents)
                        continue;
-               }
 
                events = epoll_put_uevent(revents, epi->event.data, events);
                if (!events) {
-                       llist_add(&epi->rdllink, &ep->rdllist);
+                       list_add(&epi->rdllink, &txlist);
+                       ep_pm_stay_awake(epi);
                        if (!res)
                                res = -EFAULT;
                        break;
@@ -1761,31 +1916,25 @@ static int ep_send_events(struct eventpoll *ep,
                res++;
                if (epi->event.events & EPOLLONESHOT)
                        epi->event.events &= EP_PRIVATE_BITS;
-               __llist_add(n, &txlist);
-       }
-
-       llist_for_each_entry_safe(epi, tmp, txlist.first, rdllink) {
-               init_llist_node(&epi->rdllink);
-
-               if (!(epi->event.events & EPOLLET)) {
+               else if (!(epi->event.events & EPOLLET)) {
                        /*
-                        * If this file has been added with Level Trigger mode, we need to insert
-                        * back inside the ready list, so that the next call to epoll_wait() will
-                        * check again the events availability.
+                        * If this file has been added with Level
+                        * Trigger mode, we need to insert back inside
+                        * the ready list, so that the next call to
+                        * epoll_wait() will check again the events
+                        * availability. At this point, no one can insert
+                        * into ep->rdllist besides us. The epoll_ctl()
+                        * callers are locked out by
+                        * ep_send_events() holding "mtx" and the
+                        * poll callback will queue them in ep->ovflist.
                         */
+                       list_add_tail(&epi->rdllink, &ep->rdllist);
                        ep_pm_stay_awake(epi);
-                       epitem_ready(epi);
                }
        }
-
-       __pm_relax(ep->ws);
+       ep_done_scan(ep, &txlist);
        mutex_unlock(&ep->mtx);
 
-       if (!llist_empty(&ep->rdllist)) {
-               if (waitqueue_active(&ep->wq))
-                       wake_up(&ep->wq);
-       }
-
        return res;
 }
 
@@ -1878,6 +2027,8 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
        wait_queue_entry_t wait;
        ktime_t expires, *to = NULL;
 
+       lockdep_assert_irqs_enabled();
+
        if (timeout && (timeout->tv_sec | timeout->tv_nsec)) {
                slack = select_estimate_accuracy(timeout);
                to = &expires;
@@ -1937,15 +2088,54 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                init_wait(&wait);
                wait.func = ep_autoremove_wake_function;
 
-               prepare_to_wait_exclusive(&ep->wq, &wait, TASK_INTERRUPTIBLE);
+               write_lock_irq(&ep->lock);
+               /*
+                * Barrierless variant, waitqueue_active() is called under
+                * the same lock on wakeup ep_poll_callback() side, so it
+                * is safe to avoid an explicit barrier.
+                */
+               __set_current_state(TASK_INTERRUPTIBLE);
 
-               if (!ep_events_available(ep))
+               /*
+                * Do the final check under the lock. ep_start/done_scan()
+                * plays with two lists (->rdllist and ->ovflist) and there
+                * is always a race when both lists are empty for short
+                * period of time although events are pending, so lock is
+                * important.
+                */
+               eavail = ep_events_available(ep);
+               if (!eavail)
+                       __add_wait_queue_exclusive(&ep->wq, &wait);
+
+               write_unlock_irq(&ep->lock);
+
+               if (!eavail)
                        timed_out = !ep_schedule_timeout(to) ||
                                !schedule_hrtimeout_range(to, slack,
                                                          HRTIMER_MODE_ABS);
+               __set_current_state(TASK_RUNNING);
 
-               finish_wait(&ep->wq, &wait);
-               eavail = ep_events_available(ep);
+               /*
+                * We were woken up, thus go and try to harvest some events.
+                * If timed out and still on the wait queue, recheck eavail
+                * carefully under lock, below.
+                */
+               eavail = 1;
+
+               if (!list_empty_careful(&wait.entry)) {
+                       write_lock_irq(&ep->lock);
+                       /*
+                        * If the thread timed out and is not on the wait queue,
+                        * it means that the thread was woken up after its
+                        * timeout expired before it could reacquire the lock.
+                        * Thus, when wait.entry is empty, it needs to harvest
+                        * events.
+                        */
+                       if (timed_out)
+                               eavail = list_empty(&wait.entry);
+                       __remove_wait_queue(&ep->wq, &wait);
+                       write_unlock_irq(&ep->lock);
+               }
        }
 }