Merge tag 'ceph-for-5.3-rc1' of git://github.com/ceph/ceph-client
[linux-2.6-block.git] / drivers / block / rbd.c
index e5009a34f9c2622570272cd9ea7789e0e5492b2c..3327192bb71f73c9e4f4500203f39d8c1de01a0a 100644 (file)
@@ -115,6 +115,8 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define RBD_FEATURE_LAYERING           (1ULL<<0)
 #define RBD_FEATURE_STRIPINGV2         (1ULL<<1)
 #define RBD_FEATURE_EXCLUSIVE_LOCK     (1ULL<<2)
+#define RBD_FEATURE_OBJECT_MAP         (1ULL<<3)
+#define RBD_FEATURE_FAST_DIFF          (1ULL<<4)
 #define RBD_FEATURE_DEEP_FLATTEN       (1ULL<<5)
 #define RBD_FEATURE_DATA_POOL          (1ULL<<7)
 #define RBD_FEATURE_OPERATIONS         (1ULL<<8)
@@ -122,6 +124,8 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define RBD_FEATURES_ALL       (RBD_FEATURE_LAYERING |         \
                                 RBD_FEATURE_STRIPINGV2 |       \
                                 RBD_FEATURE_EXCLUSIVE_LOCK |   \
+                                RBD_FEATURE_OBJECT_MAP |       \
+                                RBD_FEATURE_FAST_DIFF |        \
                                 RBD_FEATURE_DEEP_FLATTEN |     \
                                 RBD_FEATURE_DATA_POOL |        \
                                 RBD_FEATURE_OPERATIONS)
@@ -203,6 +207,11 @@ struct rbd_client {
        struct list_head        node;
 };
 
+struct pending_result {
+       int                     result;         /* first nonzero result */
+       int                     num_pending;
+};
+
 struct rbd_img_request;
 
 enum obj_request_type {
@@ -219,6 +228,18 @@ enum obj_operation_type {
        OBJ_OP_ZEROOUT,
 };
 
+#define RBD_OBJ_FLAG_DELETION                  (1U << 0)
+#define RBD_OBJ_FLAG_COPYUP_ENABLED            (1U << 1)
+#define RBD_OBJ_FLAG_COPYUP_ZEROS              (1U << 2)
+#define RBD_OBJ_FLAG_MAY_EXIST                 (1U << 3)
+#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT      (1U << 4)
+
+enum rbd_obj_read_state {
+       RBD_OBJ_READ_START = 1,
+       RBD_OBJ_READ_OBJECT,
+       RBD_OBJ_READ_PARENT,
+};
+
 /*
  * Writes go through the following state machine to deal with
  * layering:
@@ -245,17 +266,28 @@ enum obj_operation_type {
  * even if there is a parent).
  */
 enum rbd_obj_write_state {
-       RBD_OBJ_WRITE_FLAT = 1,
-       RBD_OBJ_WRITE_GUARD,
-       RBD_OBJ_WRITE_READ_FROM_PARENT,
-       RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC,
-       RBD_OBJ_WRITE_COPYUP_OPS,
+       RBD_OBJ_WRITE_START = 1,
+       RBD_OBJ_WRITE_PRE_OBJECT_MAP,
+       RBD_OBJ_WRITE_OBJECT,
+       __RBD_OBJ_WRITE_COPYUP,
+       RBD_OBJ_WRITE_COPYUP,
+       RBD_OBJ_WRITE_POST_OBJECT_MAP,
+};
+
+enum rbd_obj_copyup_state {
+       RBD_OBJ_COPYUP_START = 1,
+       RBD_OBJ_COPYUP_READ_PARENT,
+       __RBD_OBJ_COPYUP_OBJECT_MAPS,
+       RBD_OBJ_COPYUP_OBJECT_MAPS,
+       __RBD_OBJ_COPYUP_WRITE_OBJECT,
+       RBD_OBJ_COPYUP_WRITE_OBJECT,
 };
 
 struct rbd_obj_request {
        struct ceph_object_extent ex;
+       unsigned int            flags;  /* RBD_OBJ_FLAG_* */
        union {
-               bool                    tried_parent;   /* for reads */
+               enum rbd_obj_read_state  read_state;    /* for reads */
                enum rbd_obj_write_state write_state;   /* for writes */
        };
 
@@ -271,14 +303,15 @@ struct rbd_obj_request {
                        u32                     bvec_idx;
                };
        };
+
+       enum rbd_obj_copyup_state copyup_state;
        struct bio_vec          *copyup_bvecs;
        u32                     copyup_bvec_count;
 
-       struct ceph_osd_request *osd_req;
-
-       u64                     xferred;        /* bytes transferred */
-       int                     result;
+       struct list_head        osd_reqs;       /* w/ r_private_item */
 
+       struct mutex            state_mutex;
+       struct pending_result   pending;
        struct kref             kref;
 };
 
@@ -287,11 +320,19 @@ enum img_req_flags {
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
 };
 
+enum rbd_img_state {
+       RBD_IMG_START = 1,
+       RBD_IMG_EXCLUSIVE_LOCK,
+       __RBD_IMG_OBJECT_REQUESTS,
+       RBD_IMG_OBJECT_REQUESTS,
+};
+
 struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        enum obj_operation_type op_type;
        enum obj_request_type   data_type;
        unsigned long           flags;
+       enum rbd_img_state      state;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
@@ -300,13 +341,14 @@ struct rbd_img_request {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
-       spinlock_t              completion_lock;
-       u64                     xferred;/* aggregate bytes transferred */
-       int                     result; /* first nonzero obj_request result */
 
+       struct list_head        lock_item;
        struct list_head        object_extents; /* obj_req.ex structs */
-       u32                     pending_count;
 
+       struct mutex            state_mutex;
+       struct pending_result   pending;
+       struct work_struct      work;
+       int                     work_result;
        struct kref             kref;
 };
 
@@ -380,7 +422,17 @@ struct rbd_device {
        struct work_struct      released_lock_work;
        struct delayed_work     lock_dwork;
        struct work_struct      unlock_work;
-       wait_queue_head_t       lock_waitq;
+       spinlock_t              lock_lists_lock;
+       struct list_head        acquiring_list;
+       struct list_head        running_list;
+       struct completion       acquire_wait;
+       int                     acquire_err;
+       struct completion       releasing_wait;
+
+       spinlock_t              object_map_lock;
+       u8                      *object_map;
+       u64                     object_map_size;        /* in objects */
+       u64                     object_map_flags;
 
        struct workqueue_struct *task_wq;
 
@@ -408,12 +460,10 @@ struct rbd_device {
  * Flag bits for rbd_dev->flags:
  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
  *   by rbd_dev->lock
- * - BLACKLISTED is protected by rbd_dev->lock_rwsem
  */
 enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
-       RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
 };
 
 static DEFINE_MUTEX(client_mutex);     /* Serialize client creation */
@@ -466,6 +516,8 @@ static int minor_to_rbd_dev_id(int minor)
 
 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 {
+       lockdep_assert_held(&rbd_dev->lock_rwsem);
+
        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
               rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
 }
@@ -583,6 +635,26 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
+
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
+static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
+
+/*
+ * Return true if nothing else is pending.
+ */
+static bool pending_result_dec(struct pending_result *pending, int *result)
+{
+       rbd_assert(pending->num_pending > 0);
+
+       if (*result && !pending->result)
+               pending->result = *result;
+       if (--pending->num_pending)
+               return false;
+
+       *result = pending->result;
+       return true;
+}
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
@@ -1317,6 +1389,8 @@ static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
                               u32 bytes)
 {
+       dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
+
        switch (obj_req->img_request->data_type) {
        case OBJ_REQUEST_BIO:
                zero_bios(&obj_req->bio_pos, off, bytes);
@@ -1339,13 +1413,6 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
-static void rbd_img_request_get(struct rbd_img_request *img_request)
-{
-       dout("%s: img %p (was %d)\n", __func__, img_request,
-            kref_read(&img_request->kref));
-       kref_get(&img_request->kref);
-}
-
 static void rbd_img_request_destroy(struct kref *kref);
 static void rbd_img_request_put(struct rbd_img_request *img_request)
 {
@@ -1362,7 +1429,6 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
 
        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
-       img_request->pending_count++;
        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
 }
 
@@ -1375,13 +1441,13 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
        rbd_obj_request_put(obj_request);
 }
 
-static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
+static void rbd_osd_submit(struct ceph_osd_request *osd_req)
 {
-       struct ceph_osd_request *osd_req = obj_request->osd_req;
+       struct rbd_obj_request *obj_req = osd_req->r_priv;
 
-       dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
-            obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
-            obj_request->ex.oe_len, osd_req);
+       dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
+            __func__, osd_req, obj_req, obj_req->ex.oe_objno,
+            obj_req->ex.oe_off, obj_req->ex.oe_len);
        ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
 }
 
@@ -1457,41 +1523,38 @@ static bool rbd_img_is_write(struct rbd_img_request *img_req)
        }
 }
 
-static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
-
 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
 {
        struct rbd_obj_request *obj_req = osd_req->r_priv;
+       int result;
 
        dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
             osd_req->r_result, obj_req);
-       rbd_assert(osd_req == obj_req->osd_req);
 
-       obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
-       if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
-               obj_req->xferred = osd_req->r_result;
+       /*
+        * Writes aren't allowed to return a data payload.  In some
+        * guarded write cases (e.g. stat + zero on an empty object)
+        * a stat response makes it through, but we don't care.
+        */
+       if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
+               result = 0;
        else
-               /*
-                * Writes aren't allowed to return a data payload.  In some
-                * guarded write cases (e.g. stat + zero on an empty object)
-                * a stat response makes it through, but we don't care.
-                */
-               obj_req->xferred = 0;
+               result = osd_req->r_result;
 
-       rbd_obj_handle_request(obj_req);
+       rbd_obj_handle_request(obj_req, result);
 }
 
-static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
+static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
 {
-       struct ceph_osd_request *osd_req = obj_request->osd_req;
+       struct rbd_obj_request *obj_request = osd_req->r_priv;
 
        osd_req->r_flags = CEPH_OSD_FLAG_READ;
        osd_req->r_snapid = obj_request->img_request->snap_id;
 }
 
-static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
+static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
 {
-       struct ceph_osd_request *osd_req = obj_request->osd_req;
+       struct rbd_obj_request *obj_request = osd_req->r_priv;
 
        osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
        ktime_get_real_ts64(&osd_req->r_mtime);
@@ -1499,19 +1562,21 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
 }
 
 static struct ceph_osd_request *
-__rbd_osd_req_create(struct rbd_obj_request *obj_req,
-                    struct ceph_snap_context *snapc, unsigned int num_ops)
+__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
+                         struct ceph_snap_context *snapc, int num_ops)
 {
        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_request *req;
        const char *name_format = rbd_dev->image_format == 1 ?
                                      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
+       int ret;
 
        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
        if (!req)
-               return NULL;
+               return ERR_PTR(-ENOMEM);
 
+       list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
        req->r_callback = rbd_osd_req_callback;
        req->r_priv = obj_req;
 
@@ -1522,27 +1587,20 @@ __rbd_osd_req_create(struct rbd_obj_request *obj_req,
        ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
        req->r_base_oloc.pool = rbd_dev->layout.pool_id;
 
-       if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
-                       rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
-               goto err_req;
+       ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
+                              rbd_dev->header.object_prefix,
+                              obj_req->ex.oe_objno);
+       if (ret)
+               return ERR_PTR(ret);
 
        return req;
-
-err_req:
-       ceph_osdc_put_request(req);
-       return NULL;
 }
 
 static struct ceph_osd_request *
-rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
-{
-       return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc,
-                                   num_ops);
-}
-
-static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
+rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
 {
-       ceph_osdc_put_request(osd_req);
+       return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
+                                        num_ops);
 }
 
 static struct rbd_obj_request *rbd_obj_request_create(void)
@@ -1554,6 +1612,8 @@ static struct rbd_obj_request *rbd_obj_request_create(void)
                return NULL;
 
        ceph_object_extent_init(&obj_request->ex);
+       INIT_LIST_HEAD(&obj_request->osd_reqs);
+       mutex_init(&obj_request->state_mutex);
        kref_init(&obj_request->kref);
 
        dout("%s %p\n", __func__, obj_request);
@@ -1563,14 +1623,19 @@ static struct rbd_obj_request *rbd_obj_request_create(void)
 static void rbd_obj_request_destroy(struct kref *kref)
 {
        struct rbd_obj_request *obj_request;
+       struct ceph_osd_request *osd_req;
        u32 i;
 
        obj_request = container_of(kref, struct rbd_obj_request, kref);
 
        dout("%s: obj %p\n", __func__, obj_request);
 
-       if (obj_request->osd_req)
-               rbd_osd_req_destroy(obj_request->osd_req);
+       while (!list_empty(&obj_request->osd_reqs)) {
+               osd_req = list_first_entry(&obj_request->osd_reqs,
+                                   struct ceph_osd_request, r_private_item);
+               list_del_init(&osd_req->r_private_item);
+               ceph_osdc_put_request(osd_req);
+       }
 
        switch (obj_request->img_request->data_type) {
        case OBJ_REQUEST_NODATA:
@@ -1684,8 +1749,9 @@ static struct rbd_img_request *rbd_img_request_create(
        if (rbd_dev_parent_get(rbd_dev))
                img_request_layered_set(img_request);
 
-       spin_lock_init(&img_request->completion_lock);
+       INIT_LIST_HEAD(&img_request->lock_item);
        INIT_LIST_HEAD(&img_request->object_extents);
+       mutex_init(&img_request->state_mutex);
        kref_init(&img_request->kref);
 
        dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
@@ -1703,6 +1769,7 @@ static void rbd_img_request_destroy(struct kref *kref)
 
        dout("%s: img %p\n", __func__, img_request);
 
+       WARN_ON(!list_empty(&img_request->lock_item));
        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                rbd_img_obj_request_del(img_request, obj_request);
 
@@ -1717,304 +1784,781 @@ static void rbd_img_request_destroy(struct kref *kref)
        kmem_cache_free(rbd_img_request_cache, img_request);
 }
 
-static void prune_extents(struct ceph_file_extent *img_extents,
-                         u32 *num_img_extents, u64 overlap)
-{
-       u32 cnt = *num_img_extents;
+#define BITS_PER_OBJ   2
+#define OBJS_PER_BYTE  (BITS_PER_BYTE / BITS_PER_OBJ)
+#define OBJ_MASK       ((1 << BITS_PER_OBJ) - 1)
 
-       /* drop extents completely beyond the overlap */
-       while (cnt && img_extents[cnt - 1].fe_off >= overlap)
-               cnt--;
+static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
+                                  u64 *index, u8 *shift)
+{
+       u32 off;
 
-       if (cnt) {
-               struct ceph_file_extent *ex = &img_extents[cnt - 1];
+       rbd_assert(objno < rbd_dev->object_map_size);
+       *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
+       *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
+}
 
-               /* trim final overlapping extent */
-               if (ex->fe_off + ex->fe_len > overlap)
-                       ex->fe_len = overlap - ex->fe_off;
-       }
+static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
+{
+       u64 index;
+       u8 shift;
 
-       *num_img_extents = cnt;
+       lockdep_assert_held(&rbd_dev->object_map_lock);
+       __rbd_object_map_index(rbd_dev, objno, &index, &shift);
+       return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
 }
 
-/*
- * Determine the byte range(s) covered by either just the object extent
- * or the entire object in the parent image.
- */
-static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
-                                   bool entire)
+static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
 {
-       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
-       int ret;
-
-       if (!rbd_dev->parent_overlap)
-               return 0;
+       u64 index;
+       u8 shift;
+       u8 *p;
 
-       ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
-                                 entire ? 0 : obj_req->ex.oe_off,
-                                 entire ? rbd_dev->layout.object_size :
-                                                       obj_req->ex.oe_len,
-                                 &obj_req->img_extents,
-                                 &obj_req->num_img_extents);
-       if (ret)
-               return ret;
+       lockdep_assert_held(&rbd_dev->object_map_lock);
+       rbd_assert(!(val & ~OBJ_MASK));
 
-       prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
-                     rbd_dev->parent_overlap);
-       return 0;
+       __rbd_object_map_index(rbd_dev, objno, &index, &shift);
+       p = &rbd_dev->object_map[index];
+       *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
 }
 
-static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
+static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
 {
-       switch (obj_req->img_request->data_type) {
-       case OBJ_REQUEST_BIO:
-               osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
-                                              &obj_req->bio_pos,
-                                              obj_req->ex.oe_len);
-               break;
-       case OBJ_REQUEST_BVECS:
-       case OBJ_REQUEST_OWN_BVECS:
-               rbd_assert(obj_req->bvec_pos.iter.bi_size ==
-                                                       obj_req->ex.oe_len);
-               rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
-               osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
-                                                   &obj_req->bvec_pos);
-               break;
-       default:
-               BUG();
-       }
+       u8 state;
+
+       spin_lock(&rbd_dev->object_map_lock);
+       state = __rbd_object_map_get(rbd_dev, objno);
+       spin_unlock(&rbd_dev->object_map_lock);
+       return state;
 }
 
-static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
+static bool use_object_map(struct rbd_device *rbd_dev)
 {
-       obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1);
-       if (!obj_req->osd_req)
-               return -ENOMEM;
-
-       osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
-                              obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
-       rbd_osd_req_setup_data(obj_req, 0);
-
-       rbd_osd_req_format_read(obj_req);
-       return 0;
+       return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
+               !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
 }
 
-static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
-                               unsigned int which)
+static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
 {
-       struct page **pages;
+       u8 state;
 
-       /*
-        * The response data for a STAT call consists of:
-        *     le64 length;
-        *     struct {
-        *         le32 tv_sec;
-        *         le32 tv_nsec;
-        *     } mtime;
-        */
-       pages = ceph_alloc_page_vector(1, GFP_NOIO);
-       if (IS_ERR(pages))
-               return PTR_ERR(pages);
+       /* fall back to default logic if object map is disabled or invalid */
+       if (!use_object_map(rbd_dev))
+               return true;
 
-       osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
-       osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
-                                    8 + sizeof(struct ceph_timespec),
-                                    0, false, true);
-       return 0;
+       state = rbd_object_map_get(rbd_dev, objno);
+       return state != OBJECT_NONEXISTENT;
 }
 
-static int count_write_ops(struct rbd_obj_request *obj_req)
+static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
+                               struct ceph_object_id *oid)
 {
-       return 2; /* setallochint + write/writefull */
+       if (snap_id == CEPH_NOSNAP)
+               ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
+                               rbd_dev->spec->image_id);
+       else
+               ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
+                               rbd_dev->spec->image_id, snap_id);
 }
 
-static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
-                                 unsigned int which)
+static int rbd_object_map_lock(struct rbd_device *rbd_dev)
 {
-       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
-       u16 opcode;
-
-       osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
-                                  rbd_dev->layout.object_size,
-                                  rbd_dev->layout.object_size);
-
-       if (rbd_obj_is_entire(obj_req))
-               opcode = CEPH_OSD_OP_WRITEFULL;
-       else
-               opcode = CEPH_OSD_OP_WRITE;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       CEPH_DEFINE_OID_ONSTACK(oid);
+       u8 lock_type;
+       char *lock_tag;
+       struct ceph_locker *lockers;
+       u32 num_lockers;
+       bool broke_lock = false;
+       int ret;
 
-       osd_req_op_extent_init(obj_req->osd_req, which, opcode,
-                              obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
-       rbd_osd_req_setup_data(obj_req, which++);
+       rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
 
-       rbd_assert(which == obj_req->osd_req->r_num_ops);
-       rbd_osd_req_format_write(obj_req);
-}
+again:
+       ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
+                           CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
+       if (ret != -EBUSY || broke_lock) {
+               if (ret == -EEXIST)
+                       ret = 0; /* already locked by myself */
+               if (ret)
+                       rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
+               return ret;
+       }
 
-static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
-{
-       unsigned int num_osd_ops, which = 0;
-       bool need_guard;
-       int ret;
+       ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
+                                RBD_LOCK_NAME, &lock_type, &lock_tag,
+                                &lockers, &num_lockers);
+       if (ret) {
+               if (ret == -ENOENT)
+                       goto again;
 
-       /* reverse map the entire object onto the parent */
-       ret = rbd_obj_calc_img_extents(obj_req, true);
-       if (ret)
+               rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
                return ret;
+       }
 
-       need_guard = rbd_obj_copyup_enabled(obj_req);
-       num_osd_ops = need_guard + count_write_ops(obj_req);
+       kfree(lock_tag);
+       if (num_lockers == 0)
+               goto again;
 
-       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
-       if (!obj_req->osd_req)
-               return -ENOMEM;
+       rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
+                ENTITY_NAME(lockers[0].id.name));
 
-       if (need_guard) {
-               ret = __rbd_obj_setup_stat(obj_req, which++);
-               if (ret)
-                       return ret;
+       ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
+                                 RBD_LOCK_NAME, lockers[0].id.cookie,
+                                 &lockers[0].id.name);
+       ceph_free_lockers(lockers, num_lockers);
+       if (ret) {
+               if (ret == -ENOENT)
+                       goto again;
 
-               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
-       } else {
-               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+               rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
+               return ret;
        }
 
-       __rbd_obj_setup_write(obj_req, which);
-       return 0;
+       broke_lock = true;
+       goto again;
 }
 
-static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
+static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
 {
-       return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
-                                         CEPH_OSD_OP_ZERO;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       CEPH_DEFINE_OID_ONSTACK(oid);
+       int ret;
+
+       rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
+
+       ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
+                             "");
+       if (ret && ret != -ENOENT)
+               rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
 }
 
-static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
+static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
 {
-       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
-       u64 off = obj_req->ex.oe_off;
-       u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len;
+       u8 struct_v;
+       u32 struct_len;
+       u32 header_len;
+       void *header_end;
        int ret;
 
-       /*
-        * Align the range to alloc_size boundary and punt on discards
-        * that are too small to free up any space.
-        *
-        * alloc_size == object_size && is_tail() is a special case for
-        * filestore with filestore_punch_hole = false, needed to allow
-        * truncate (in addition to delete).
-        */
-       if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
-           !rbd_obj_is_tail(obj_req)) {
-               off = round_up(off, rbd_dev->opts->alloc_size);
-               next_off = round_down(next_off, rbd_dev->opts->alloc_size);
-               if (off >= next_off)
-                       return 1;
-       }
+       ceph_decode_32_safe(p, end, header_len, e_inval);
+       header_end = *p + header_len;
 
-       /* reverse map the entire object onto the parent */
-       ret = rbd_obj_calc_img_extents(obj_req, true);
+       ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
+                                 &struct_len);
        if (ret)
                return ret;
 
