Merge branch 'for-5.3' of git://git.kernel.org/pub/scm/linux/kernel/git/dennis/percpu
authorLinus Torvalds <torvalds@linux-foundation.org>
Sun, 14 Jul 2019 23:17:18 +0000 (16:17 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Sun, 14 Jul 2019 23:17:18 +0000 (16:17 -0700)
Pull percpu updates from Dennis Zhou:
 "This includes changes to let percpu_ref release the backing percpu
  memory earlier after it has been switched to atomic in cases where the
  percpu ref is not revived.

  This will help recycle percpu memory earlier in cases where the
  refcounts are pinned for prolonged periods of time"

* 'for-5.3' of git://git.kernel.org/pub/scm/linux/kernel/git/dennis/percpu:
  percpu_ref: release percpu memory early without PERCPU_REF_ALLOW_REINIT
  md: initialize percpu refcounters using PERCU_REF_ALLOW_REINIT
  io_uring: initialize percpu refcounters using PERCU_REF_ALLOW_REINIT
  percpu_ref: introduce PERCPU_REF_ALLOW_REINIT flag

1  2 
drivers/md/md.c
fs/io_uring.c
lib/percpu-refcount.c

diff --combined drivers/md/md.c
index a114b05e3db48372fb7ecf512c45d8902f2e1b3b,16e034747a86dd64e94274d7902b4fcd702c93e6..24638ccedce42b8d9fb346fc7a01ff8622f4c26d
@@@ -1,4 -1,3 +1,4 @@@
 +// SPDX-License-Identifier: GPL-2.0-or-later
  /*
     md.c : Multiple Devices driver for Linux
       Copyright (C) 1998, 1999, 2000 Ingo Molnar
     - persistent bitmap code
       Copyright (C) 2003-2004, Paul Clements, SteelEye Technology, Inc.
  
 -   This program is free software; you can redistribute it and/or modify
 -   it under the terms of the GNU General Public License as published by
 -   the Free Software Foundation; either version 2, or (at your option)
 -   any later version.
 -
 -   You should have received a copy of the GNU General Public License
 -   (for example /usr/src/linux/COPYING); if not, write to the Free
 -   Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  
     Errors, Warnings, etc.
     Please use:
@@@ -37,7 -44,6 +37,7 @@@
  
  */
  
 +#include <linux/sched/mm.h>
  #include <linux/sched/signal.h>
  #include <linux/kthread.h>
  #include <linux/blkdev.h>
@@@ -82,7 -88,8 +82,7 @@@ static struct kobj_type md_ktype
  
  struct md_cluster_operations *md_cluster_ops;
  EXPORT_SYMBOL(md_cluster_ops);
 -struct module *md_cluster_mod;
 -EXPORT_SYMBOL(md_cluster_mod);
 +static struct module *md_cluster_mod;
  
  static DECLARE_WAIT_QUEUE_HEAD(resync_wait);
  static struct workqueue_struct *md_wq;
@@@ -125,75 -132,22 +125,75 @@@ static inline int speed_max(struct mdde
                mddev->sync_speed_max : sysctl_speed_limit_max;
  }
  
 -static void * flush_info_alloc(gfp_t gfp_flags, void *data)
 -{
 -        return kzalloc(sizeof(struct flush_info), gfp_flags);
 -}
 -static void flush_info_free(void *flush_info, void *data)
 +static int rdev_init_wb(struct md_rdev *rdev)
  {
 -        kfree(flush_info);
 +      if (rdev->bdev->bd_queue->nr_hw_queues == 1)
 +              return 0;
 +
 +      spin_lock_init(&rdev->wb_list_lock);
 +      INIT_LIST_HEAD(&rdev->wb_list);
 +      init_waitqueue_head(&rdev->wb_io_wait);
 +      set_bit(WBCollisionCheck, &rdev->flags);
 +
 +      return 1;
  }
  
 -static void * flush_bio_alloc(gfp_t gfp_flags, void *data)
 +/*
 + * Create wb_info_pool if rdev is the first multi-queue device flaged
 + * with writemostly, also write-behind mode is enabled.
 + */
 +void mddev_create_wb_pool(struct mddev *mddev, struct md_rdev *rdev,
 +                        bool is_suspend)
  {
 -      return kzalloc(sizeof(struct flush_bio), gfp_flags);
 +      if (mddev->bitmap_info.max_write_behind == 0)
 +              return;
 +
 +      if (!test_bit(WriteMostly, &rdev->flags) || !rdev_init_wb(rdev))
 +              return;
 +
 +      if (mddev->wb_info_pool == NULL) {
 +              unsigned int noio_flag;
 +
 +              if (!is_suspend)
 +                      mddev_suspend(mddev);
 +              noio_flag = memalloc_noio_save();
 +              mddev->wb_info_pool = mempool_create_kmalloc_pool(NR_WB_INFOS,
 +                                                      sizeof(struct wb_info));
 +              memalloc_noio_restore(noio_flag);
 +              if (!mddev->wb_info_pool)
 +                      pr_err("can't alloc memory pool for writemostly\n");
 +              if (!is_suspend)
 +                      mddev_resume(mddev);
 +      }
  }
 -static void flush_bio_free(void *flush_bio, void *data)
 +EXPORT_SYMBOL_GPL(mddev_create_wb_pool);
 +
 +/*
 + * destroy wb_info_pool if rdev is the last device flaged with WBCollisionCheck.
 + */
 +static void mddev_destroy_wb_pool(struct mddev *mddev, struct md_rdev *rdev)
  {
 -      kfree(flush_bio);
 +      if (!test_and_clear_bit(WBCollisionCheck, &rdev->flags))
 +              return;
 +
 +      if (mddev->wb_info_pool) {
 +              struct md_rdev *temp;
 +              int num = 0;
 +
 +              /*
 +               * Check if other rdevs need wb_info_pool.
 +               */
 +              rdev_for_each(temp, mddev)
 +                      if (temp != rdev &&
 +                          test_bit(WBCollisionCheck, &temp->flags))
 +                              num++;
 +              if (!num) {
 +                      mddev_suspend(rdev->mddev);
 +                      mempool_destroy(mddev->wb_info_pool);
 +                      mddev->wb_info_pool = NULL;
 +                      mddev_resume(rdev->mddev);
 +              }
 +      }
  }
  
  static struct ctl_table_header *raid_table_header;
@@@ -469,31 -423,54 +469,31 @@@ static int md_congested(void *data, in
  /*
   * Generic flush handling for md
   */
 -static void submit_flushes(struct work_struct *ws)
 -{
 -      struct flush_info *fi = container_of(ws, struct flush_info, flush_work);
 -      struct mddev *mddev = fi->mddev;
 -      struct bio *bio = fi->bio;
 -
 -      bio->bi_opf &= ~REQ_PREFLUSH;
 -      md_handle_request(mddev, bio);
  
 -      mempool_free(fi, mddev->flush_pool);
 -}
 -
 -static void md_end_flush(struct bio *fbio)
 +static void md_end_flush(struct bio *bio)
  {
 -      struct flush_bio *fb = fbio->bi_private;
 -      struct md_rdev *rdev = fb->rdev;
 -      struct flush_info *fi = fb->fi;
 -      struct bio *bio = fi->bio;
 -      struct mddev *mddev = fi->mddev;
 +      struct md_rdev *rdev = bio->bi_private;
 +      struct mddev *mddev = rdev->mddev;
  
        rdev_dec_pending(rdev, mddev);
  
 -      if (atomic_dec_and_test(&fi->flush_pending)) {
 -              if (bio->bi_iter.bi_size == 0) {
 -                      /* an empty barrier - all done */
 -                      bio_endio(bio);
 -                      mempool_free(fi, mddev->flush_pool);
 -              } else {
 -                      INIT_WORK(&fi->flush_work, submit_flushes);
 -                      queue_work(md_wq, &fi->flush_work);
 -              }
 +      if (atomic_dec_and_test(&mddev->flush_pending)) {
 +              /* The pre-request flush has finished */
 +              queue_work(md_wq, &mddev->flush_work);
        }
 -
 -      mempool_free(fb, mddev->flush_bio_pool);
 -      bio_put(fbio);
 +      bio_put(bio);
  }
  
 -void md_flush_request(struct mddev *mddev, struct bio *bio)
 +static void md_submit_flush_data(struct work_struct *ws);
 +
 +static void submit_flushes(struct work_struct *ws)
  {
 +      struct mddev *mddev = container_of(ws, struct mddev, flush_work);
        struct md_rdev *rdev;
 -      struct flush_info *fi;
 -
 -      fi = mempool_alloc(mddev->flush_pool, GFP_NOIO);
 -
 -      fi->bio = bio;
 -      fi->mddev = mddev;
 -      atomic_set(&fi->flush_pending, 1);
  
 +      mddev->start_flush = ktime_get_boottime();
 +      INIT_WORK(&mddev->flush_work, md_submit_flush_data);
 +      atomic_set(&mddev->flush_pending, 1);
        rcu_read_lock();
        rdev_for_each_rcu(rdev, mddev)
                if (rdev->raid_disk >= 0 &&
                         * we reclaim rcu_read_lock
                         */
                        struct bio *bi;
 -                      struct flush_bio *fb;
                        atomic_inc(&rdev->nr_pending);
                        atomic_inc(&rdev->nr_pending);
                        rcu_read_unlock();
 -
 -                      fb = mempool_alloc(mddev->flush_bio_pool, GFP_NOIO);
 -                      fb->fi = fi;
 -                      fb->rdev = rdev;
 -
                        bi = bio_alloc_mddev(GFP_NOIO, 0, mddev);
 -                      bio_set_dev(bi, rdev->bdev);
                        bi->bi_end_io = md_end_flush;
 -                      bi->bi_private = fb;
 +                      bi->bi_private = rdev;
 +                      bio_set_dev(bi, rdev->bdev);
                        bi->bi_opf = REQ_OP_WRITE | REQ_PREFLUSH;
 -
 -                      atomic_inc(&fi->flush_pending);
 +                      atomic_inc(&mddev->flush_pending);
                        submit_bio(bi);
 -
                        rcu_read_lock();
                        rdev_dec_pending(rdev, mddev);
                }
        rcu_read_unlock();
 +      if (atomic_dec_and_test(&mddev->flush_pending))
 +              queue_work(md_wq, &mddev->flush_work);
 +}
  
 -      if (atomic_dec_and_test(&fi->flush_pending)) {
 -              if (bio->bi_iter.bi_size == 0) {
 +static void md_submit_flush_data(struct work_struct *ws)
 +{
 +      struct mddev *mddev = container_of(ws, struct mddev, flush_work);
 +      struct bio *bio = mddev->flush_bio;
 +
 +      /*
 +       * must reset flush_bio before calling into md_handle_request to avoid a
 +       * deadlock, because other bios passed md_handle_request suspend check
 +       * could wait for this and below md_handle_request could wait for those
 +       * bios because of suspend check
 +       */
 +      mddev->last_flush = mddev->start_flush;
 +      mddev->flush_bio = NULL;
 +      wake_up(&mddev->sb_wait);
 +
 +      if (bio->bi_iter.bi_size == 0) {
 +              /* an empty barrier - all done */
 +              bio_endio(bio);
 +      } else {
 +              bio->bi_opf &= ~REQ_PREFLUSH;
 +              md_handle_request(mddev, bio);
 +      }
 +}
 +
 +void md_flush_request(struct mddev *mddev, struct bio *bio)
 +{
 +      ktime_t start = ktime_get_boottime();
 +      spin_lock_irq(&mddev->lock);
 +      wait_event_lock_irq(mddev->sb_wait,
 +                          !mddev->flush_bio ||
 +                          ktime_after(mddev->last_flush, start),
 +                          mddev->lock);
 +      if (!ktime_after(mddev->last_flush, start)) {
 +              WARN_ON(mddev->flush_bio);
 +              mddev->flush_bio = bio;
 +              bio = NULL;
 +      }
 +      spin_unlock_irq(&mddev->lock);
 +
 +      if (!bio) {
 +              INIT_WORK(&mddev->flush_work, submit_flushes);
 +              queue_work(md_wq, &mddev->flush_work);
 +      } else {
 +              /* flush was performed for some other bio while we waited. */
 +              if (bio->bi_iter.bi_size == 0)
                        /* an empty barrier - all done */
                        bio_endio(bio);
 -                      mempool_free(fi, mddev->flush_pool);
 -              } else {
 -                      INIT_WORK(&fi->flush_work, submit_flushes);
 -                      queue_work(md_wq, &fi->flush_work);
 +              else {
 +                      bio->bi_opf &= ~REQ_PREFLUSH;
 +                      mddev->pers->make_request(mddev, bio);
                }
        }
  }
@@@ -620,7 -560,6 +620,7 @@@ void mddev_init(struct mddev *mddev
        atomic_set(&mddev->openers, 0);
        atomic_set(&mddev->active_io, 0);
        spin_lock_init(&mddev->lock);
 +      atomic_set(&mddev->flush_pending, 0);
        init_waitqueue_head(&mddev->sb_wait);
        init_waitqueue_head(&mddev->recovery_wait);
        mddev->reshape_position = MaxSector;
@@@ -1170,7 -1109,8 +1170,7 @@@ static int super_90_load(struct md_rde
         * (not needed for Linear and RAID0 as metadata doesn't
         * record this size)
         */
 -      if (IS_ENABLED(CONFIG_LBDAF) && (u64)rdev->sectors >= (2ULL << 32) &&
 -          sb->level >= 1)
 +      if ((u64)rdev->sectors >= (2ULL << 32) && sb->level >= 1)
                rdev->sectors = (sector_t)(2ULL << 32) - 2;
  
        if (rdev->sectors < ((sector_t)sb->size) * 2 && sb->level >= 1)
@@@ -1468,7 -1408,8 +1468,7 @@@ super_90_rdev_size_change(struct md_rde
        /* Limit to 4TB as metadata cannot record more than that.
         * 4TB == 2^32 KB, or 2*2^32 sectors.
         */
 -      if (IS_ENABLED(CONFIG_LBDAF) && (u64)num_sectors >= (2ULL << 32) &&
 -          rdev->mddev->level >= 1)
 +      if ((u64)num_sectors >= (2ULL << 32) && rdev->mddev->level >= 1)
                num_sectors = (sector_t)(2ULL << 32) - 2;
        do {
                md_super_write(rdev->mddev, rdev, rdev->sb_start, rdev->sb_size,
@@@ -1612,7 -1553,7 +1612,7 @@@ static int super_1_load(struct md_rdev 
                 */
                s32 offset;
                sector_t bb_sector;
 -              u64 *bbp;
 +              __le64 *bbp;
                int i;
                int sectors = le16_to_cpu(sb->bblog_size);
                if (sectors > (PAGE_SIZE / 512))
                if (!sync_page_io(rdev, bb_sector, sectors << 9,
                                  rdev->bb_page, REQ_OP_READ, 0, true))
                        return -EIO;
 -              bbp = (u64 *)page_address(rdev->bb_page);
 +              bbp = (__le64 *)page_address(rdev->bb_page);
                rdev->badblocks.shift = sb->bblog_shift;
                for (i = 0 ; i < (sectors << (9-3)) ; i++, bbp++) {
                        u64 bb = le64_to_cpu(*bbp);
@@@ -1936,7 -1877,7 +1936,7 @@@ static void super_1_sync(struct mddev *
                md_error(mddev, rdev);
        else {
                struct badblocks *bb = &rdev->badblocks;
 -              u64 *bbp = (u64 *)page_address(rdev->bb_page);
 +              __le64 *bbp = (__le64 *)page_address(rdev->bb_page);
                u64 *p = bb->page;
                sb->feature_map |= cpu_to_le32(MD_FEATURE_BAD_BLOCKS);
                if (bb->changed) {
@@@ -2282,9 -2223,6 +2282,9 @@@ static int bind_rdev_to_array(struct md
        rdev->mddev = mddev;
        pr_debug("md: bind<%s>\n", b);
  
 +      if (mddev->raid_disks)
 +              mddev_create_wb_pool(mddev, rdev, false);
 +
        if ((err = kobject_add(&rdev->kobj, &mddev->kobj, "dev-%s", b)))
                goto fail;
  
@@@ -2321,7 -2259,6 +2321,7 @@@ static void unbind_rdev_from_array(stru
        bd_unlink_disk_holder(rdev->bdev, rdev->mddev->gendisk);
        list_del_rcu(&rdev->same_set);
        pr_debug("md: unbind<%s>\n", bdevname(rdev->bdev,b));
 +      mddev_destroy_wb_pool(rdev->mddev, rdev);
        rdev->mddev = NULL;
        sysfs_remove_link(&rdev->kobj, "block");
        sysfs_put(rdev->sysfs_state);
@@@ -2834,10 -2771,8 +2834,10 @@@ state_store(struct md_rdev *rdev, cons
                }
        } else if (cmd_match(buf, "writemostly")) {
                set_bit(WriteMostly, &rdev->flags);
 +              mddev_create_wb_pool(rdev->mddev, rdev, false);
                err = 0;
        } else if (cmd_match(buf, "-writemostly")) {
 +              mddev_destroy_wb_pool(rdev->mddev, rdev);
                clear_bit(WriteMostly, &rdev->flags);
                err = 0;
        } else if (cmd_match(buf, "blocked")) {
                        err = 0;
                }
        } else if (cmd_match(buf, "re-add")) {
 -              if (test_bit(Faulty, &rdev->flags) && (rdev->raid_disk == -1) &&
 -                      rdev->saved_raid_disk >= 0) {
 +              if (!rdev->mddev->pers)
 +                      err = -EINVAL;
 +              else if (test_bit(Faulty, &rdev->flags) && (rdev->raid_disk == -1) &&
 +                              rdev->saved_raid_disk >= 0) {
                        /* clear_bit is performed _after_ all the devices
                         * have their local Faulty bit cleared. If any writes
                         * happen in the meantime in the local node, they
@@@ -3434,7 -3367,7 +3434,7 @@@ rdev_attr_show(struct kobject *kobj, st
        if (!entry->show)
                return -EIO;
        if (!rdev->mddev)
 -              return -EBUSY;
 +              return -ENODEV;
        return entry->show(rdev, page);
  }
  
@@@ -3451,10 -3384,10 +3451,10 @@@ rdev_attr_store(struct kobject *kobj, s
                return -EIO;
        if (!capable(CAP_SYS_ADMIN))
                return -EACCES;
 -      rv = mddev ? mddev_lock(mddev): -EBUSY;
 +      rv = mddev ? mddev_lock(mddev) : -ENODEV;
        if (!rv) {
                if (rdev->mddev == NULL)
 -                      rv = -EBUSY;
 +                      rv = -ENODEV;
                else
                        rv = entry->store(rdev, page, length);
                mddev_unlock(mddev);
@@@ -5316,7 -5249,8 +5316,8 @@@ int mddev_init_writes_pending(struct md
  {
        if (mddev->writes_pending.percpu_count_ptr)
                return 0;
-       if (percpu_ref_init(&mddev->writes_pending, no_op, 0, GFP_KERNEL) < 0)
+       if (percpu_ref_init(&mddev->writes_pending, no_op,
+                           PERCPU_REF_ALLOW_REINIT, GFP_KERNEL) < 0)
                return -ENOMEM;
        /* We want to start with the refcount at zero */
        percpu_ref_put(&mddev->writes_pending);
@@@ -5578,6 -5512,22 +5579,6 @@@ int md_run(struct mddev *mddev
                if (err)
                        return err;
        }
 -      if (mddev->flush_pool == NULL) {
 -              mddev->flush_pool = mempool_create(NR_FLUSH_INFOS, flush_info_alloc,
 -                                              flush_info_free, mddev);
 -              if (!mddev->flush_pool) {
 -                      err = -ENOMEM;
 -                      goto abort;
 -              }
 -      }
 -      if (mddev->flush_bio_pool == NULL) {
 -              mddev->flush_bio_pool = mempool_create(NR_FLUSH_BIOS, flush_bio_alloc,
 -                                              flush_bio_free, mddev);
 -              if (!mddev->flush_bio_pool) {
 -                      err = -ENOMEM;
 -                      goto abort;
 -              }
 -      }
  
        spin_lock(&pers_lock);
        pers = find_pers(mddev->level, mddev->clevel);
                        mddev->bitmap = bitmap;
  
        }
 -      if (err) {
 -              mddev_detach(mddev);
 -              if (mddev->private)
 -                      pers->free(mddev, mddev->private);
 -              mddev->private = NULL;
 -              module_put(pers->owner);
 -              md_bitmap_destroy(mddev);
 -              goto abort;
 +      if (err)
 +              goto bitmap_abort;
 +
 +      if (mddev->bitmap_info.max_write_behind > 0) {
 +              bool creat_pool = false;
 +
 +              rdev_for_each(rdev, mddev) {
 +                      if (test_bit(WriteMostly, &rdev->flags) &&
 +                          rdev_init_wb(rdev))
 +                              creat_pool = true;
 +              }
 +              if (creat_pool && mddev->wb_info_pool == NULL) {
 +                      mddev->wb_info_pool =
 +                              mempool_create_kmalloc_pool(NR_WB_INFOS,
 +                                                  sizeof(struct wb_info));
 +                      if (!mddev->wb_info_pool) {
 +                              err = -ENOMEM;
 +                              goto bitmap_abort;
 +                      }
 +              }
        }
 +
        if (mddev->queue) {
                bool nonrot = true;
  
        spin_unlock(&mddev->lock);
        rdev_for_each(rdev, mddev)
                if (rdev->raid_disk >= 0)
 -                      if (sysfs_link_rdev(mddev, rdev))
 -                              /* failure here is OK */;
 +                      sysfs_link_rdev(mddev, rdev); /* failure here is OK */
  
        if (mddev->degraded && !mddev->ro)
                /* This ensures that recovering status is reported immediately
        sysfs_notify(&mddev->kobj, NULL, "degraded");
        return 0;
  
 +bitmap_abort:
 +      mddev_detach(mddev);
 +      if (mddev->private)
 +              pers->free(mddev, mddev->private);
 +      mddev->private = NULL;
 +      module_put(pers->owner);
 +      md_bitmap_destroy(mddev);
  abort:
 -      mempool_destroy(mddev->flush_bio_pool);
 -      mddev->flush_bio_pool = NULL;
 -      mempool_destroy(mddev->flush_pool);
 -      mddev->flush_pool = NULL;
 -
 +      bioset_exit(&mddev->bio_set);
 +      bioset_exit(&mddev->sync_set);
        return err;
  }
  EXPORT_SYMBOL_GPL(md_run);
@@@ -5923,8 -5857,6 +5924,8 @@@ static void __md_stop_writes(struct mdd
                        mddev->in_sync = 1;
                md_update_sb(mddev, 1);
        }
 +      mempool_destroy(mddev->wb_info_pool);
 +      mddev->wb_info_pool = NULL;
  }
  
  void md_stop_writes(struct mddev *mddev)
@@@ -5963,6 -5895,14 +5964,6 @@@ static void __md_stop(struct mddev *mdd
                mddev->to_remove = &md_redundancy_group;
        module_put(pers->owner);
        clear_bit(MD_RECOVERY_FROZEN, &mddev->recovery);
 -      if (mddev->flush_bio_pool) {
 -              mempool_destroy(mddev->flush_bio_pool);
 -              mddev->flush_bio_pool = NULL;
 -      }
 -      if (mddev->flush_pool) {
 -              mempool_destroy(mddev->flush_pool);
 -              mddev->flush_pool = NULL;
 -      }
  }
  
  void md_stop(struct mddev *mddev)
@@@ -7706,9 -7646,9 +7707,9 @@@ static void status_unused(struct seq_fi
  static int status_resync(struct seq_file *seq, struct mddev *mddev)
  {
        sector_t max_sectors, resync, res;
 -      unsigned long dt, db;
 -      sector_t rt;
 -      int scale;
 +      unsigned long dt, db = 0;
 +      sector_t rt, curr_mark_cnt, resync_mark_cnt;
 +      int scale, recovery_active;
        unsigned int per_milli;
  
        if (test_bit(MD_RECOVERY_SYNC, &mddev->recovery) ||
         * db: blocks written from mark until now
         * rt: remaining time
         *
 -       * rt is a sector_t, so could be 32bit or 64bit.
 -       * So we divide before multiply in case it is 32bit and close
 -       * to the limit.
 -       * We scale the divisor (db) by 32 to avoid losing precision
 -       * near the end of resync when the number of remaining sectors
 -       * is close to 'db'.
 -       * We then divide rt by 32 after multiplying by db to compensate.
 -       * The '+1' avoids division by zero if db is very small.
 +       * rt is a sector_t, which is always 64bit now. We are keeping
 +       * the original algorithm, but it is not really necessary.
 +       *
 +       * Original algorithm:
 +       *   So we divide before multiply in case it is 32bit and close
 +       *   to the limit.
 +       *   We scale the divisor (db) by 32 to avoid losing precision
 +       *   near the end of resync when the number of remaining sectors
 +       *   is close to 'db'.
 +       *   We then divide rt by 32 after multiplying by db to compensate.
 +       *   The '+1' avoids division by zero if db is very small.
         */
        dt = ((jiffies - mddev->resync_mark) / HZ);
        if (!dt) dt++;
 -      db = (mddev->curr_mark_cnt - atomic_read(&mddev->recovery_active))
 -              - mddev->resync_mark_cnt;
 +
 +      curr_mark_cnt = mddev->curr_mark_cnt;
 +      recovery_active = atomic_read(&mddev->recovery_active);
 +      resync_mark_cnt = mddev->resync_mark_cnt;
 +
 +      if (curr_mark_cnt >= (recovery_active + resync_mark_cnt))
 +              db = curr_mark_cnt - (recovery_active + resync_mark_cnt);
  
        rt = max_sectors - resync;    /* number of remaining sectors */
 -      sector_div(rt, db/32+1);
 +      rt = div64_u64(rt, db/32+1);
        rt *= dt;
        rt >>= 5;
  
@@@ -8297,7 -8229,8 +8298,7 @@@ void md_do_sync(struct md_thread *threa
  {
        struct mddev *mddev = thread->mddev;
        struct mddev *mddev2;
 -      unsigned int currspeed = 0,
 -               window;
 +      unsigned int currspeed = 0, window;
        sector_t max_sectors,j, io_sectors, recovery_done;
        unsigned long mark[SYNC_MARKS];
        unsigned long update_time;
         * 0 == not engaged in resync at all
         * 2 == checking that there is no conflict with another sync
         * 1 == like 2, but have yielded to allow conflicting resync to
 -       *              commense
 +       *              commence
         * other == active in resync - this many blocks
         *
         * Before starting a resync we must have set curr_resync to
        /*
         * Tune reconstruction:
         */
 -      window = 32*(PAGE_SIZE/512);
 +      window = 32 * (PAGE_SIZE / 512);
        pr_debug("md: using %dk window, over a total of %lluk.\n",
                 window/2, (unsigned long long)max_sectors/2);
  
@@@ -9298,6 -9231,7 +9299,6 @@@ static void check_sb_changes(struct mdd
                                 * perform resync with the new activated disk */
                                set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
                                md_wakeup_thread(mddev->thread);
 -
                        }
                        /* device faulty
                         * We just want to do the minimum to mark the disk
                 * reshape is happening in the remote node, we need to
                 * update reshape_position and call start_reshape.
                 */
 -              mddev->reshape_position = sb->reshape_position;
 +              mddev->reshape_position = le64_to_cpu(sb->reshape_position);
                if (mddev->pers->update_reshape_pos)
                        mddev->pers->update_reshape_pos(mddev);
                if (mddev->pers->start_reshape)
diff --combined fs/io_uring.c
index 3fd884b4e0bec41b72406cb36a49ff05a5cde090,083c5dd95452086cb6ada5bcc9ec9f4b83db1e1c..d682049c07b2c0b6daa78bccaa0da81b2df4811b
@@@ -222,8 -222,6 +222,8 @@@ struct io_ring_ctx 
                unsigned                sq_mask;
                unsigned                sq_thread_idle;
                struct io_uring_sqe     *sq_sqes;
 +
 +              struct list_head        defer_list;
        } ____cacheline_aligned_in_smp;
  
        /* IO offload */
        struct task_struct      *sqo_thread;    /* if using sq thread polling */
        struct mm_struct        *sqo_mm;
        wait_queue_head_t       sqo_wait;
 -      unsigned                sqo_stop;
 +      struct completion       sqo_thread_started;
  
        struct {
                /* CQ ring */
                unsigned                cq_mask;
                struct wait_queue_head  cq_wait;
                struct fasync_struct    *cq_fasync;
 +              struct eventfd_ctx      *cq_ev_fd;
        } ____cacheline_aligned_in_smp;
  
        /*
@@@ -323,20 -320,15 +323,20 @@@ struct io_kiocb 
  
        struct io_ring_ctx      *ctx;
        struct list_head        list;
 +      struct list_head        link_list;
        unsigned int            flags;
        refcount_t              refs;
  #define REQ_F_NOWAIT          1       /* must not punt to workers */
  #define REQ_F_IOPOLL_COMPLETED        2       /* polled IO has completed */
  #define REQ_F_FIXED_FILE      4       /* ctx owns file */
  #define REQ_F_SEQ_PREV                8       /* sequential with previous */
 -#define REQ_F_PREPPED         16      /* prep already done */
 +#define REQ_F_IO_DRAIN                16      /* drain existing IO first */
 +#define REQ_F_IO_DRAINED      32      /* drain done */
 +#define REQ_F_LINK            64      /* linked sqes */
 +#define REQ_F_FAIL_LINK               128     /* fail rest of links */
        u64                     user_data;
 -      u64                     error;
 +      u32                     result;
 +      u32                     sequence;
  
        struct work_struct      work;
  };
@@@ -364,8 -356,6 +364,8 @@@ struct io_submit_state 
        unsigned int            ios_left;
  };
  
 +static void io_sq_wq_submit_work(struct work_struct *work);
 +
  static struct kmem_cache *req_cachep;
  
  static const struct file_operations io_uring_fops;
@@@ -399,7 -389,8 +399,8 @@@ static struct io_ring_ctx *io_ring_ctx_
        if (!ctx)
                return NULL;
  
-       if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
+       if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
+                           PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
                kfree(ctx);
                return NULL;
        }
        ctx->flags = p->flags;
        init_waitqueue_head(&ctx->cq_wait);
        init_completion(&ctx->ctx_done);
 +      init_completion(&ctx->sqo_thread_started);
        mutex_init(&ctx->uring_lock);
        init_waitqueue_head(&ctx->wait);
        for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
        spin_lock_init(&ctx->completion_lock);
        INIT_LIST_HEAD(&ctx->poll_list);
        INIT_LIST_HEAD(&ctx->cancel_list);
 +      INIT_LIST_HEAD(&ctx->defer_list);
        return ctx;
  }
  
 -static void io_commit_cqring(struct io_ring_ctx *ctx)
 +static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
 +                                   struct io_kiocb *req)
 +{
 +      if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
 +              return false;
 +
 +      return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped;
 +}
 +
 +static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
 +{
 +      struct io_kiocb *req;
 +
 +      if (list_empty(&ctx->defer_list))
 +              return NULL;
 +
 +      req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
 +      if (!io_sequence_defer(ctx, req)) {
 +              list_del_init(&req->list);
 +              return req;
 +      }
 +
 +      return NULL;
 +}
 +
 +static void __io_commit_cqring(struct io_ring_ctx *ctx)
  {
        struct io_cq_ring *ring = ctx->cq_ring;
  
        }
  }
  
 +static void io_commit_cqring(struct io_ring_ctx *ctx)
 +{
 +      struct io_kiocb *req;
 +
 +      __io_commit_cqring(ctx);
 +
 +      while ((req = io_get_deferred_req(ctx)) != NULL) {
 +              req->flags |= REQ_F_IO_DRAINED;
 +              queue_work(ctx->sqo_wq, &req->work);
 +      }
 +}
 +
  static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
  {
        struct io_cq_ring *ring = ctx->cq_ring;
  }
  
  static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 -                               long res, unsigned ev_flags)
 +                               long res)
  {
        struct io_uring_cqe *cqe;
  
        if (cqe) {
                WRITE_ONCE(cqe->user_data, ki_user_data);
                WRITE_ONCE(cqe->res, res);
 -              WRITE_ONCE(cqe->flags, ev_flags);
 +              WRITE_ONCE(cqe->flags, 0);
        } else {
                unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
  
@@@ -520,17 -472,15 +521,17 @@@ static void io_cqring_ev_posted(struct 
                wake_up(&ctx->wait);
        if (waitqueue_active(&ctx->sqo_wait))
                wake_up(&ctx->sqo_wait);
 +      if (ctx->cq_ev_fd)
 +              eventfd_signal(ctx->cq_ev_fd, 1);
  }
  
  static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
 -                              long res, unsigned ev_flags)
 +                              long res)
  {
        unsigned long flags;
  
        spin_lock_irqsave(&ctx->completion_lock, flags);
 -      io_cqring_fill_event(ctx, user_data, res, ev_flags);
 +      io_cqring_fill_event(ctx, user_data, res);
        io_commit_cqring(ctx);
        spin_unlock_irqrestore(&ctx->completion_lock, flags);
  
@@@ -584,12 -534,10 +585,12 @@@ static struct io_kiocb *io_get_req(stru
                state->cur_req++;
        }
  
 +      req->file = NULL;
        req->ctx = ctx;
        req->flags = 0;
        /* one is dropped after submission, the other at completion */
        refcount_set(&req->refs, 2);
 +      req->result = 0;
        return req;
  out:
        io_ring_drop_ctx_refs(ctx, 1);
@@@ -605,7 -553,7 +606,7 @@@ static void io_free_req_many(struct io_
        }
  }
  
 -static void io_free_req(struct io_kiocb *req)
 +static void __io_free_req(struct io_kiocb *req)
  {
        if (req->file && !(req->flags & REQ_F_FIXED_FILE))
                fput(req->file);
        kmem_cache_free(req_cachep, req);
  }
  
 +static void io_req_link_next(struct io_kiocb *req)
 +{
 +      struct io_kiocb *nxt;
 +
 +      /*
 +       * The list should never be empty when we are called here. But could
 +       * potentially happen if the chain is messed up, check to be on the
 +       * safe side.
 +       */
 +      nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
 +      if (nxt) {
 +              list_del(&nxt->list);
 +              if (!list_empty(&req->link_list)) {
 +                      INIT_LIST_HEAD(&nxt->link_list);
 +                      list_splice(&req->link_list, &nxt->link_list);
 +                      nxt->flags |= REQ_F_LINK;
 +              }
 +
 +              INIT_WORK(&nxt->work, io_sq_wq_submit_work);
 +              queue_work(req->ctx->sqo_wq, &nxt->work);
 +      }
 +}
 +
 +/*
 + * Called if REQ_F_LINK is set, and we fail the head request
 + */
 +static void io_fail_links(struct io_kiocb *req)
 +{
 +      struct io_kiocb *link;
 +
 +      while (!list_empty(&req->link_list)) {
 +              link = list_first_entry(&req->link_list, struct io_kiocb, list);
 +              list_del(&link->list);
 +
 +              io_cqring_add_event(req->ctx, link->user_data, -ECANCELED);
 +              __io_free_req(link);
 +      }
 +}
 +
 +static void io_free_req(struct io_kiocb *req)
 +{
 +      /*
 +       * If LINK is set, we have dependent requests in this chain. If we
 +       * didn't fail this request, queue the first one up, moving any other
 +       * dependencies to the next request. In case of failure, fail the rest
 +       * of the chain.
 +       */
 +      if (req->flags & REQ_F_LINK) {
 +              if (req->flags & REQ_F_FAIL_LINK)
 +                      io_fail_links(req);
 +              else
 +                      io_req_link_next(req);
 +      }
 +
 +      __io_free_req(req);
 +}
 +
  static void io_put_req(struct io_kiocb *req)
  {
        if (refcount_dec_and_test(&req->refs))
@@@ -691,17 -582,16 +692,17 @@@ static void io_iopoll_complete(struct i
                req = list_first_entry(done, struct io_kiocb, list);
                list_del(&req->list);
  
 -              io_cqring_fill_event(ctx, req->user_data, req->error, 0);
 +              io_cqring_fill_event(ctx, req->user_data, req->result);
                (*nr_events)++;
  
                if (refcount_dec_and_test(&req->refs)) {
                        /* If we're not using fixed files, we have to pair the
                         * completion part with the file put. Use regular
                         * completions for those, only batch free for fixed
 -                       * file.
 +                       * file and non-linked commands.
                         */
 -                      if (req->flags & REQ_F_FIXED_FILE) {
 +                      if ((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
 +                          REQ_F_FIXED_FILE) {
                                reqs[to_free++] = req;
                                if (to_free == ARRAY_SIZE(reqs))
                                        io_free_req_many(ctx, reqs, &to_free);
@@@ -840,9 -730,7 +841,9 @@@ static void io_complete_rw(struct kioc
  
        kiocb_end_write(kiocb);
  
 -      io_cqring_add_event(req->ctx, req->user_data, res, 0);
 +      if ((req->flags & REQ_F_LINK) && res != req->result)
 +              req->flags |= REQ_F_FAIL_LINK;
 +      io_cqring_add_event(req->ctx, req->user_data, res);
        io_put_req(req);
  }
  
@@@ -852,9 -740,7 +853,9 @@@ static void io_complete_rw_iopoll(struc
  
        kiocb_end_write(kiocb);
  
 -      req->error = res;
 +      if ((req->flags & REQ_F_LINK) && res != req->result)
 +              req->flags |= REQ_F_FAIL_LINK;
 +      req->result = res;
        if (res != -EAGAIN)
                req->flags |= REQ_F_IOPOLL_COMPLETED;
  }
@@@ -963,6 -849,9 +964,6 @@@ static int io_prep_rw(struct io_kiocb *
  
        if (!req->file)
                return -EBADF;
 -      /* For -EAGAIN retry, everything is already prepped */
 -      if (req->flags & REQ_F_PREPPED)
 -              return 0;
  
        if (force_nonblock && !io_file_supports_async(req->file))
                force_nonblock = false;
                    !kiocb->ki_filp->f_op->iopoll)
                        return -EOPNOTSUPP;
  
 -              req->error = 0;
                kiocb->ki_flags |= IOCB_HIPRI;
                kiocb->ki_complete = io_complete_rw_iopoll;
        } else {
                        return -EINVAL;
                kiocb->ki_complete = io_complete_rw;
        }
 -      req->flags |= REQ_F_PREPPED;
        return 0;
  }
  
@@@ -1065,12 -956,15 +1066,12 @@@ static int io_import_fixed(struct io_ri
        iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
        if (offset)
                iov_iter_advance(iter, offset);
 -
 -      /* don't drop a reference to these pages */
 -      iter->type |= ITER_BVEC_FLAG_NO_REF;
        return 0;
  }
  
 -static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
 -                         const struct sqe_submit *s, struct iovec **iovec,
 -                         struct iov_iter *iter)
 +static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
 +                             const struct sqe_submit *s, struct iovec **iovec,
 +                             struct iov_iter *iter)
  {
        const struct io_uring_sqe *sqe = s->sqe;
        void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
        opcode = READ_ONCE(sqe->opcode);
        if (opcode == IORING_OP_READ_FIXED ||
            opcode == IORING_OP_WRITE_FIXED) {
 -              int ret = io_import_fixed(ctx, rw, sqe, iter);
 +              ssize_t ret = io_import_fixed(ctx, rw, sqe, iter);
                *iovec = NULL;
                return ret;
        }
@@@ -1154,7 -1048,7 +1155,7 @@@ static int io_read(struct io_kiocb *req
        struct iov_iter iter;
        struct file *file;
        size_t iov_count;
 -      int ret;
 +      ssize_t read_size, ret;
  
        ret = io_prep_rw(req, s, force_nonblock);
        if (ret)
                return -EINVAL;
  
        ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
 -      if (ret)
 +      if (ret < 0)
                return ret;
  
 +      read_size = ret;
 +      if (req->flags & REQ_F_LINK)
 +              req->result = read_size;
 +
        iov_count = iov_iter_count(&iter);
        ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
        if (!ret) {
                ssize_t ret2;
  
 -              /* Catch -EAGAIN return for forced non-blocking submission */
                ret2 = call_read_iter(file, kiocb, &iter);
 +              /*
 +               * In case of a short read, punt to async. This can happen
 +               * if we have data partially cached. Alternatively we can
 +               * return the short read, in which case the application will
 +               * need to issue another SQE and wait for it. That SQE will
 +               * need async punt anyway, so it's more efficient to do it
 +               * here.
 +               */
 +              if (force_nonblock && ret2 > 0 && ret2 < read_size)
 +                      ret2 = -EAGAIN;
 +              /* Catch -EAGAIN return for forced non-blocking submission */
                if (!force_nonblock || ret2 != -EAGAIN) {
                        io_rw_done(kiocb, ret2);
                } else {
@@@ -1215,7 -1095,7 +1216,7 @@@ static int io_write(struct io_kiocb *re
        struct iov_iter iter;
        struct file *file;
        size_t iov_count;
 -      int ret;
 +      ssize_t ret;
  
        ret = io_prep_rw(req, s, force_nonblock);
        if (ret)
                return -EINVAL;
  
        ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
 -      if (ret)
 +      if (ret < 0)
                return ret;
  
 +      if (req->flags & REQ_F_LINK)
 +              req->result = ret;
 +
        iov_count = iov_iter_count(&iter);
  
        ret = -EAGAIN;
@@@ -1292,7 -1169,7 +1293,7 @@@ static int io_nop(struct io_kiocb *req
        if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
                return -EINVAL;
  
 -      io_cqring_add_event(ctx, user_data, err, 0);
 +      io_cqring_add_event(ctx, user_data, err);
        io_put_req(req);
        return 0;
  }
@@@ -1303,12 -1180,16 +1304,12 @@@ static int io_prep_fsync(struct io_kioc
  
        if (!req->file)
                return -EBADF;
 -      /* Prep already done (EAGAIN retry) */
 -      if (req->flags & REQ_F_PREPPED)
 -              return 0;
  
        if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
                return -EINVAL;
        if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
                return -EINVAL;
  
 -      req->flags |= REQ_F_PREPPED;
        return 0;
  }
  
@@@ -1337,116 -1218,11 +1338,116 @@@ static int io_fsync(struct io_kiocb *re
                                end > 0 ? end : LLONG_MAX,
                                fsync_flags & IORING_FSYNC_DATASYNC);
  
 -      io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
 +      if (ret < 0 && (req->flags & REQ_F_LINK))
 +              req->flags |= REQ_F_FAIL_LINK;
 +      io_cqring_add_event(req->ctx, sqe->user_data, ret);
        io_put_req(req);
        return 0;
  }
  
 +static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 +{
 +      struct io_ring_ctx *ctx = req->ctx;
 +      int ret = 0;
 +
 +      if (!req->file)
 +              return -EBADF;
 +
 +      if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
 +              return -EINVAL;
 +      if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
 +              return -EINVAL;
 +
 +      return ret;
 +}
 +
 +static int io_sync_file_range(struct io_kiocb *req,
 +                            const struct io_uring_sqe *sqe,
 +                            bool force_nonblock)
 +{
 +      loff_t sqe_off;
 +      loff_t sqe_len;
 +      unsigned flags;
 +      int ret;
 +
 +      ret = io_prep_sfr(req, sqe);
 +      if (ret)
 +              return ret;
 +
 +      /* sync_file_range always requires a blocking context */
 +      if (force_nonblock)
 +              return -EAGAIN;
 +
 +      sqe_off = READ_ONCE(sqe->off);
 +      sqe_len = READ_ONCE(sqe->len);
 +      flags = READ_ONCE(sqe->sync_range_flags);
 +
 +      ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
 +
 +      if (ret < 0 && (req->flags & REQ_F_LINK))
 +              req->flags |= REQ_F_FAIL_LINK;
 +      io_cqring_add_event(req->ctx, sqe->user_data, ret);
 +      io_put_req(req);
 +      return 0;
 +}
 +
 +#if defined(CONFIG_NET)
 +static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 +                         bool force_nonblock,
 +                 long (*fn)(struct socket *, struct user_msghdr __user *,
 +                              unsigned int))
 +{
 +      struct socket *sock;
 +      int ret;
 +
 +      if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
 +              return -EINVAL;
 +
 +      sock = sock_from_file(req->file, &ret);
 +      if (sock) {
 +              struct user_msghdr __user *msg;
 +              unsigned flags;
 +
 +              flags = READ_ONCE(sqe->msg_flags);
 +              if (flags & MSG_DONTWAIT)
 +                      req->flags |= REQ_F_NOWAIT;
 +              else if (force_nonblock)
 +                      flags |= MSG_DONTWAIT;
 +
 +              msg = (struct user_msghdr __user *) (unsigned long)
 +                      READ_ONCE(sqe->addr);
 +
 +              ret = fn(sock, msg, flags);
 +              if (force_nonblock && ret == -EAGAIN)
 +                      return ret;
 +      }
 +
 +      io_cqring_add_event(req->ctx, sqe->user_data, ret);
 +      io_put_req(req);
 +      return 0;
 +}
 +#endif
 +
 +static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 +                    bool force_nonblock)
 +{
 +#if defined(CONFIG_NET)
 +      return io_send_recvmsg(req, sqe, force_nonblock, __sys_sendmsg_sock);
 +#else
 +      return -EOPNOTSUPP;
 +#endif
 +}
 +
 +static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 +                    bool force_nonblock)
 +{
 +#if defined(CONFIG_NET)
 +      return io_send_recvmsg(req, sqe, force_nonblock, __sys_recvmsg_sock);
 +#else
 +      return -EOPNOTSUPP;
 +#endif
 +}
 +
  static void io_poll_remove_one(struct io_kiocb *req)
  {
        struct io_poll_iocb *poll = &req->poll;
@@@ -1500,7 -1276,7 +1501,7 @@@ static int io_poll_remove(struct io_kio
        }
        spin_unlock_irq(&ctx->completion_lock);
  
 -      io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
 +      io_cqring_add_event(req->ctx, sqe->user_data, ret);
        io_put_req(req);
        return 0;
  }
@@@ -1509,7 -1285,7 +1510,7 @@@ static void io_poll_complete(struct io_
                             __poll_t mask)
  {
        req->poll.done = true;
 -      io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask), 0);
 +      io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
        io_commit_cqring(ctx);
  }
  
@@@ -1649,6 -1425,7 +1650,6 @@@ static int io_poll_add(struct io_kiocb 
                spin_unlock(&poll->head->lock);
        }
        if (mask) { /* no async, we'd stolen it */
 -              req->error = mangle_poll(mask);
                ipt.error = 0;
                io_poll_complete(ctx, req, mask);
        }
        return ipt.error;
  }
  
 +static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
 +                      const struct io_uring_sqe *sqe)
 +{
 +      struct io_uring_sqe *sqe_copy;
 +
 +      if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
 +              return 0;
 +
 +      sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
 +      if (!sqe_copy)
 +              return -EAGAIN;
 +
 +      spin_lock_irq(&ctx->completion_lock);
 +      if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
 +              spin_unlock_irq(&ctx->completion_lock);
 +              kfree(sqe_copy);
 +              return 0;
 +      }
 +
 +      memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
 +      req->submit.sqe = sqe_copy;
 +
 +      INIT_WORK(&req->work, io_sq_wq_submit_work);
 +      list_add_tail(&req->list, &ctx->defer_list);
 +      spin_unlock_irq(&ctx->completion_lock);
 +      return -EIOCBQUEUED;
 +}
 +
  static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
                           const struct sqe_submit *s, bool force_nonblock)
  {
        int ret, opcode;
  
 +      req->user_data = READ_ONCE(s->sqe->user_data);
 +
        if (unlikely(s->index >= ctx->sq_entries))
                return -EINVAL;
 -      req->user_data = READ_ONCE(s->sqe->user_data);
  
        opcode = READ_ONCE(s->sqe->opcode);
        switch (opcode) {
        case IORING_OP_POLL_REMOVE:
                ret = io_poll_remove(req, s->sqe);
                break;
 +      case IORING_OP_SYNC_FILE_RANGE:
 +              ret = io_sync_file_range(req, s->sqe, force_nonblock);
 +              break;
 +      case IORING_OP_SENDMSG:
 +              ret = io_sendmsg(req, s->sqe, force_nonblock);
 +              break;
 +      case IORING_OP_RECVMSG:
 +              ret = io_recvmsg(req, s->sqe, force_nonblock);
 +              break;
        default:
                ret = -EINVAL;
                break;
                return ret;
  
        if (ctx->flags & IORING_SETUP_IOPOLL) {
 -              if (req->error == -EAGAIN)
 +              if (req->result == -EAGAIN)
                        return -EAGAIN;
  
                /* workqueue context doesn't hold uring_lock, grab it now */
@@@ -1836,7 -1575,7 +1837,7 @@@ restart
                io_put_req(req);
  
                if (ret) {
 -                      io_cqring_add_event(ctx, sqe->user_data, ret, 0);
 +                      io_cqring_add_event(ctx, sqe->user_data, ret);
                        io_put_req(req);
                }
  
@@@ -1946,14 -1685,11 +1947,14 @@@ static int io_req_set_file(struct io_ri
        flags = READ_ONCE(s->sqe->flags);
        fd = READ_ONCE(s->sqe->fd);
  
 -      if (!io_op_needs_file(s->sqe)) {
 -              req->file = NULL;
 -              return 0;
 +      if (flags & IOSQE_IO_DRAIN) {
 +              req->flags |= REQ_F_IO_DRAIN;
 +              req->sequence = ctx->cached_sq_head - 1;
        }
  
 +      if (!io_op_needs_file(s->sqe))
 +              return 0;
 +
        if (flags & IOSQE_FIXED_FILE) {
                if (unlikely(!ctx->user_files ||
                    (unsigned) fd >= ctx->nr_user_files))
        return 0;
  }
  
 -static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
 -                       struct io_submit_state *state)
 +static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 +                      struct sqe_submit *s)
  {
 -      struct io_kiocb *req;
        int ret;
  
 -      /* enforce forwards compatibility on users */
 -      if (unlikely(s->sqe->flags & ~IOSQE_FIXED_FILE))
 -              return -EINVAL;
 -
 -      req = io_get_req(ctx, state);
 -      if (unlikely(!req))
 -              return -EAGAIN;
 -
 -      ret = io_req_set_file(ctx, s, state, req);
 -      if (unlikely(ret))
 -              goto out;
 -
        ret = __io_submit_sqe(ctx, req, s, true);
        if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
                struct io_uring_sqe *sqe_copy;
  
                        /*
                         * Queued up for async execution, worker will release
 -                       * submit reference when the iocb is actually
 -                       * submitted.
 +                       * submit reference when the iocb is actually submitted.
                         */
                        return 0;
                }
        }
  
 -out:
        /* drop submission reference */
        io_put_req(req);
  
        /* and drop final reference, if we failed */
 -      if (ret)
 +      if (ret) {
 +              io_cqring_add_event(ctx, req->user_data, ret);
 +              if (req->flags & REQ_F_LINK)
 +                      req->flags |= REQ_F_FAIL_LINK;
                io_put_req(req);
 +      }
  
        return ret;
  }
  
 +#define SQE_VALID_FLAGS       (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
 +
 +static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
 +                        struct io_submit_state *state, struct io_kiocb **link)
 +{
 +      struct io_uring_sqe *sqe_copy;
 +      struct io_kiocb *req;
 +      int ret;
 +
 +      /* enforce forwards compatibility on users */
 +      if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) {
 +              ret = -EINVAL;
 +              goto err;
 +      }
 +
 +      req = io_get_req(ctx, state);
 +      if (unlikely(!req)) {
 +              ret = -EAGAIN;
 +              goto err;
 +      }
 +
 +      ret = io_req_set_file(ctx, s, state, req);
 +      if (unlikely(ret)) {
 +err_req:
 +              io_free_req(req);
 +err:
 +              io_cqring_add_event(ctx, s->sqe->user_data, ret);
 +              return;
 +      }
 +
 +      ret = io_req_defer(ctx, req, s->sqe);
 +      if (ret) {
 +              if (ret != -EIOCBQUEUED)
 +                      goto err_req;
 +              return;
 +      }
 +
 +      /*
 +       * If we already have a head request, queue this one for async
 +       * submittal once the head completes. If we don't have a head but
 +       * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
 +       * submitted sync once the chain is complete. If none of those
 +       * conditions are true (normal request), then just queue it.
 +       */
 +      if (*link) {
 +              struct io_kiocb *prev = *link;
 +
 +              sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
 +              if (!sqe_copy) {
 +                      ret = -EAGAIN;
 +                      goto err_req;
 +              }
 +
 +              s->sqe = sqe_copy;
 +              memcpy(&req->submit, s, sizeof(*s));
 +              list_add_tail(&req->list, &prev->link_list);
 +      } else if (s->sqe->flags & IOSQE_IO_LINK) {
 +              req->flags |= REQ_F_LINK;
 +
 +              memcpy(&req->submit, s, sizeof(*s));
 +              INIT_LIST_HEAD(&req->link_list);
 +              *link = req;
 +      } else {
 +              io_queue_sqe(ctx, req, s);
 +      }
 +}
 +
  /*
   * Batched submission is done, ensure local IO is flushed out.
   */
@@@ -2167,9 -1847,7 +2168,9 @@@ static int io_submit_sqes(struct io_rin
                          unsigned int nr, bool has_user, bool mm_fault)
  {
        struct io_submit_state state, *statep = NULL;
 -      int ret, i, submitted = 0;
 +      struct io_kiocb *link = NULL;
 +      bool prev_was_link = false;
 +      int i, submitted = 0;
  
        if (nr > IO_PLUG_THRESHOLD) {
                io_submit_state_start(&state, ctx, nr);
        }
  
        for (i = 0; i < nr; i++) {
 +              /*
 +               * If previous wasn't linked and we have a linked command,
 +               * that's the end of the chain. Submit the previous link.
 +               */
 +              if (!prev_was_link && link) {
 +                      io_queue_sqe(ctx, link, &link->submit);
 +                      link = NULL;
 +              }
 +              prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
 +
                if (unlikely(mm_fault)) {
 -                      ret = -EFAULT;
 +                      io_cqring_add_event(ctx, sqes[i].sqe->user_data,
 +                                              -EFAULT);
                } else {
                        sqes[i].has_user = has_user;
                        sqes[i].needs_lock = true;
                        sqes[i].needs_fixed_file = true;
 -                      ret = io_submit_sqe(ctx, &sqes[i], statep);
 -              }
 -              if (!ret) {
 +                      io_submit_sqe(ctx, &sqes[i], statep, &link);
                        submitted++;
 -                      continue;
                }
 -
 -              io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
        }
  
 +      if (link)
 +              io_queue_sqe(ctx, link, &link->submit);
        if (statep)
                io_submit_state_end(&state);
  
@@@ -2217,13 -1887,11 +2218,13 @@@ static int io_sq_thread(void *data
        unsigned inflight;
        unsigned long timeout;
  
 +      complete(&ctx->sqo_thread_started);
 +
        old_fs = get_fs();
        set_fs(USER_DS);
  
        timeout = inflight = 0;
 -      while (!kthread_should_stop() && !ctx->sqo_stop) {
 +      while (!kthread_should_park()) {
                bool all_fixed, mm_fault = false;
                int i;
  
                        smp_mb();
  
                        if (!io_get_sqring(ctx, &sqes[0])) {
 -                              if (kthread_should_stop()) {
 +                              if (kthread_should_park()) {
                                        finish_wait(&ctx->sqo_wait, &wait);
                                        break;
                                }
                mmput(cur_mm);
        }
  
 -      if (kthread_should_park())
 -              kthread_parkme();
 +      kthread_parkme();
  
        return 0;
  }
  static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
  {
        struct io_submit_state state, *statep = NULL;
 +      struct io_kiocb *link = NULL;
 +      bool prev_was_link = false;
        int i, submit = 0;
  
        if (to_submit > IO_PLUG_THRESHOLD) {
  
        for (i = 0; i < to_submit; i++) {
                struct sqe_submit s;
 -              int ret;
  
                if (!io_get_sqring(ctx, &s))
                        break;
  
 +              /*
 +               * If previous wasn't linked and we have a linked command,
 +               * that's the end of the chain. Submit the previous link.
 +               */
 +              if (!prev_was_link && link) {
 +                      io_queue_sqe(ctx, link, &link->submit);
 +                      link = NULL;
 +              }
 +              prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
 +
                s.has_user = true;
                s.needs_lock = false;
                s.needs_fixed_file = false;
                submit++;
 -
 -              ret = io_submit_sqe(ctx, &s, statep);
 -              if (ret)
 -                      io_cqring_add_event(ctx, s.sqe->user_data, ret, 0);
 +              io_submit_sqe(ctx, &s, statep, &link);
        }
        io_commit_sqring(ctx);
  
 +      if (link)
 +              io_queue_sqe(ctx, link, &link->submit);
        if (statep)
                io_submit_state_end(statep);
  
  
  static unsigned io_cqring_events(struct io_cq_ring *ring)
  {
 +      /* See comment at the top of this file */
 +      smp_rmb();
        return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
  }
  
@@@ -2400,8 -2057,11 +2401,8 @@@ static int io_cqring_wait(struct io_rin
  {
        struct io_cq_ring *ring = ctx->cq_ring;
        sigset_t ksigmask, sigsaved;
 -      DEFINE_WAIT(wait);
        int ret;
  
 -      /* See comment at the top of this file */
 -      smp_rmb();
        if (io_cqring_events(ring) >= min_events)
                return 0;
  
                        return ret;
        }
  
 -      do {
 -              prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
 +      ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events);
  
 -              ret = 0;
 -              /* See comment at the top of this file */
 -              smp_rmb();
 -              if (io_cqring_events(ring) >= min_events)
 -                      break;
 -
 -              schedule();
 +      if (sig)
 +              restore_user_sigmask(sig, &sigsaved, ret == -ERESTARTSYS);
  
 +      if (ret == -ERESTARTSYS)
                ret = -EINTR;
 -              if (signal_pending(current))
 -                      break;
 -      } while (1);
 -
 -      finish_wait(&ctx->wait, &wait);
 -
 -      if (sig)
 -              restore_user_sigmask(sig, &sigsaved);
  
        return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
  }
@@@ -2463,12 -2136,8 +2464,12 @@@ static int io_sqe_files_unregister(stru
  static void io_sq_thread_stop(struct io_ring_ctx *ctx)
  {
        if (ctx->sqo_thread) {
 -              ctx->sqo_stop = 1;
 -              mb();
 +              wait_for_completion(&ctx->sqo_thread_started);
 +              /*
 +               * The park is a bit of a work-around, without it we get
 +               * warning spews on shutdown with SQPOLL set and affinity
 +               * set to a single CPU.
 +               */
                kthread_park(ctx->sqo_thread);
                kthread_stop(ctx->sqo_thread);
                ctx->sqo_thread = NULL;
@@@ -2557,6 -2226,7 +2558,6 @@@ static int io_sqe_files_scm(struct io_r
        left = ctx->nr_user_files;
        while (left) {
                unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
 -              int ret;
  
                ret = __io_sqe_files_scm(ctx, this_files, total);
                if (ret)
@@@ -2661,12 -2331,11 +2662,12 @@@ static int io_sq_offload_start(struct i
                        ctx->sq_thread_idle = HZ;
  
                if (p->flags & IORING_SETUP_SQ_AFF) {
 -                      int cpu = array_index_nospec(p->sq_thread_cpu,
 -                                                      nr_cpu_ids);
 +                      int cpu = p->sq_thread_cpu;
  
                        ret = -EINVAL;
 -                      if (!cpu_possible(cpu))
 +                      if (cpu >= nr_cpu_ids)
 +                              goto err;
 +                      if (!cpu_online(cpu))
                                goto err;
  
                        ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
@@@ -2837,7 -2506,7 +2838,7 @@@ static int io_sqe_buffer_register(struc
  
                ret = io_copy_iov(ctx, &iov, arg, i);
                if (ret)
 -                      break;
 +                      goto err;
  
                /*
                 * Don't impose further limits on the size and buffer
  
                ret = 0;
                down_read(&current->mm->mmap_sem);
 -              pret = get_user_pages_longterm(ubuf, nr_pages, FOLL_WRITE,
 -                                              pages, vmas);
 +              pret = get_user_pages(ubuf, nr_pages,
 +                                    FOLL_WRITE | FOLL_LONGTERM,
 +                                    pages, vmas);
                if (pret == nr_pages) {
                        /* don't support file backed memory */
                        for (j = 0; j < nr_pages; j++) {
        return ret;
  }
  
 +static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
 +{
 +      __s32 __user *fds = arg;
 +      int fd;
 +
 +      if (ctx->cq_ev_fd)
 +              return -EBUSY;
 +
 +      if (copy_from_user(&fd, fds, sizeof(*fds)))
 +              return -EFAULT;
 +
 +      ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
 +      if (IS_ERR(ctx->cq_ev_fd)) {
 +              int ret = PTR_ERR(ctx->cq_ev_fd);
 +              ctx->cq_ev_fd = NULL;
 +              return ret;
 +      }
 +
 +      return 0;
 +}
 +
 +static int io_eventfd_unregister(struct io_ring_ctx *ctx)
 +{
 +      if (ctx->cq_ev_fd) {
 +              eventfd_ctx_put(ctx->cq_ev_fd);
 +              ctx->cq_ev_fd = NULL;
 +              return 0;
 +      }
 +
 +      return -ENXIO;
 +}
 +
  static void io_ring_ctx_free(struct io_ring_ctx *ctx)
  {
        io_finish_async(ctx);
        io_iopoll_reap_events(ctx);
        io_sqe_buffer_unregister(ctx);
        io_sqe_files_unregister(ctx);
 +      io_eventfd_unregister(ctx);
  
  #if defined(CONFIG_UNIX)
 -      if (ctx->ring_sock)
 +      if (ctx->ring_sock) {
 +              ctx->ring_sock->file = NULL; /* so that iput() is called */
                sock_release(ctx->ring_sock);
 +      }
  #endif
  
        io_mem_free(ctx->sq_ring);
@@@ -3411,18 -3044,6 +3412,18 @@@ static int __io_uring_register(struct i
                        break;
                ret = io_sqe_files_unregister(ctx);
                break;
 +      case IORING_REGISTER_EVENTFD:
 +              ret = -EINVAL;
 +              if (nr_args != 1)
 +                      break;
 +              ret = io_eventfd_register(ctx, arg);
 +              break;
 +      case IORING_UNREGISTER_EVENTFD:
 +              ret = -EINVAL;
 +              if (arg || nr_args)
 +                      break;
 +              ret = io_eventfd_unregister(ctx);
 +              break;
        default:
                ret = -EINVAL;
                break;
diff --combined lib/percpu-refcount.c
index 071a76c7bac079d421840ad4c708c5e66d02ea8c,501b517bd3dbea3cd6a387f92de6f42afa502b3b..4f6c6ebbbbdea8c311d30eda388b29c2f02893b6
@@@ -1,4 -1,3 +1,4 @@@
 +// SPDX-License-Identifier: GPL-2.0-only
  #define pr_fmt(fmt) "%s: " fmt "\n", __func__
  
  #include <linux/kernel.h>
@@@ -70,11 -69,14 +70,14 @@@ int percpu_ref_init(struct percpu_ref *
                return -ENOMEM;
  
        ref->force_atomic = flags & PERCPU_REF_INIT_ATOMIC;
+       ref->allow_reinit = flags & PERCPU_REF_ALLOW_REINIT;
  
-       if (flags & (PERCPU_REF_INIT_ATOMIC | PERCPU_REF_INIT_DEAD))
+       if (flags & (PERCPU_REF_INIT_ATOMIC | PERCPU_REF_INIT_DEAD)) {
                ref->percpu_count_ptr |= __PERCPU_REF_ATOMIC;
-       else
+               ref->allow_reinit = true;
+       } else {
                start_count += PERCPU_COUNT_BIAS;
+       }
  
        if (flags & PERCPU_REF_INIT_DEAD)
                ref->percpu_count_ptr |= __PERCPU_REF_DEAD;
@@@ -120,6 -122,9 +123,9 @@@ static void percpu_ref_call_confirm_rcu
        ref->confirm_switch = NULL;
        wake_up_all(&percpu_ref_switch_waitq);
  
+       if (!ref->allow_reinit)
+               percpu_ref_exit(ref);
        /* drop ref from percpu_ref_switch_to_atomic() */
        percpu_ref_put(ref);
  }
@@@ -152,7 -157,7 +158,7 @@@ static void percpu_ref_switch_to_atomic
        atomic_long_add((long)count - PERCPU_COUNT_BIAS, &ref->count);
  
        WARN_ONCE(atomic_long_read(&ref->count) <= 0,
 -                "percpu ref (%pf) <= 0 (%ld) after switching to atomic",
 +                "percpu ref (%ps) <= 0 (%ld) after switching to atomic",
                  ref->release, atomic_long_read(&ref->count));
  
        /* @ref is viewed as dead on all CPUs, send out switch confirmation */
@@@ -195,6 -200,9 +201,9 @@@ static void __percpu_ref_switch_to_perc
        if (!(ref->percpu_count_ptr & __PERCPU_REF_ATOMIC))
                return;
  
+       if (WARN_ON_ONCE(!ref->allow_reinit))
+               return;
        atomic_long_add(PERCPU_COUNT_BIAS, &ref->count);
  
        /*
@@@ -334,7 -342,7 +343,7 @@@ void percpu_ref_kill_and_confirm(struc
        spin_lock_irqsave(&percpu_ref_switch_lock, flags);
  
        WARN_ONCE(ref->percpu_count_ptr & __PERCPU_REF_DEAD,
 -                "%s called more than once on %pf!", __func__, ref->release);
 +                "%s called more than once on %ps!", __func__, ref->release);
  
        ref->percpu_count_ptr |= __PERCPU_REF_DEAD;
        __percpu_ref_switch_mode(ref, confirm_kill);