-       obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
-       if (!obj_req->osd_req)
-               return -ENOMEM;
-
-       if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
-               osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0);
-       } else {
-               dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
-                    obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
-                    off, next_off - off);
-               osd_req_op_extent_init(obj_req->osd_req, 0,
-                                      truncate_or_zero_opcode(obj_req),
-                                      off, next_off - off, 0, 0);
-       }
+       ceph_decode_64_safe(p, end, *object_map_size, e_inval);
 
-       obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-       rbd_osd_req_format_write(obj_req);
+       *p = header_end;
        return 0;
+
+e_inval:
+       return -EINVAL;
 }
 
-static int count_zeroout_ops(struct rbd_obj_request *obj_req)
+static int __rbd_object_map_load(struct rbd_device *rbd_dev)
 {
-       int num_osd_ops;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       CEPH_DEFINE_OID_ONSTACK(oid);
+       struct page **pages;
+       void *p, *end;
+       size_t reply_len;
+       u64 num_objects;
+       u64 object_map_bytes;
+       u64 object_map_size;
+       int num_pages;
+       int ret;
 
-       if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
-           !rbd_obj_copyup_enabled(obj_req))
-               num_osd_ops = 2; /* create + truncate */
-       else
-               num_osd_ops = 1; /* delete/truncate/zero */
+       rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
 
-       return num_osd_ops;
-}
+       num_objects = ceph_get_num_objects(&rbd_dev->layout,
+                                          rbd_dev->mapping.size);
+       object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
+                                           BITS_PER_BYTE);
+       num_pages = calc_pages_for(0, object_map_bytes) + 1;
+       pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
 
-static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
-                                   unsigned int which)
-{
-       u16 opcode;
+       reply_len = num_pages * PAGE_SIZE;
+       rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
+       ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
+                            "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
+                            NULL, 0, pages, &reply_len);
+       if (ret)
+               goto out;
 
-       if (rbd_obj_is_entire(obj_req)) {
-               if (obj_req->num_img_extents) {
-                       if (!rbd_obj_copyup_enabled(obj_req))
-                               osd_req_op_init(obj_req->osd_req, which++,
-                                               CEPH_OSD_OP_CREATE, 0);
-                       opcode = CEPH_OSD_OP_TRUNCATE;
-               } else {
-                       osd_req_op_init(obj_req->osd_req, which++,
-                                       CEPH_OSD_OP_DELETE, 0);
-                       opcode = 0;
-               }
-       } else {
-               opcode = truncate_or_zero_opcode(obj_req);
+       p = page_address(pages[0]);
+       end = p + min(reply_len, (size_t)PAGE_SIZE);
+       ret = decode_object_map_header(&p, end, &object_map_size);
+       if (ret)
+               goto out;
+
+       if (object_map_size != num_objects) {
+               rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
+                        object_map_size, num_objects);
+               ret = -EINVAL;
+               goto out;
        }
 
-       if (opcode)
-               osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
-                                      obj_req->ex.oe_off, obj_req->ex.oe_len,
-                                      0, 0);
+       if (offset_in_page(p) + object_map_bytes > reply_len) {
+               ret = -EINVAL;
+               goto out;
+       }
+
+       rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
+       if (!rbd_dev->object_map) {
+               ret = -ENOMEM;
+               goto out;
+       }
+
+       rbd_dev->object_map_size = object_map_size;
+       ceph_copy_from_page_vector(pages, rbd_dev->object_map,
+                                  offset_in_page(p), object_map_bytes);
+
+out:
+       ceph_release_page_vector(pages, num_pages);
+       return ret;
+}
 
-       rbd_assert(which == obj_req->osd_req->r_num_ops);
-       rbd_osd_req_format_write(obj_req);
+static void rbd_object_map_free(struct rbd_device *rbd_dev)
+{
+       kvfree(rbd_dev->object_map);
+       rbd_dev->object_map = NULL;
+       rbd_dev->object_map_size = 0;
 }
 
-static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
+static int rbd_object_map_load(struct rbd_device *rbd_dev)
 {
-       unsigned int num_osd_ops, which = 0;
-       bool need_guard;
        int ret;
 
-       /* reverse map the entire object onto the parent */
-       ret = rbd_obj_calc_img_extents(obj_req, true);
+       ret = __rbd_object_map_load(rbd_dev);
        if (ret)
                return ret;
 
-       need_guard = rbd_obj_copyup_enabled(obj_req);
-       num_osd_ops = need_guard + count_zeroout_ops(obj_req);
-
-       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
-       if (!obj_req->osd_req)
-               return -ENOMEM;
+       ret = rbd_dev_v2_get_flags(rbd_dev);
+       if (ret) {
+               rbd_object_map_free(rbd_dev);
+               return ret;
+       }
+
+       if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
+               rbd_warn(rbd_dev, "object map is invalid");
+
+       return 0;
+}
+
+static int rbd_object_map_open(struct rbd_device *rbd_dev)
+{
+       int ret;
+
+       ret = rbd_object_map_lock(rbd_dev);
+       if (ret)
+               return ret;
+
+       ret = rbd_object_map_load(rbd_dev);
+       if (ret) {
+               rbd_object_map_unlock(rbd_dev);
+               return ret;
+       }
+
+       return 0;
+}
+
+static void rbd_object_map_close(struct rbd_device *rbd_dev)
+{
+       rbd_object_map_free(rbd_dev);
+       rbd_object_map_unlock(rbd_dev);
+}
+
+/*
+ * This function needs snap_id (or more precisely just something to
+ * distinguish between HEAD and snapshot object maps), new_state and
+ * current_state that were passed to rbd_object_map_update().
+ *
+ * To avoid allocating and stashing a context we piggyback on the OSD
+ * request.  A HEAD update has two ops (assert_locked).  For new_state
+ * and current_state we decode our own object_map_update op, encoded in
+ * rbd_cls_object_map_update().
+ */
+static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
+                                       struct ceph_osd_request *osd_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       struct ceph_osd_data *osd_data;
+       u64 objno;
+       u8 state, new_state, current_state;
+       bool has_current_state;
+       void *p;
+
+       if (osd_req->r_result)
+               return osd_req->r_result;
+
+       /*
+        * Nothing to do for a snapshot object map.
+        */
+       if (osd_req->r_num_ops == 1)
+               return 0;
+
+       /*
+        * Update in-memory HEAD object map.
+        */
+       rbd_assert(osd_req->r_num_ops == 2);
+       osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
+       rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
+
+       p = page_address(osd_data->pages[0]);
+       objno = ceph_decode_64(&p);
+       rbd_assert(objno == obj_req->ex.oe_objno);
+       rbd_assert(ceph_decode_64(&p) == objno + 1);
+       new_state = ceph_decode_8(&p);
+       has_current_state = ceph_decode_8(&p);
+       if (has_current_state)
+               current_state = ceph_decode_8(&p);
+
+       spin_lock(&rbd_dev->object_map_lock);
+       state = __rbd_object_map_get(rbd_dev, objno);
+       if (!has_current_state || current_state == state ||
+           (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
+               __rbd_object_map_set(rbd_dev, objno, new_state);
+       spin_unlock(&rbd_dev->object_map_lock);
+
+       return 0;
+}
+
+static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
+{
+       struct rbd_obj_request *obj_req = osd_req->r_priv;
+       int result;
+
+       dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
+            osd_req->r_result, obj_req);
+
+       result = rbd_object_map_update_finish(obj_req, osd_req);
+       rbd_obj_handle_request(obj_req, result);
+}
+
+static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
+{
+       u8 state = rbd_object_map_get(rbd_dev, objno);
 
-       if (need_guard) {
-               ret = __rbd_obj_setup_stat(obj_req, which++);
+       if (state == new_state ||
+           (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
+           (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
+               return false;
+
+       return true;
+}
+
+static int rbd_cls_object_map_update(struct ceph_osd_request *req,
+                                    int which, u64 objno, u8 new_state,
+                                    const u8 *current_state)
+{
+       struct page **pages;
+       void *p, *start;
+       int ret;
+
+       ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
+       if (ret)
+               return ret;
+
+       pages = ceph_alloc_page_vector(1, GFP_NOIO);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
+       p = start = page_address(pages[0]);
+       ceph_encode_64(&p, objno);
+       ceph_encode_64(&p, objno + 1);
+       ceph_encode_8(&p, new_state);
+       if (current_state) {
+               ceph_encode_8(&p, 1);
+               ceph_encode_8(&p, *current_state);
+       } else {
+               ceph_encode_8(&p, 0);
+       }
+
+       osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
+                                         false, true);
+       return 0;
+}
+
+/*
+ * Return:
+ *   0 - object map update sent
+ *   1 - object map update isn't needed
+ *  <0 - error
+ */
+static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
+                                u8 new_state, const u8 *current_state)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct ceph_osd_request *req;
+       int num_ops = 1;
+       int which = 0;
+       int ret;
+
+       if (snap_id == CEPH_NOSNAP) {
+               if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
+                       return 1;
+
+               num_ops++; /* assert_locked */
+       }
+
+       req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
+       if (!req)
+               return -ENOMEM;
+
+       list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
+       req->r_callback = rbd_object_map_callback;
+       req->r_priv = obj_req;
+
+       rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
+       ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
+       req->r_flags = CEPH_OSD_FLAG_WRITE;
+       ktime_get_real_ts64(&req->r_mtime);
+
+       if (snap_id == CEPH_NOSNAP) {
+               /*
+                * Protect against possible race conditions during lock
+                * ownership transitions.
+                */
+               ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
+                                            CEPH_CLS_LOCK_EXCLUSIVE, "", "");
                if (ret)
                        return ret;
+       }
+
+       ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
+                                       new_state, current_state);
+       if (ret)
+               return ret;
+
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       if (ret)
+               return ret;
 
-               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+       ceph_osdc_start_request(osdc, req, false);
+       return 0;
+}
+
+static void prune_extents(struct ceph_file_extent *img_extents,
+                         u32 *num_img_extents, u64 overlap)
+{
+       u32 cnt = *num_img_extents;
+
+       /* drop extents completely beyond the overlap */
+       while (cnt && img_extents[cnt - 1].fe_off >= overlap)
+               cnt--;
+
+       if (cnt) {
+               struct ceph_file_extent *ex = &img_extents[cnt - 1];
+
+               /* trim final overlapping extent */
+               if (ex->fe_off + ex->fe_len > overlap)
+                       ex->fe_len = overlap - ex->fe_off;
+       }
+
+       *num_img_extents = cnt;
+}
+
+/*
+ * Determine the byte range(s) covered by either just the object extent
+ * or the entire object in the parent image.
+ */
+static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
+                                   bool entire)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       int ret;
+
+       if (!rbd_dev->parent_overlap)
+               return 0;
+
+       ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
+                                 entire ? 0 : obj_req->ex.oe_off,
+                                 entire ? rbd_dev->layout.object_size :
+                                                       obj_req->ex.oe_len,
+                                 &obj_req->img_extents,
+                                 &obj_req->num_img_extents);
+       if (ret)
+               return ret;
+
+       prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
+                     rbd_dev->parent_overlap);
+       return 0;
+}
+
+static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
+{
+       struct rbd_obj_request *obj_req = osd_req->r_priv;
+
+       switch (obj_req->img_request->data_type) {
+       case OBJ_REQUEST_BIO:
+               osd_req_op_extent_osd_data_bio(osd_req, which,
+                                              &obj_req->bio_pos,
+                                              obj_req->ex.oe_len);
+               break;
+       case OBJ_REQUEST_BVECS:
+       case OBJ_REQUEST_OWN_BVECS:
+               rbd_assert(obj_req->bvec_pos.iter.bi_size ==
+                                                       obj_req->ex.oe_len);
+               rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
+               osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
+                                                   &obj_req->bvec_pos);
+               break;
+       default:
+               BUG();
+       }
+}
+
+static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
+{
+       struct page **pages;
+
+       /*
+        * The response data for a STAT call consists of:
+        *     le64 length;
+        *     struct {
+        *         le32 tv_sec;
+        *         le32 tv_nsec;
+        *     } mtime;
+        */
+       pages = ceph_alloc_page_vector(1, GFP_NOIO);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
+       osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
+       osd_req_op_raw_data_in_pages(osd_req, which, pages,
+                                    8 + sizeof(struct ceph_timespec),
+                                    0, false, true);
+       return 0;
+}
+
+static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
+                               u32 bytes)
+{
+       struct rbd_obj_request *obj_req = osd_req->r_priv;
+       int ret;
+
+       ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
+       if (ret)
+               return ret;
+
+       osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
+                                         obj_req->copyup_bvec_count, bytes);
+       return 0;
+}
+
+static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
+{
+       obj_req->read_state = RBD_OBJ_READ_START;
+       return 0;
+}
+
+static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
+                                     int which)
+{
+       struct rbd_obj_request *obj_req = osd_req->r_priv;
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       u16 opcode;
+
+       if (!use_object_map(rbd_dev) ||
+           !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
+               osd_req_op_alloc_hint_init(osd_req, which++,
+                                          rbd_dev->layout.object_size,
+                                          rbd_dev->layout.object_size);
+       }
+
+       if (rbd_obj_is_entire(obj_req))
+               opcode = CEPH_OSD_OP_WRITEFULL;
+       else
+               opcode = CEPH_OSD_OP_WRITE;
+
+       osd_req_op_extent_init(osd_req, which, opcode,
+                              obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
+       rbd_osd_setup_data(osd_req, which);
+}
+
+static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
+{
+       int ret;
+
+       /* reverse map the entire object onto the parent */
+       ret = rbd_obj_calc_img_extents(obj_req, true);
+       if (ret)
+               return ret;
+
+       if (rbd_obj_copyup_enabled(obj_req))
+               obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
+
+       obj_req->write_state = RBD_OBJ_WRITE_START;
+       return 0;
+}
+
+static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
+{
+       return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
+                                         CEPH_OSD_OP_ZERO;
+}
+
+static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
+                                       int which)
+{
+       struct rbd_obj_request *obj_req = osd_req->r_priv;
+
+       if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
+               rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
+               osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
        } else {
-               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+               osd_req_op_extent_init(osd_req, which,
+                                      truncate_or_zero_opcode(obj_req),
+                                      obj_req->ex.oe_off, obj_req->ex.oe_len,
+                                      0, 0);
+       }
+}
+
+static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       u64 off, next_off;
+       int ret;
+
+       /*
+        * Align the range to alloc_size boundary and punt on discards
+        * that are too small to free up any space.
+        *
+        * alloc_size == object_size && is_tail() is a special case for
+        * filestore with filestore_punch_hole = false, needed to allow
+        * truncate (in addition to delete).
+        */
+       if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
+           !rbd_obj_is_tail(obj_req)) {
+               off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
+               next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
+                                     rbd_dev->opts->alloc_size);
+               if (off >= next_off)
+                       return 1;
+
+               dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
+                    obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
+                    off, next_off - off);
+               obj_req->ex.oe_off = off;
+               obj_req->ex.oe_len = next_off - off;
+       }
+
+       /* reverse map the entire object onto the parent */
+       ret = rbd_obj_calc_img_extents(obj_req, true);
+       if (ret)
+               return ret;
+
+       obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
+       if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
+               obj_req->flags |= RBD_OBJ_FLAG_DELETION;
+
+       obj_req->write_state = RBD_OBJ_WRITE_START;
+       return 0;
+}
+
+static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
+                                       int which)
+{
+       struct rbd_obj_request *obj_req = osd_req->r_priv;
+       u16 opcode;
+
+       if (rbd_obj_is_entire(obj_req)) {
+               if (obj_req->num_img_extents) {
+                       if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
+                               osd_req_op_init(osd_req, which++,
+                                               CEPH_OSD_OP_CREATE, 0);
+                       opcode = CEPH_OSD_OP_TRUNCATE;
+               } else {
+                       rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
+                       osd_req_op_init(osd_req, which++,
+                                       CEPH_OSD_OP_DELETE, 0);
+                       opcode = 0;
+               }
+       } else {
+               opcode = truncate_or_zero_opcode(obj_req);
+       }
+
+       if (opcode)
+               osd_req_op_extent_init(osd_req, which, opcode,
+                                      obj_req->ex.oe_off, obj_req->ex.oe_len,
+                                      0, 0);
+}
+
+static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
+{
+       int ret;
+
+       /* reverse map the entire object onto the parent */
+       ret = rbd_obj_calc_img_extents(obj_req, true);
+       if (ret)
+               return ret;
+
+       if (rbd_obj_copyup_enabled(obj_req))
+               obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
+       if (!obj_req->num_img_extents) {
+               obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
+               if (rbd_obj_is_entire(obj_req))
+                       obj_req->flags |= RBD_OBJ_FLAG_DELETION;
        }
 
-       __rbd_obj_setup_zeroout(obj_req, which);
+       obj_req->write_state = RBD_OBJ_WRITE_START;
        return 0;
 }
 
+static int count_write_ops(struct rbd_obj_request *obj_req)
+{
+       struct rbd_img_request *img_req = obj_req->img_request;
+
+       switch (img_req->op_type) {
+       case OBJ_OP_WRITE:
+               if (!use_object_map(img_req->rbd_dev) ||
+                   !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
+                       return 2; /* setallochint + write/writefull */
+
+               return 1; /* write/writefull */
+       case OBJ_OP_DISCARD:
+               return 1; /* delete/truncate/zero */
+       case OBJ_OP_ZEROOUT:
+               if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
+                   !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
+                       return 2; /* create + truncate */
+
+               return 1; /* delete/truncate/zero */
+       default:
+               BUG();
+       }
+}
+
+static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
+                                   int which)
+{
+       struct rbd_obj_request *obj_req = osd_req->r_priv;
+
+       switch (obj_req->img_request->op_type) {
+       case OBJ_OP_WRITE:
+               __rbd_osd_setup_write_ops(osd_req, which);
+               break;
+       case OBJ_OP_DISCARD:
+               __rbd_osd_setup_discard_ops(osd_req, which);
+               break;
+       case OBJ_OP_ZEROOUT:
+               __rbd_osd_setup_zeroout_ops(osd_req, which);
+               break;
+       default:
+               BUG();
+       }
+}
+
 /*
- * For each object request in @img_req, allocate an OSD request, add
- * individual OSD ops and prepare them for submission.  The number of
- * OSD ops depends on op_type and the overlap point (if any).
+ * Prune the list of object requests (adjust offset and/or length, drop
+ * redundant requests).  Prepare object request state machines and image
+ * request state machine for execution.
  */
 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
 {
@@ -2024,16 +2568,16 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
        for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
                switch (img_req->op_type) {
                case OBJ_OP_READ:
-                       ret = rbd_obj_setup_read(obj_req);
+                       ret = rbd_obj_init_read(obj_req);
                        break;
                case OBJ_OP_WRITE:
-                       ret = rbd_obj_setup_write(obj_req);
+                       ret = rbd_obj_init_write(obj_req);
                        break;
                case OBJ_OP_DISCARD:
-                       ret = rbd_obj_setup_discard(obj_req);
+                       ret = rbd_obj_init_discard(obj_req);
                        break;
                case OBJ_OP_ZEROOUT:
-                       ret = rbd_obj_setup_zeroout(obj_req);
+                       ret = rbd_obj_init_zeroout(obj_req);
                        break;
                default:
                        BUG();
@@ -2041,17 +2585,12 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
                if (ret < 0)
                        return ret;
                if (ret > 0) {
-                       img_req->xferred += obj_req->ex.oe_len;
-                       img_req->pending_count--;
                        rbd_img_obj_request_del(img_req, obj_req);
                        continue;
                }
-
-               ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
-               if (ret)
-                       return ret;
        }
 
+       img_req->state = RBD_IMG_START;
        return 0;
 }
 
@@ -2340,17 +2879,55 @@ static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
                                         &it);
 }
 
-static void rbd_img_request_submit(struct rbd_img_request *img_request)
+static void rbd_img_handle_request_work(struct work_struct *work)
 {
-       struct rbd_obj_request *obj_request;
+       struct rbd_img_request *img_req =
+           container_of(work, struct rbd_img_request, work);
 
-       dout("%s: img %p\n", __func__, img_request);
+       rbd_img_handle_request(img_req, img_req->work_result);
+}
 
-       rbd_img_request_get(img_request);
-       for_each_obj_request(img_request, obj_request)
-               rbd_obj_request_submit(obj_request);
+static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
+{
+       INIT_WORK(&img_req->work, rbd_img_handle_request_work);
+       img_req->work_result = result;
+       queue_work(rbd_wq, &img_req->work);
+}
 
-       rbd_img_request_put(img_request);
+static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+       if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
+               obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
+               return true;
+       }
+
+       dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
+            obj_req->ex.oe_objno);
+       return false;
+}
+
+static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
+{
+       struct ceph_osd_request *osd_req;
+       int ret;
+
+       osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
+       if (IS_ERR(osd_req))
+               return PTR_ERR(osd_req);
+
+       osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
+                              obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
+       rbd_osd_setup_data(osd_req, 0);
+       rbd_osd_format_read(osd_req);
+
+       ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
+       if (ret)
+               return ret;
+
+       rbd_osd_submit(osd_req);
+       return 0;
 }
 
 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
@@ -2396,51 +2973,144 @@ static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
                return ret;
        }
 
-       rbd_img_request_submit(child_img_req);
+       /* avoid parent chain recursion */
+       rbd_img_schedule(child_img_req, 0);
        return 0;
 }
 
-static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
+static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
 {
        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
        int ret;
 
-       if (obj_req->result == -ENOENT &&
-           rbd_dev->parent_overlap && !obj_req->tried_parent) {
-               /* reverse map this object extent onto the parent */
-               ret = rbd_obj_calc_img_extents(obj_req, false);
+again:
+       switch (obj_req->read_state) {
+       case RBD_OBJ_READ_START:
+               rbd_assert(!*result);
+
+               if (!rbd_obj_may_exist(obj_req)) {
+                       *result = -ENOENT;
+                       obj_req->read_state = RBD_OBJ_READ_OBJECT;
+                       goto again;
+               }
+
+               ret = rbd_obj_read_object(obj_req);
                if (ret) {
-                       obj_req->result = ret;
+                       *result = ret;
                        return true;
                }
-
-               if (obj_req->num_img_extents) {
-                       obj_req->tried_parent = true;
-                       ret = rbd_obj_read_from_parent(obj_req);
+               obj_req->read_state = RBD_OBJ_READ_OBJECT;
+               return false;
+       case RBD_OBJ_READ_OBJECT:
+               if (*result == -ENOENT && rbd_dev->parent_overlap) {
+                       /* reverse map this object extent onto the parent */
+                       ret = rbd_obj_calc_img_extents(obj_req, false);
                        if (ret) {
-                               obj_req->result = ret;
+                               *result = ret;
                                return true;
                        }
-                       return false;
+                       if (obj_req->num_img_extents) {
+                               ret = rbd_obj_read_from_parent(obj_req);
+                               if (ret) {
+                                       *result = ret;
+                                       return true;
+                               }
+                               obj_req->read_state = RBD_OBJ_READ_PARENT;
+                               return false;
+                       }
                }
+
+               /*
+                * -ENOENT means a hole in the image -- zero-fill the entire
+                * length of the request.  A short read also implies zero-fill
+                * to the end of the request.
+                */
+               if (*result == -ENOENT) {
+                       rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
+                       *result = 0;
+               } else if (*result >= 0) {
+                       if (*result < obj_req->ex.oe_len)
+                               rbd_obj_zero_range(obj_req, *result,
+                                               obj_req->ex.oe_len - *result);
+                       else
+                               rbd_assert(*result == obj_req->ex.oe_len);
+                       *result = 0;
+               }
+               return true;
+       case RBD_OBJ_READ_PARENT:
+               return true;
+       default:
+               BUG();
        }
+}
+
+static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+       if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
+               obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
+
+       if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
+           (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
+               dout("%s %p noop for nonexistent\n", __func__, obj_req);
+               return true;
+       }
+
+       return false;
+}
+
+/*
+ * Return:
+ *   0 - object map update sent
+ *   1 - object map update isn't needed
+ *  <0 - error
+ */
+static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       u8 new_state;
+
+       if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+               return 1;
+
+       if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
+               new_state = OBJECT_PENDING;
+       else
+               new_state = OBJECT_EXISTS;
+
+       return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
+}
+
+static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
+{
+       struct ceph_osd_request *osd_req;
+       int num_ops = count_write_ops(obj_req);
+       int which = 0;
+       int ret;
 
-       /*
-        * -ENOENT means a hole in the image -- zero-fill the entire
-        * length of the request.  A short read also implies zero-fill
-        * to the end of the request.  In both cases we update xferred
-        * count to indicate the whole request was satisfied.
-        */
-       if (obj_req->result == -ENOENT ||
-           (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
-               rbd_assert(!obj_req->xferred || !obj_req->result);
-               rbd_obj_zero_range(obj_req, obj_req->xferred,
-                                  obj_req->ex.oe_len - obj_req->xferred);
-               obj_req->result = 0;
-               obj_req->xferred = obj_req->ex.oe_len;
+       if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
+               num_ops++; /* stat */
+
+       osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
+       if (IS_ERR(osd_req))
+               return PTR_ERR(osd_req);
+
+       if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
+               ret = rbd_osd_setup_stat(osd_req, which++);
+               if (ret)
+                       return ret;
        }
 
-       return true;
+       rbd_osd_setup_write_ops(osd_req, which);
+       rbd_osd_format_write(osd_req);
+
+       ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
+       if (ret)
+               return ret;
+
+       rbd_osd_submit(osd_req);
+       return 0;
 }
 
 /*
@@ -2463,123 +3133,67 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
 
 #define MODS_ONLY      U32_MAX
 
-static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
-                                           u32 bytes)
+static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
+                                     u32 bytes)
 {
+       struct ceph_osd_request *osd_req;
        int ret;
 
        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
-       rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
        rbd_assert(bytes > 0 && bytes != MODS_ONLY);
-       rbd_osd_req_destroy(obj_req->osd_req);
 
-       obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1);
-       if (!obj_req->osd_req)
-               return -ENOMEM;
+       osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
+       if (IS_ERR(osd_req))
+               return PTR_ERR(osd_req);
 
-       ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
+       ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
        if (ret)
                return ret;
 
-       osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
-                                         obj_req->copyup_bvecs,
-                                         obj_req->copyup_bvec_count,
-                                         bytes);
-       rbd_osd_req_format_write(obj_req);
+       rbd_osd_format_write(osd_req);
 
-       ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+       ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
        if (ret)
                return ret;
 
-       rbd_obj_request_submit(obj_req);
+       rbd_osd_submit(osd_req);
        return 0;
 }
 
-static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
+static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
+                                       u32 bytes)
 {
-       struct rbd_img_request *img_req = obj_req->img_request;
-       unsigned int num_osd_ops = (bytes != MODS_ONLY);
-       unsigned int which = 0;
+       struct ceph_osd_request *osd_req;
+       int num_ops = count_write_ops(obj_req);
+       int which = 0;
        int ret;
 
        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
-       rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT ||
-                  obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL);
-       rbd_osd_req_destroy(obj_req->osd_req);
 
-       switch (img_req->op_type) {
-       case OBJ_OP_WRITE:
-               num_osd_ops += count_write_ops(obj_req);
-               break;
-       case OBJ_OP_ZEROOUT:
-               num_osd_ops += count_zeroout_ops(obj_req);
-               break;
-       default:
-               BUG();
-       }
+       if (bytes != MODS_ONLY)
+               num_ops++; /* copyup */
 
-       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
-       if (!obj_req->osd_req)
-               return -ENOMEM;
+       osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
+       if (IS_ERR(osd_req))
+               return PTR_ERR(osd_req);
 
        if (bytes != MODS_ONLY) {
-               ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd",
-                                         "copyup");
+               ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
                if (ret)
                        return ret;
-
-               osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++,
-                                                 obj_req->copyup_bvecs,
-                                                 obj_req->copyup_bvec_count,
-                                                 bytes);
        }
 
-       switch (img_req->op_type) {
-       case OBJ_OP_WRITE:
-               __rbd_obj_setup_write(obj_req, which);
-               break;
-       case OBJ_OP_ZEROOUT:
-               __rbd_obj_setup_zeroout(obj_req, which);
-               break;
-       default:
-               BUG();
-       }
+       rbd_osd_setup_write_ops(osd_req, which);
+       rbd_osd_format_write(osd_req);
 
-       ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+       ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
        if (ret)
                return ret;
 
-       rbd_obj_request_submit(obj_req);
+       rbd_osd_submit(osd_req);
        return 0;
 }
 
-static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
-{
-       /*
-        * Only send non-zero copyup data to save some I/O and network
-        * bandwidth -- zero copyup data is equivalent to the object not
-        * existing.
-        */
-       if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
-               dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
-               bytes = 0;
-       }
-
-       if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
-               /*
-                * Send a copyup request with an empty snapshot context to
-                * deep-copyup the object through all existing snapshots.
-                * A second request with the current snapshot context will be
-                * sent for the actual modification.
-                */
-               obj_req->write_state = RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC;
-               return rbd_obj_issue_copyup_empty_snapc(obj_req, bytes);
-       }
-
-       obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
-       return rbd_obj_issue_copyup_ops(obj_req, bytes);
-}
-
 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
 {
        u32 i;
@@ -2608,7 +3222,12 @@ static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
        return 0;
 }
 
-static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
+/*
+ * The target object doesn't exist.  Read the data for the entire
+ * target object up to the overlap point (if any) from the parent,
+ * so we can use it for a copyup.
+ */
+static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
 {
        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
        int ret;
@@ -2623,178 +3242,492 @@ static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
                 * request -- pass MODS_ONLY since the copyup isn't needed
                 * anymore.
                 */
-               obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
-               return rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
+               return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
        }
 
        ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
        if (ret)
                return ret;
 
-       obj_req->write_state = RBD_OBJ_WRITE_READ_FROM_PARENT;
        return rbd_obj_read_from_parent(obj_req);
 }
 
-static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
+static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       struct ceph_snap_context *snapc = obj_req->img_request->snapc;
+       u8 new_state;
+       u32 i;
+       int ret;
+
+       rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
+
+       if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+               return;
+
+       if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
+               return;
+
+       for (i = 0; i < snapc->num_snaps; i++) {
+               if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
+                   i + 1 < snapc->num_snaps)
+                       new_state = OBJECT_EXISTS_CLEAN;
+               else
+                       new_state = OBJECT_EXISTS;
+
+               ret = rbd_object_map_update(obj_req, snapc->snaps[i],
+                                           new_state, NULL);
+               if (ret < 0) {
+                       obj_req->pending.result = ret;
+                       return;
+               }
+
+               rbd_assert(!ret);
+               obj_req->pending.num_pending++;
+       }
+}
+
+static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
+{
+       u32 bytes = rbd_obj_img_extents_bytes(obj_req);
+       int ret;
+
+       rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
+
+       /*
+        * Only send non-zero copyup data to save some I/O and network
+        * bandwidth -- zero copyup data is equivalent to the object not
+        * existing.
+        */
+       if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
+               bytes = 0;
+
+       if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
+               /*
+                * Send a copyup request with an empty snapshot context to
+                * deep-copyup the object through all existing snapshots.
+                * A second request with the current snapshot context will be
+                * sent for the actual modification.
+                */
+               ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
+               if (ret) {
+                       obj_req->pending.result = ret;
+                       return;
+               }
+
+               obj_req->pending.num_pending++;
+               bytes = MODS_ONLY;
+       }
+
+       ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
+       if (ret) {
+               obj_req->pending.result = ret;
+               return;
+       }
+
+       obj_req->pending.num_pending++;
+}
+
+static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       int ret;
+
+again:
+       switch (obj_req->copyup_state) {
+       case RBD_OBJ_COPYUP_START:
+               rbd_assert(!*result);
+
+               ret = rbd_obj_copyup_read_parent(obj_req);
+               if (ret) {
+                       *result = ret;
+                       return true;
+               }
+               if (obj_req->num_img_extents)
+                       obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
+               else
+                       obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
+               return false;
+       case RBD_OBJ_COPYUP_READ_PARENT:
+               if (*result)
+                       return true;
+
+               if (is_zero_bvecs(obj_req->copyup_bvecs,
+                                 rbd_obj_img_extents_bytes(obj_req))) {
+                       dout("%s %p detected zeros\n", __func__, obj_req);
+                       obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
+               }
+
+               rbd_obj_copyup_object_maps(obj_req);
+               if (!obj_req->pending.num_pending) {
+                       *result = obj_req->pending.result;
+                       obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
+                       goto again;
+               }
+               obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
+               return false;
+       case __RBD_OBJ_COPYUP_OBJECT_MAPS:
+               if (!pending_result_dec(&obj_req->pending, result))
+                       return false;
+               /* fall through */
+       case RBD_OBJ_COPYUP_OBJECT_MAPS:
+               if (*result) {
+                       rbd_warn(rbd_dev, "snap object map update failed: %d",
+                                *result);
+                       return true;
+               }
+
+               rbd_obj_copyup_write_object(obj_req);
+               if (!obj_req->pending.num_pending) {
+                       *result = obj_req->pending.result;
+                       obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
+                       goto again;
+               }
+               obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
+               return false;
+       case __RBD_OBJ_COPYUP_WRITE_OBJECT:
+               if (!pending_result_dec(&obj_req->pending, result))
+                       return false;
+               /* fall through */
+       case RBD_OBJ_COPYUP_WRITE_OBJECT:
+               return true;
+       default:
+               BUG();
+       }
+}
+
+/*
+ * Return:
+ *   0 - object map update sent
+ *   1 - object map update isn't needed
+ *  <0 - error
+ */
+static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       u8 current_state = OBJECT_PENDING;
+
+       if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+               return 1;
+
+       if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
+               return 1;
+
+       return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
+                                    &current_state);
+}
+
+static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       int ret;
+
+again:
+       switch (obj_req->write_state) {
+       case RBD_OBJ_WRITE_START:
+               rbd_assert(!*result);
+
+               if (rbd_obj_write_is_noop(obj_req))
+                       return true;
+
+               ret = rbd_obj_write_pre_object_map(obj_req);
+               if (ret < 0) {
+                       *result = ret;
+                       return true;
+               }
+               obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
+               if (ret > 0)
+                       goto again;
+               return false;
+       case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
+               if (*result) {
+                       rbd_warn(rbd_dev, "pre object map update failed: %d",
+                                *result);
+                       return true;
+               }
+               ret = rbd_obj_write_object(obj_req);
+               if (ret) {
+                       *result = ret;
+                       return true;
+               }
+               obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
+               return false;
+       case RBD_OBJ_WRITE_OBJECT:
+               if (*result == -ENOENT) {
+                       if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
+                               *result = 0;
+                               obj_req->copyup_state = RBD_OBJ_COPYUP_START;
+                               obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
+                               goto again;
+                       }
+                       /*
+                        * On a non-existent object:
+                        *   delete - -ENOENT, truncate/zero - 0
+                        */
+                       if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
+                               *result = 0;
+               }
+               if (*result)
+                       return true;
+
+               obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
+               goto again;
+       case __RBD_OBJ_WRITE_COPYUP:
+               if (!rbd_obj_advance_copyup(obj_req, result))
+                       return false;
+               /* fall through */
+       case RBD_OBJ_WRITE_COPYUP:
+               if (*result) {
+                       rbd_warn(rbd_dev, "copyup failed: %d", *result);
+                       return true;
+               }
+               ret = rbd_obj_write_post_object_map(obj_req);
+               if (ret < 0) {
+                       *result = ret;
+                       return true;
+               }
+               obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
+               if (ret > 0)
+                       goto again;
+               return false;
+       case RBD_OBJ_WRITE_POST_OBJECT_MAP:
+               if (*result)
+                       rbd_warn(rbd_dev, "post object map update failed: %d",
+                                *result);
+               return true;
+       default:
+               BUG();
+       }
+}
+
+/*
+ * Return true if @obj_req is completed.
+ */
+static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
+                                    int *result)
+{
+       struct rbd_img_request *img_req = obj_req->img_request;
+       struct rbd_device *rbd_dev = img_req->rbd_dev;
+       bool done;
+
+       mutex_lock(&obj_req->state_mutex);
+       if (!rbd_img_is_write(img_req))
+               done = rbd_obj_advance_read(obj_req, result);
+       else
+               done = rbd_obj_advance_write(obj_req, result);
+       mutex_unlock(&obj_req->state_mutex);
+
+       if (done && *result) {
+               rbd_assert(*result < 0);
+               rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
+                        obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
+                        obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
+       }
+       return done;
+}
+
+/*
+ * This is open-coded in rbd_img_handle_request() to avoid parent chain
+ * recursion.
+ */
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
+{
+       if (__rbd_obj_handle_request(obj_req, &result))
+               rbd_img_handle_request(obj_req->img_request, result);
+}
+
+static bool need_exclusive_lock(struct rbd_img_request *img_req)
+{
+       struct rbd_device *rbd_dev = img_req->rbd_dev;
+
+       if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
+               return false;
+
+       if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+               return false;
+
+       rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
+       if (rbd_dev->opts->lock_on_read ||
+           (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+               return true;
+
+       return rbd_img_is_write(img_req);
+}
+
+static bool rbd_lock_add_request(struct rbd_img_request *img_req)
+{
+       struct rbd_device *rbd_dev = img_req->rbd_dev;
+       bool locked;
+
+       lockdep_assert_held(&rbd_dev->lock_rwsem);
+       locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
+       spin_lock(&rbd_dev->lock_lists_lock);
+       rbd_assert(list_empty(&img_req->lock_item));
+       if (!locked)
+               list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
+       else
+               list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
+       spin_unlock(&rbd_dev->lock_lists_lock);
+       return locked;
+}
+
+static void rbd_lock_del_request(struct rbd_img_request *img_req)
+{
+       struct rbd_device *rbd_dev = img_req->rbd_dev;
+       bool need_wakeup;
+
+       lockdep_assert_held(&rbd_dev->lock_rwsem);
+       spin_lock(&rbd_dev->lock_lists_lock);
+       rbd_assert(!list_empty(&img_req->lock_item));
+       list_del_init(&img_req->lock_item);
+       need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
+                      list_empty(&rbd_dev->running_list));
+       spin_unlock(&rbd_dev->lock_lists_lock);
+       if (need_wakeup)
+               complete(&rbd_dev->releasing_wait);
+}
+
+static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
+{
+       struct rbd_device *rbd_dev = img_req->rbd_dev;
+
+       if (!need_exclusive_lock(img_req))
+               return 1;
+
+       if (rbd_lock_add_request(img_req))
+               return 1;
+
+       if (rbd_dev->opts->exclusive) {
+               WARN_ON(1); /* lock got released? */
+               return -EROFS;
+       }
+
+       /*
+        * Note the use of mod_delayed_work() in rbd_acquire_lock()
+        * and cancel_delayed_work() in wake_lock_waiters().
+        */
+       dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
+       queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+       return 0;
+}
+
+static void rbd_img_object_requests(struct rbd_img_request *img_req)
+{
+       struct rbd_obj_request *obj_req;
+
+       rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
+
+       for_each_obj_request(img_req, obj_req) {
+               int result = 0;
+
+               if (__rbd_obj_handle_request(obj_req, &result)) {
+                       if (result) {
+                               img_req->pending.result = result;
+                               return;
+                       }
+               } else {
+                       img_req->pending.num_pending++;
+               }
+       }
+}
+
+static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
 {
+       struct rbd_device *rbd_dev = img_req->rbd_dev;
        int ret;
 
-       switch (obj_req->write_state) {
-       case RBD_OBJ_WRITE_GUARD:
-               rbd_assert(!obj_req->xferred);
-               if (obj_req->result == -ENOENT) {
-                       /*
-                        * The target object doesn't exist.  Read the data for
-                        * the entire target object up to the overlap point (if
-                        * any) from the parent, so we can use it for a copyup.
-                        */
-                       ret = rbd_obj_handle_write_guard(obj_req);
-                       if (ret) {
-                               obj_req->result = ret;
-                               return true;
-                       }
-                       return false;
-               }
-               /* fall through */
-       case RBD_OBJ_WRITE_FLAT:
-       case RBD_OBJ_WRITE_COPYUP_OPS:
-               if (!obj_req->result)
-                       /*
-                        * There is no such thing as a successful short
-                        * write -- indicate the whole request was satisfied.
-                        */
-                       obj_req->xferred = obj_req->ex.oe_len;
-               return true;
-       case RBD_OBJ_WRITE_READ_FROM_PARENT:
-               if (obj_req->result)
-                       return true;
+again:
+       switch (img_req->state) {
+       case RBD_IMG_START:
+               rbd_assert(!*result);
 
-               rbd_assert(obj_req->xferred);
-               ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
-               if (ret) {
-                       obj_req->result = ret;
-                       obj_req->xferred = 0;
+               ret = rbd_img_exclusive_lock(img_req);
+               if (ret < 0) {
+                       *result = ret;
                        return true;
                }
+               img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
+               if (ret > 0)
+                       goto again;
                return false;
-       case RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC:
-               if (obj_req->result)
+       case RBD_IMG_EXCLUSIVE_LOCK:
+               if (*result)
                        return true;
 
-               obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
-               ret = rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
-               if (ret) {
-                       obj_req->result = ret;
-                       return true;
+               rbd_assert(!need_exclusive_lock(img_req) ||
+                          __rbd_is_lock_owner(rbd_dev));
+
+               rbd_img_object_requests(img_req);
+               if (!img_req->pending.num_pending) {
+                       *result = img_req->pending.result;
+                       img_req->state = RBD_IMG_OBJECT_REQUESTS;
+                       goto again;
                }
+               img_req->state = __RBD_IMG_OBJECT_REQUESTS;
                return false;
+       case __RBD_IMG_OBJECT_REQUESTS:
+               if (!pending_result_dec(&img_req->pending, result))
+                       return false;
+               /* fall through */
+       case RBD_IMG_OBJECT_REQUESTS:
+               return true;
        default:
                BUG();
        }
 }
 
 /*
- * Returns true if @obj_req is completed, or false otherwise.
+ * Return true if @img_req is completed.
  */
-static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
-{
-       switch (obj_req->img_request->op_type) {
-       case OBJ_OP_READ:
-               return rbd_obj_handle_read(obj_req);
-       case OBJ_OP_WRITE:
-               return rbd_obj_handle_write(obj_req);
-       case OBJ_OP_DISCARD:
-       case OBJ_OP_ZEROOUT:
-               if (rbd_obj_handle_write(obj_req)) {
-                       /*
-                        * Hide -ENOENT from delete/truncate/zero -- discarding
-                        * a non-existent object is not a problem.
-                        */
-                       if (obj_req->result == -ENOENT) {
-                               obj_req->result = 0;
-                               obj_req->xferred = obj_req->ex.oe_len;
-                       }
-                       return true;
-               }
-               return false;
-       default:
-               BUG();
-       }
-}
-
-static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
+static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
+                                    int *result)
 {
-       struct rbd_img_request *img_req = obj_req->img_request;
+       struct rbd_device *rbd_dev = img_req->rbd_dev;
+       bool done;
 
-       rbd_assert((!obj_req->result &&
-                   obj_req->xferred == obj_req->ex.oe_len) ||
-                  (obj_req->result < 0 && !obj_req->xferred));
-       if (!obj_req->result) {
-               img_req->xferred += obj_req->xferred;
-               return;
+       if (need_exclusive_lock(img_req)) {
+               down_read(&rbd_dev->lock_rwsem);
+               mutex_lock(&img_req->state_mutex);
+               done = rbd_img_advance(img_req, result);
+               if (done)
+                       rbd_lock_del_request(img_req);
+               mutex_unlock(&img_req->state_mutex);
+               up_read(&rbd_dev->lock_rwsem);
+       } else {
+               mutex_lock(&img_req->state_mutex);
+               done = rbd_img_advance(img_req, result);
+               mutex_unlock(&img_req->state_mutex);
        }
 
-       rbd_warn(img_req->rbd_dev,
-                "%s at objno %llu %llu~%llu result %d xferred %llu",
-                obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
-                obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
-                obj_req->xferred);
-       if (!img_req->result) {
-               img_req->result = obj_req->result;
-               img_req->xferred = 0;
+       if (done && *result) {
+               rbd_assert(*result < 0);
+               rbd_warn(rbd_dev, "%s%s result %d",
+                     test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
+                     obj_op_name(img_req->op_type), *result);
        }
+       return done;
 }
 
-static void rbd_img_end_child_request(struct rbd_img_request *img_req)
-{
-       struct rbd_obj_request *obj_req = img_req->obj_request;
-
-       rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
-       rbd_assert((!img_req->result &&
-                   img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
-                  (img_req->result < 0 && !img_req->xferred));
-
-       obj_req->result = img_req->result;
-       obj_req->xferred = img_req->xferred;
-       rbd_img_request_put(img_req);
-}
-
-static void rbd_img_end_request(struct rbd_img_request *img_req)
-{
-       rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
-       rbd_assert((!img_req->result &&
-                   img_req->xferred == blk_rq_bytes(img_req->rq)) ||
-                  (img_req->result < 0 && !img_req->xferred));
-
-       blk_mq_end_request(img_req->rq,
-                          errno_to_blk_status(img_req->result));
-       rbd_img_request_put(img_req);
-}
-
-static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
+static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
 {
-       struct rbd_img_request *img_req;
-
 again:
-       if (!__rbd_obj_handle_request(obj_req))
-               return;
-
-       img_req = obj_req->img_request;
-       spin_lock(&img_req->completion_lock);
-       rbd_obj_end_request(obj_req);
-       rbd_assert(img_req->pending_count);
-       if (--img_req->pending_count) {
-               spin_unlock(&img_req->completion_lock);
+       if (!__rbd_img_handle_request(img_req, &result))
                return;
-       }
 
-       spin_unlock(&img_req->completion_lock);
        if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
-               obj_req = img_req->obj_request;
-               rbd_img_end_child_request(img_req);
-               goto again;
+               struct rbd_obj_request *obj_req = img_req->obj_request;
+
+               rbd_img_request_put(img_req);
+               if (__rbd_obj_handle_request(obj_req, &result)) {
+                       img_req = obj_req->img_request;
+                       goto again;
+               }
+       } else {
+               struct request *rq = img_req->rq;
+
+               rbd_img_request_put(img_req);
+               blk_mq_end_request(rq, errno_to_blk_status(result));
        }
-       rbd_img_end_request(img_req);
 }
 
 static const struct rbd_client_id rbd_empty_cid;
@@ -2839,6 +3772,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
 {
        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
 
+       rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
        strcpy(rbd_dev->lock_cookie, cookie);
        rbd_set_owner_cid(rbd_dev, &cid);
        queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
@@ -2863,7 +3797,6 @@ static int rbd_lock(struct rbd_device *rbd_dev)
        if (ret)
                return ret;
 
-       rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
        __rbd_lock(rbd_dev, cookie);
        return 0;
 }
@@ -2882,7 +3815,7 @@ static void rbd_unlock(struct rbd_device *rbd_dev)
        ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
                              RBD_LOCK_NAME, rbd_dev->lock_cookie);
        if (ret && ret != -ENOENT)
-               rbd_warn(rbd_dev, "failed to unlock: %d", ret);
+               rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
 
        /* treat errors as the image is unlocked */
        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
@@ -3009,15 +3942,34 @@ e_inval:
        goto out;
 }
 
-static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
+/*
+ * Either image request state machine(s) or rbd_add_acquire_lock()
+ * (i.e. "rbd map").
+ */
+static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
 {
-       dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
+       struct rbd_img_request *img_req;
+
+       dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
+       lockdep_assert_held_write(&rbd_dev->lock_rwsem);
 
        cancel_delayed_work(&rbd_dev->lock_dwork);
-       if (wake_all)
-               wake_up_all(&rbd_dev->lock_waitq);
-       else
-               wake_up(&rbd_dev->lock_waitq);
+       if (!completion_done(&rbd_dev->acquire_wait)) {
+               rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
+                          list_empty(&rbd_dev->running_list));
+               rbd_dev->acquire_err = result;
+               complete_all(&rbd_dev->acquire_wait);
+               return;
+       }
+
+       list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
+               mutex_lock(&img_req->state_mutex);
+               rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
+               rbd_img_schedule(img_req, result);
+               mutex_unlock(&img_req->state_mutex);
+       }
+
+       list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
 }
 
 static int get_lock_owner_info(struct rbd_device *rbd_dev,
@@ -3132,13 +4084,10 @@ static int rbd_try_lock(struct rbd_device *rbd_dev)
                        goto again;
 
                ret = find_watcher(rbd_dev, lockers);
-               if (ret) {
-                       if (ret > 0)
-                               ret = 0; /* have to request lock */
-                       goto out;
-               }
+               if (ret)
+                       goto out; /* request lock or error */
 
-               rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
+               rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
                         ENTITY_NAME(lockers[0].id.name));
 
                ret = ceph_monc_blacklist_add(&client->monc,
@@ -3165,53 +4114,90 @@ out:
        return ret;
 }
 
+static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
+{
+       int ret;
+
+       if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
+               ret = rbd_object_map_open(rbd_dev);
+               if (ret)
+                       return ret;
+       }
+
+       return 0;
+}
+
 /*
- * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
+ * Return:
+ *   0 - lock acquired
+ *   1 - caller should call rbd_request_lock()
+ *  <0 - error
  */
-static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
-                                               int *pret)
+static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
 {
-       enum rbd_lock_state lock_state;
+       int ret;
 
        down_read(&rbd_dev->lock_rwsem);
        dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
             rbd_dev->lock_state);
        if (__rbd_is_lock_owner(rbd_dev)) {
-               lock_state = rbd_dev->lock_state;
                up_read(&rbd_dev->lock_rwsem);
-               return lock_state;
+               return 0;
        }
 
        up_read(&rbd_dev->lock_rwsem);
        down_write(&rbd_dev->lock_rwsem);
        dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
             rbd_dev->lock_state);
-       if (!__rbd_is_lock_owner(rbd_dev)) {
-               *pret = rbd_try_lock(rbd_dev);
-               if (*pret)
-                       rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
+       if (__rbd_is_lock_owner(rbd_dev)) {
+               up_write(&rbd_dev->lock_rwsem);
+               return 0;
+       }
+
+       ret = rbd_try_lock(rbd_dev);
+       if (ret < 0) {
+               rbd_warn(rbd_dev, "failed to lock header: %d", ret);
+               if (ret == -EBLACKLISTED)
+                       goto out;
+
+               ret = 1; /* request lock anyway */
+       }
+       if (ret > 0) {
+               up_write(&rbd_dev->lock_rwsem);
+               return ret;
+       }
+
+       rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
+       rbd_assert(list_empty(&rbd_dev->running_list));
+
+       ret = rbd_post_acquire_action(rbd_dev);
+       if (ret) {
+               rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
+               /*
+                * Can't stay in RBD_LOCK_STATE_LOCKED because
+                * rbd_lock_add_request() would let the request through,
+                * assuming that e.g. object map is locked and loaded.
+                */
+               rbd_unlock(rbd_dev);
        }
 
-       lock_state = rbd_dev->lock_state;
+out:
+       wake_lock_waiters(rbd_dev, ret);
        up_write(&rbd_dev->lock_rwsem);
-       return lock_state;
+       return ret;
 }
 
 static void rbd_acquire_lock(struct work_struct *work)
 {
        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
                                            struct rbd_device, lock_dwork);
-       enum rbd_lock_state lock_state;
-       int ret = 0;
+       int ret;
 
        dout("%s rbd_dev %p\n", __func__, rbd_dev);
 again:
-       lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
-       if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
-               if (lock_state == RBD_LOCK_STATE_LOCKED)
-                       wake_requests(rbd_dev, true);
-               dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
-                    rbd_dev, lock_state, ret);
+       ret = rbd_try_acquire_lock(rbd_dev);
+       if (ret <= 0) {
+               dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
                return;
        }
 
@@ -3220,16 +4206,9 @@ again:
                goto again; /* treat this as a dead client */
        } else if (ret == -EROFS) {
                rbd_warn(rbd_dev, "peer will not release lock");
-               /*
-                * If this is rbd_add_acquire_lock(), we want to fail
-                * immediately -- reuse BLACKLISTED flag.  Otherwise we
-                * want to block.
-                */
-               if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
-                       set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
-                       /* wake "rbd map --exclusive" process */
-                       wake_requests(rbd_dev, false);
-               }
+               down_write(&rbd_dev->lock_rwsem);
+               wake_lock_waiters(rbd_dev, ret);
+               up_write(&rbd_dev->lock_rwsem);
        } else if (ret < 0) {
                rbd_warn(rbd_dev, "error requesting lock: %d", ret);
                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
@@ -3246,43 +4225,67 @@ again:
        }
 }
 
-/*
- * lock_rwsem must be held for write
- */
-static bool rbd_release_lock(struct rbd_device *rbd_dev)
+static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
 {
-       dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
-            rbd_dev->lock_state);
+       bool need_wait;
+
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+       lockdep_assert_held_write(&rbd_dev->lock_rwsem);
+
        if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
                return false;
 
-       rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
-       downgrade_write(&rbd_dev->lock_rwsem);
        /*
         * Ensure that all in-flight IO is flushed.
-        *
-        * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
-        * may be shared with other devices.
         */
-       ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
+       rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
+       rbd_assert(!completion_done(&rbd_dev->releasing_wait));
+       need_wait = !list_empty(&rbd_dev->running_list);
+       downgrade_write(&rbd_dev->lock_rwsem);
+       if (need_wait)
+               wait_for_completion(&rbd_dev->releasing_wait);
        up_read(&rbd_dev->lock_rwsem);
 
        down_write(&rbd_dev->lock_rwsem);
-       dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
-            rbd_dev->lock_state);
        if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
                return false;
 
+       rbd_assert(list_empty(&rbd_dev->running_list));
+       return true;
+}
+
+static void rbd_pre_release_action(struct rbd_device *rbd_dev)
+{
+       if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
+               rbd_object_map_close(rbd_dev);
+}
+
+static void __rbd_release_lock(struct rbd_device *rbd_dev)
+{
+       rbd_assert(list_empty(&rbd_dev->running_list));
+
+       rbd_pre_release_action(rbd_dev);
        rbd_unlock(rbd_dev);
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_release_lock(struct rbd_device *rbd_dev)
+{
+       if (!rbd_quiesce_lock(rbd_dev))
+               return;
+
+       __rbd_release_lock(rbd_dev);
+
        /*
         * Give others a chance to grab the lock - we would re-acquire
-        * almost immediately if we got new IO during ceph_osdc_sync()
-        * otherwise.  We need to ack our own notifications, so this
-        * lock_dwork will be requeued from rbd_wait_state_locked()
-        * after wake_requests() in rbd_handle_released_lock().
+        * almost immediately if we got new IO while draining the running
+        * list otherwise.  We need to ack our own notifications, so this
+        * lock_dwork will be requeued from rbd_handle_released_lock() by
+        * way of maybe_kick_acquire().
         */
        cancel_delayed_work(&rbd_dev->lock_dwork);
-       return true;
 }
 
 static void rbd_release_lock_work(struct work_struct *work)
@@ -3295,6 +4298,23 @@ static void rbd_release_lock_work(struct work_struct *work)
        up_write(&rbd_dev->lock_rwsem);
 }
 
+static void maybe_kick_acquire(struct rbd_device *rbd_dev)
+{
+       bool have_requests;
+
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+       if (__rbd_is_lock_owner(rbd_dev))
+               return;
+
+       spin_lock(&rbd_dev->lock_lists_lock);
+       have_requests = !list_empty(&rbd_dev->acquiring_list);
+       spin_unlock(&rbd_dev->lock_lists_lock);
+       if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
+               dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
+               mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+       }
+}
+
 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
                                     void **p)
 {
@@ -3324,8 +4344,7 @@ static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
                down_read(&rbd_dev->lock_rwsem);
        }
 
-       if (!__rbd_is_lock_owner(rbd_dev))
-               wake_requests(rbd_dev, false);
+       maybe_kick_acquire(rbd_dev);
        up_read(&rbd_dev->lock_rwsem);
 }
 
@@ -3357,8 +4376,7 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
                down_read(&rbd_dev->lock_rwsem);
        }
 
-       if (!__rbd_is_lock_owner(rbd_dev))
-               wake_requests(rbd_dev, false);
+       maybe_kick_acquire(rbd_dev);
        up_read(&rbd_dev->lock_rwsem);
 }
 
@@ -3608,7 +4626,6 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev)
 
 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
 {
-       WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
        cancel_tasks_sync(rbd_dev);
 
        mutex_lock(&rbd_dev->watch_mutex);
@@ -3630,7 +4647,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
        char cookie[32];
        int ret;
 
-       WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+       if (!rbd_quiesce_lock(rbd_dev))
+               return;
 
        format_lock_cookie(rbd_dev, cookie);
        ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
@@ -3646,11 +4664,11 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
                 * Lock cookie cannot be updated on older OSDs, so do
                 * a manual release and queue an acquire.
                 */
-               if (rbd_release_lock(rbd_dev))
-                       queue_delayed_work(rbd_dev->task_wq,
-                                          &rbd_dev->lock_dwork, 0);
+               __rbd_release_lock(rbd_dev);
+               queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
        } else {
                __rbd_lock(rbd_dev, cookie);
+               wake_lock_waiters(rbd_dev, 0);
        }
 }
 
@@ -3671,15 +4689,18 @@ static void rbd_reregister_watch(struct work_struct *work)
        ret = __rbd_register_watch(rbd_dev);
        if (ret) {
                rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
-               if (ret == -EBLACKLISTED || ret == -ENOENT) {
-                       set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
-                       wake_requests(rbd_dev, true);
-               } else {
+               if (ret != -EBLACKLISTED && ret != -ENOENT) {
                        queue_delayed_work(rbd_dev->task_wq,
                                           &rbd_dev->watch_dwork,
                                           RBD_RETRY_DELAY);
+                       mutex_unlock(&rbd_dev->watch_mutex);
+                       return;
                }
+
                mutex_unlock(&rbd_dev->watch_mutex);
+               down_write(&rbd_dev->lock_rwsem);
+               wake_lock_waiters(rbd_dev, ret);
+               up_write(&rbd_dev->lock_rwsem);
                return;
        }
 
@@ -3742,7 +4763,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
 
        ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
                             CEPH_OSD_FLAG_READ, req_page, outbound_size,
-                            reply_page, &inbound_size);
+                            &reply_page, &inbound_size);
        if (!ret) {
                memcpy(inbound, page_address(reply_page), inbound_size);
                ret = inbound_size;
@@ -3754,54 +4775,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
        return ret;
 }
 
-/*
- * lock_rwsem must be held for read
- */
-static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
-{
-       DEFINE_WAIT(wait);
-       unsigned long timeout;
-       int ret = 0;
-
-       if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
-               return -EBLACKLISTED;
-
-       if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
-               return 0;
-
-       if (!may_acquire) {
-               rbd_warn(rbd_dev, "exclusive lock required");
-               return -EROFS;
-       }
-
-       do {
-               /*
-                * Note the use of mod_delayed_work() in rbd_acquire_lock()
-                * and cancel_delayed_work() in wake_requests().
-                */
-               dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
-               queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
-               prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
-                                         TASK_UNINTERRUPTIBLE);
-               up_read(&rbd_dev->lock_rwsem);
-               timeout = schedule_timeout(ceph_timeout_jiffies(
-                                               rbd_dev->opts->lock_timeout));
-               down_read(&rbd_dev->lock_rwsem);
-               if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
-                       ret = -EBLACKLISTED;
-                       break;
-               }
-               if (!timeout) {
-                       rbd_warn(rbd_dev, "timed out waiting for lock");
-                       ret = -ETIMEDOUT;
-                       break;
-               }
-       } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
-
-       finish_wait(&rbd_dev->lock_waitq, &wait);
-       return ret;
-}
-
 static void rbd_queue_workfn(struct work_struct *work)
 {
        struct request *rq = blk_mq_rq_from_pdu(work);
@@ -3812,7 +4785,6 @@ static void rbd_queue_workfn(struct work_struct *work)
        u64 length = blk_rq_bytes(rq);
        enum obj_operation_type op_type;
        u64 mapping_size;
-       bool must_be_locked;
        int result;
 
        switch (req_op(rq)) {
@@ -3886,21 +4858,10 @@ static void rbd_queue_workfn(struct work_struct *work)
                goto err_rq;
        }
 
-       must_be_locked =
-           (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
-           (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
-       if (must_be_locked) {
-               down_read(&rbd_dev->lock_rwsem);
-               result = rbd_wait_state_locked(rbd_dev,
-                                              !rbd_dev->opts->exclusive);
-               if (result)
-                       goto err_unlock;
-       }
-
        img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
        if (!img_request) {
                result = -ENOMEM;
-               goto err_unlock;
+               goto err_rq;
        }
        img_request->rq = rq;
        snapc = NULL; /* img_request consumes a ref */
@@ -3910,19 +4871,14 @@ static void rbd_queue_workfn(struct work_struct *work)
        else
                result = rbd_img_fill_from_bio(img_request, offset, length,
                                               rq->bio);
-       if (result || !img_request->pending_count)
+       if (result)
                goto err_img_request;
 
-       rbd_img_request_submit(img_request);
-       if (must_be_locked)
-               up_read(&rbd_dev->lock_rwsem);
+       rbd_img_handle_request(img_request, 0);
        return;
 
 err_img_request:
        rbd_img_request_put(img_request);
-err_unlock:
-       if (must_be_locked)
-               up_read(&rbd_dev->lock_rwsem);
 err_rq:
        if (result)
                rbd_warn(rbd_dev, "%s %llx at %llx result %d",
@@ -4589,7 +5545,13 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
        INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
        INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
        INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
-       init_waitqueue_head(&rbd_dev->lock_waitq);
+       spin_lock_init(&rbd_dev->lock_lists_lock);
+       INIT_LIST_HEAD(&rbd_dev->acquiring_list);
+       INIT_LIST_HEAD(&rbd_dev->running_list);
+       init_completion(&rbd_dev->acquire_wait);
+       init_completion(&rbd_dev->releasing_wait);
+
+       spin_lock_init(&rbd_dev->object_map_lock);
 
        rbd_dev->dev.bus = &rbd_bus_type;
        rbd_dev->dev.type = &rbd_device_type;
@@ -4772,6 +5734,32 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
                                                &rbd_dev->header.features);
 }
 
+/*
+ * These are generic image flags, but since they are used only for
+ * object map, store them in rbd_dev->object_map_flags.
+ *
+ * For the same reason, this function is called only on object map
+ * (re)load and not on header refresh.
+ */
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
+{
+       __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
+       __le64 flags;
+       int ret;
+
+       ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+                                 &rbd_dev->header_oloc, "get_flags",
+                                 &snapid, sizeof(snapid),
+                                 &flags, sizeof(flags));
+       if (ret < 0)
+               return ret;
+       if (ret < sizeof(flags))
+               return -EBADMSG;
+
+       rbd_dev->object_map_flags = le64_to_cpu(flags);
+       return 0;
+}
+
 struct parent_image_info {
        u64             pool_id;
        const char      *pool_ns;
@@ -4829,7 +5817,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev,
 
        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
                             "rbd", "parent_get", CEPH_OSD_FLAG_READ,
-                            req_page, sizeof(u64), reply_page, &reply_len);
+                            req_page, sizeof(u64), &reply_page, &reply_len);
        if (ret)
                return ret == -EOPNOTSUPP ? 1 : ret;
 
@@ -4841,7 +5829,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev,
 
        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
                             "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
-                            req_page, sizeof(u64), reply_page, &reply_len);
+                            req_page, sizeof(u64), &reply_page, &reply_len);
        if (ret)
                return ret;
 
@@ -4872,7 +5860,7 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
 
        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
                             "rbd", "get_parent", CEPH_OSD_FLAG_READ,
-                            req_page, sizeof(u64), reply_page, &reply_len);
+                            req_page, sizeof(u64), &reply_page, &reply_len);
        if (ret)
                return ret;
 
@@ -5605,28 +6593,49 @@ static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
 {
        down_write(&rbd_dev->lock_rwsem);
        if (__rbd_is_lock_owner(rbd_dev))
-               rbd_unlock(rbd_dev);
+               __rbd_release_lock(rbd_dev);
        up_write(&rbd_dev->lock_rwsem);
 }
 
+/*
+ * If the wait is interrupted, an error is returned even if the lock
+ * was successfully acquired.  rbd_dev_image_unlock() will release it
+ * if needed.
+ */
 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
 {
-       int ret;
+       long ret;
 
        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+               if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
+                       return 0;
+
                rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
                return -EINVAL;
        }
 
-       /* FIXME: "rbd map --exclusive" should be in interruptible */
-       down_read(&rbd_dev->lock_rwsem);
-       ret = rbd_wait_state_locked(rbd_dev, true);
-       up_read(&rbd_dev->lock_rwsem);
+       if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+               return 0;
+
+       rbd_assert(!rbd_is_lock_owner(rbd_dev));
+       queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+       ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
+                           ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
+       if (ret > 0)
+               ret = rbd_dev->acquire_err;
+       else if (!ret)
+               ret = -ETIMEDOUT;
+
        if (ret) {
-               rbd_warn(rbd_dev, "failed to acquire exclusive lock");
-               return -EROFS;
+               rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
+               return ret;
        }
 
+       /*
+        * The lock may have been released by now, unless automatic lock
+        * transitions are disabled.
+        */
+       rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
        return 0;
 }
 
@@ -5724,6 +6733,8 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
        struct rbd_image_header *header;
 
        rbd_dev_parent_put(rbd_dev);
+       rbd_object_map_free(rbd_dev);
+       rbd_dev_mapping_clear(rbd_dev);
 
        /* Free dynamic fields from the header, then zero it out */
 
@@ -5824,7 +6835,6 @@ out_err:
 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
 {
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
-       rbd_dev_mapping_clear(rbd_dev);
        rbd_free_disk(rbd_dev);
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
@@ -5858,23 +6868,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_blkdev;
 
-       ret = rbd_dev_mapping_set(rbd_dev);
-       if (ret)
-               goto err_out_disk;
-
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
 
        ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
        if (ret)
-               goto err_out_mapping;
+               goto err_out_disk;
 
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        up_write(&rbd_dev->header_rwsem);
        return 0;
 
-err_out_mapping:
-       rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
        rbd_free_disk(rbd_dev);
 err_out_blkdev:
@@ -5975,6 +6979,17 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
                goto err_out_probe;
        }
 
+       ret = rbd_dev_mapping_set(rbd_dev);
+       if (ret)
+               goto err_out_probe;
+
+       if (rbd_dev->spec->snap_id != CEPH_NOSNAP &&
+           (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
+               ret = rbd_object_map_load(rbd_dev);
+               if (ret)
+                       goto err_out_probe;
+       }
+
        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
@@ -6071,11 +7086,9 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        if (rc)
                goto err_out_image_probe;
 
-       if (rbd_dev->opts->exclusive) {
-               rc = rbd_add_acquire_lock(rbd_dev);
-               if (rc)
-                       goto err_out_device_setup;
-       }
+       rc = rbd_add_acquire_lock(rbd_dev);
+       if (rc)
+               goto err_out_image_lock;
 
        /* Everything's ready.  Announce the disk to the world. */
 
@@ -6101,7 +7114,6 @@ out:
 
 err_out_image_lock:
        rbd_dev_image_unlock(rbd_dev);
-err_out_device_setup:
        rbd_dev_device_release(rbd_dev);
 err_out_image_probe:
        rbd_dev_image_release(rbd_dev);