libceph: update osd request/reply encoding
[linux-2.6-block.git] / net / ceph / osd_client.c
index eb9a4447876481e9a4110a1e68ea351ce3ec86d9..d730dd4d8eb28bc421babb138f187605ee7c0ec6 100644 (file)
@@ -23,7 +23,7 @@
 
 static const struct ceph_connection_operations osd_con_ops;
 
-static void send_queued(struct ceph_osd_client *osdc);
+static void __send_queued(struct ceph_osd_client *osdc);
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req);
@@ -32,64 +32,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);
 
-static int op_needs_trail(int op)
-{
-       switch (op) {
-       case CEPH_OSD_OP_GETXATTR:
-       case CEPH_OSD_OP_SETXATTR:
-       case CEPH_OSD_OP_CMPXATTR:
-       case CEPH_OSD_OP_CALL:
-       case CEPH_OSD_OP_NOTIFY:
-               return 1;
-       default:
-               return 0;
-       }
-}
-
 static int op_has_extent(int op)
 {
        return (op == CEPH_OSD_OP_READ ||
                op == CEPH_OSD_OP_WRITE);
 }
 
-int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
-                       struct ceph_file_layout *layout,
-                       u64 snapid,
-                       u64 off, u64 *plen, u64 *bno,
-                       struct ceph_osd_request *req,
-                       struct ceph_osd_req_op *op)
-{
-       struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
-       u64 orig_len = *plen;
-       u64 objoff, objlen;    /* extent in object */
-       int r;
-
-       reqhead->snapid = cpu_to_le64(snapid);
-
-       /* object extent? */
-       r = ceph_calc_file_object_mapping(layout, off, plen, bno,
-                                         &objoff, &objlen);
-       if (r < 0)
-               return r;
-       if (*plen < orig_len)
-               dout(" skipping last %llu, final file extent %llu~%llu\n",
-                    orig_len - *plen, off, *plen);
-
-       if (op_has_extent(op->op)) {
-               op->extent.offset = objoff;
-               op->extent.length = objlen;
-       }
-       req->r_num_pages = calc_pages_for(off, *plen);
-       req->r_page_alignment = off & ~PAGE_MASK;
-       if (op->op == CEPH_OSD_OP_WRITE)
-               op->payload_len = *plen;
-
-       dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
-            *bno, objoff, objlen, req->r_num_pages);
-       return 0;
-}
-EXPORT_SYMBOL(ceph_calc_raw_layout);
-
 /*
  * Implement client access to distributed object storage cluster.
  *
@@ -115,20 +63,48 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
  *
  * fill osd op in request message.
  */
-static int calc_layout(struct ceph_osd_client *osdc,
-                      struct ceph_vino vino,
+static int calc_layout(struct ceph_vino vino,
                       struct ceph_file_layout *layout,
                       u64 off, u64 *plen,
                       struct ceph_osd_request *req,
                       struct ceph_osd_req_op *op)
 {
-       u64 bno;
+       u64 orig_len = *plen;
+       u64 bno = 0;
+       u64 objoff = 0;
+       u64 objlen = 0;
        int r;
 
-       r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
-                                plen, &bno, req, op);
+       /* object extent? */
+       r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno,
+                                         &objoff, &objlen);
        if (r < 0)
                return r;
+       if (objlen < orig_len) {
+               *plen = objlen;
+               dout(" skipping last %llu, final file extent %llu~%llu\n",
+                    orig_len - *plen, off, *plen);
+       }
+
+       if (op_has_extent(op->op)) {
+               u32 osize = le32_to_cpu(layout->fl_object_size);
+               op->extent.offset = objoff;
+               op->extent.length = objlen;
+               if (op->extent.truncate_size <= off - objoff) {
+                       op->extent.truncate_size = 0;
+               } else {
+                       op->extent.truncate_size -= off - objoff;
+                       if (op->extent.truncate_size > osize)
+                               op->extent.truncate_size = osize;
+               }
+       }
+       req->r_num_pages = calc_pages_for(off, *plen);
+       req->r_page_alignment = off & ~PAGE_MASK;
+       if (op->op == CEPH_OSD_OP_WRITE)
+               op->payload_len = *plen;
+
+       dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
+            bno, objoff, objlen, req->r_num_pages);
 
        snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
        req->r_oid_len = strlen(req->r_oid);
@@ -148,25 +124,19 @@ void ceph_osdc_release_request(struct kref *kref)
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_con_filling_msg) {
-               dout("%s revoking pages %p from con %p\n", __func__,
-                    req->r_pages, req->r_con_filling_msg);
+               dout("%s revoking msg %p from con %p\n", __func__,
+                    req->r_reply, req->r_con_filling_msg);
                ceph_msg_revoke_incoming(req->r_reply);
                req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
+               req->r_con_filling_msg = NULL;
        }
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_own_pages)
                ceph_release_page_vector(req->r_pages,
                                         req->r_num_pages);
-#ifdef CONFIG_BLOCK
-       if (req->r_bio)
-               bio_put(req->r_bio);
-#endif
        ceph_put_snap_context(req->r_snapc);
-       if (req->r_trail) {
-               ceph_pagelist_release(req->r_trail);
-               kfree(req->r_trail);
-       }
+       ceph_pagelist_release(&req->r_trail);
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else
@@ -174,37 +144,25 @@ void ceph_osdc_release_request(struct kref *kref)
 }
 EXPORT_SYMBOL(ceph_osdc_release_request);
 
-static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
-{
-       int i = 0;
-
-       if (needs_trail)
-               *needs_trail = 0;
-       while (ops[i].op) {
-               if (needs_trail && op_needs_trail(ops[i].op))
-                       *needs_trail = 1;
-               i++;
-       }
-
-       return i;
-}
-
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
-                                              int flags,
                                               struct ceph_snap_context *snapc,
-                                              struct ceph_osd_req_op *ops,
+                                              unsigned int num_ops,
                                               bool use_mempool,
-                                              gfp_t gfp_flags,
-                                              struct page **pages,
-                                              struct bio *bio)
+                                              gfp_t gfp_flags)
 {
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
-       int needs_trail;
-       int num_op = get_num_ops(ops, &needs_trail);
-       size_t msg_size = sizeof(struct ceph_osd_request_head);
-
-       msg_size += num_op*sizeof(struct ceph_osd_op);
+       size_t msg_size;
+
+       msg_size = 4 + 4 + 8 + 8 + 4+8;
+       msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+       msg_size += 1 + 8 + 4 + 4;     /* pg_t */
+       msg_size += 4 + MAX_OBJ_NAME_SIZE;
+       msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
+       msg_size += 8;  /* snapid */
+       msg_size += 8;  /* snap_seq */
+       msg_size += 8 * (snapc ? snapc->num_snaps : 0);  /* snaps */
+       msg_size += 4;
 
        if (use_mempool) {
                req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -228,10 +186,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        INIT_LIST_HEAD(&req->r_req_lru_item);
        INIT_LIST_HEAD(&req->r_osd_item);
 
-       req->r_flags = flags;
-
-       WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
-
        /* create reply message */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,20 +198,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        }
        req->r_reply = msg;
 
-       /* allocate space for the trailing data */
-       if (needs_trail) {
-               req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
-               if (!req->r_trail) {
-                       ceph_osdc_put_request(req);
-                       return NULL;
-               }
-               ceph_pagelist_init(req->r_trail);
-       }
+       ceph_pagelist_init(&req->r_trail);
 
        /* create request message; allow space for oid */
-       msg_size += MAX_OBJ_NAME_SIZE;
-       if (snapc)
-               msg_size += sizeof(u64) * snapc->num_snaps;
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
        else
@@ -270,13 +213,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        memset(msg->front.iov_base, 0, msg->front.iov_len);
 
        req->r_request = msg;
-       req->r_pages = pages;
-#ifdef CONFIG_BLOCK
-       if (bio) {
-               req->r_bio = bio;
-               bio_get(req->r_bio);
-       }
-#endif
 
        return req;
 }
@@ -289,6 +225,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
        dst->op = cpu_to_le16(src->op);
 
        switch (src->op) {
+       case CEPH_OSD_OP_STAT:
+               break;
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
                dst->extent.offset =
@@ -300,52 +238,20 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
                dst->extent.truncate_seq =
                        cpu_to_le32(src->extent.truncate_seq);
                break;
-
-       case CEPH_OSD_OP_GETXATTR:
-       case CEPH_OSD_OP_SETXATTR:
-       case CEPH_OSD_OP_CMPXATTR:
-               BUG_ON(!req->r_trail);
-
-               dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
-               dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
-               dst->xattr.cmp_op = src->xattr.cmp_op;
-               dst->xattr.cmp_mode = src->xattr.cmp_mode;
-               ceph_pagelist_append(req->r_trail, src->xattr.name,
-                                    src->xattr.name_len);
-               ceph_pagelist_append(req->r_trail, src->xattr.val,
-                                    src->xattr.value_len);
-               break;
        case CEPH_OSD_OP_CALL:
-               BUG_ON(!req->r_trail);
-
                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
                dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
 
-               ceph_pagelist_append(req->r_trail, src->cls.class_name,
+               ceph_pagelist_append(&req->r_trail, src->cls.class_name,
                                     src->cls.class_len);
-               ceph_pagelist_append(req->r_trail, src->cls.method_name,
+               ceph_pagelist_append(&req->r_trail, src->cls.method_name,
                                     src->cls.method_len);
-               ceph_pagelist_append(req->r_trail, src->cls.indata,
+               ceph_pagelist_append(&req->r_trail, src->cls.indata,
                                     src->cls.indata_len);
                break;
-       case CEPH_OSD_OP_ROLLBACK:
-               dst->snap.snapid = cpu_to_le64(src->snap.snapid);
-               break;
        case CEPH_OSD_OP_STARTSYNC:
                break;
-       case CEPH_OSD_OP_NOTIFY:
-               {
-                       __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
-                       __le32 timeout = cpu_to_le32(src->watch.timeout);
-
-                       BUG_ON(!req->r_trail);
-
-                       ceph_pagelist_append(req->r_trail,
-                                               &prot_ver, sizeof(prot_ver));
-                       ceph_pagelist_append(req->r_trail,
-                                               &timeout, sizeof(timeout));
-               }
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                dst->watch.cookie = cpu_to_le64(src->watch.cookie);
@@ -356,6 +262,64 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
                pr_err("unrecognized osd opcode %d\n", dst->op);
                WARN_ON(1);
                break;
+       case CEPH_OSD_OP_MAPEXT:
+       case CEPH_OSD_OP_MASKTRUNC:
+       case CEPH_OSD_OP_SPARSE_READ:
+       case CEPH_OSD_OP_NOTIFY:
+       case CEPH_OSD_OP_ASSERT_VER:
+       case CEPH_OSD_OP_WRITEFULL:
+       case CEPH_OSD_OP_TRUNCATE:
+       case CEPH_OSD_OP_ZERO:
+       case CEPH_OSD_OP_DELETE:
+       case CEPH_OSD_OP_APPEND:
+       case CEPH_OSD_OP_SETTRUNC:
+       case CEPH_OSD_OP_TRIMTRUNC:
+       case CEPH_OSD_OP_TMAPUP:
+       case CEPH_OSD_OP_TMAPPUT:
+       case CEPH_OSD_OP_TMAPGET:
+       case CEPH_OSD_OP_CREATE:
+       case CEPH_OSD_OP_ROLLBACK:
+       case CEPH_OSD_OP_OMAPGETKEYS:
+       case CEPH_OSD_OP_OMAPGETVALS:
+       case CEPH_OSD_OP_OMAPGETHEADER:
+       case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
+       case CEPH_OSD_OP_MODE_RD:
+       case CEPH_OSD_OP_OMAPSETVALS:
+       case CEPH_OSD_OP_OMAPSETHEADER:
+       case CEPH_OSD_OP_OMAPCLEAR:
+       case CEPH_OSD_OP_OMAPRMKEYS:
+       case CEPH_OSD_OP_OMAP_CMP:
+       case CEPH_OSD_OP_CLONERANGE:
+       case CEPH_OSD_OP_ASSERT_SRC_VERSION:
+       case CEPH_OSD_OP_SRC_CMPXATTR:
+       case CEPH_OSD_OP_GETXATTR:
+       case CEPH_OSD_OP_GETXATTRS:
+       case CEPH_OSD_OP_CMPXATTR:
+       case CEPH_OSD_OP_SETXATTR:
+       case CEPH_OSD_OP_SETXATTRS:
+       case CEPH_OSD_OP_RESETXATTRS:
+       case CEPH_OSD_OP_RMXATTR:
+       case CEPH_OSD_OP_PULL:
+       case CEPH_OSD_OP_PUSH:
+       case CEPH_OSD_OP_BALANCEREADS:
+       case CEPH_OSD_OP_UNBALANCEREADS:
+       case CEPH_OSD_OP_SCRUB:
+       case CEPH_OSD_OP_SCRUB_RESERVE:
+       case CEPH_OSD_OP_SCRUB_UNRESERVE:
+       case CEPH_OSD_OP_SCRUB_STOP:
+       case CEPH_OSD_OP_SCRUB_MAP:
+       case CEPH_OSD_OP_WRLOCK:
+       case CEPH_OSD_OP_WRUNLOCK:
+       case CEPH_OSD_OP_RDLOCK:
+       case CEPH_OSD_OP_RDUNLOCK:
+       case CEPH_OSD_OP_UPLOCK:
+       case CEPH_OSD_OP_DNLOCK:
+       case CEPH_OSD_OP_PGLS:
+       case CEPH_OSD_OP_PGLS_FILTER:
+               pr_err("unsupported osd opcode %s\n",
+                       ceph_osd_op_name(dst->op));
+               WARN_ON(1);
+               break;
        }
        dst->payload_len = cpu_to_le32(src->payload_len);
 }
@@ -365,75 +329,95 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-                            u64 off, u64 *plen,
+                            u64 off, u64 len, unsigned int num_ops,
                             struct ceph_osd_req_op *src_ops,
-                            struct ceph_snap_context *snapc,
-                            struct timespec *mtime,
-                            const char *oid,
-                            int oid_len)
+                            struct ceph_snap_context *snapc, u64 snap_id,
+                            struct timespec *mtime)
 {
        struct ceph_msg *msg = req->r_request;
-       struct ceph_osd_request_head *head;
        struct ceph_osd_req_op *src_op;
-       struct ceph_osd_op *op;
        void *p;
-       int num_op = get_num_ops(src_ops, NULL);
-       size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
+       size_t msg_size;
        int flags = req->r_flags;
-       u64 data_len = 0;
+       u64 data_len;
        int i;
 
-       head = msg->front.iov_base;
-       op = (void *)(head + 1);
-       p = (void *)(op + num_op);
-
+       req->r_num_ops = num_ops;
+       req->r_snapid = snap_id;
        req->r_snapc = ceph_get_snap_context(snapc);
 
-       head->client_inc = cpu_to_le32(1); /* always, for now. */
-       head->flags = cpu_to_le32(flags);
-       if (flags & CEPH_OSD_FLAG_WRITE)
-               ceph_encode_timespec(&head->mtime, mtime);
-       head->num_ops = cpu_to_le16(num_op);
-
-
-       /* fill in oid */
-       head->object_len = cpu_to_le32(oid_len);
-       memcpy(p, oid, oid_len);
-       p += oid_len;
+       /* encode request */
+       msg->hdr.version = cpu_to_le16(4);
 
+       p = msg->front.iov_base;
+       ceph_encode_32(&p, 1);   /* client_inc  is always 1 */
+       req->r_request_osdmap_epoch = p;
+       p += 4;
+       req->r_request_flags = p;
+       p += 4;
+       if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+               ceph_encode_timespec(p, mtime);
+       p += sizeof(struct ceph_timespec);
+       req->r_request_reassert_version = p;
+       p += sizeof(struct ceph_eversion); /* will get filled in */
+
+       /* oloc */
+       ceph_encode_8(&p, 4);
+       ceph_encode_8(&p, 4);
+       ceph_encode_32(&p, 8 + 4 + 4);
+       req->r_request_pool = p;
+       p += 8;
+       ceph_encode_32(&p, -1);  /* preferred */
+       ceph_encode_32(&p, 0);   /* key len */
+
+       ceph_encode_8(&p, 1);
+       req->r_request_pgid = p;
+       p += 8 + 4;
+       ceph_encode_32(&p, -1);  /* preferred */
+
+       /* oid */
+       ceph_encode_32(&p, req->r_oid_len);
+       memcpy(p, req->r_oid, req->r_oid_len);
+       dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
+       p += req->r_oid_len;
+
+       /* ops */
+       ceph_encode_16(&p, num_ops);
        src_op = src_ops;
-       while (src_op->op) {
-               osd_req_encode_op(req, op, src_op);
-               src_op++;
-               op++;
+       req->r_request_ops = p;
+       for (i = 0; i < num_ops; i++, src_op++) {
+               osd_req_encode_op(req, p, src_op);
+               p += sizeof(struct ceph_osd_op);
        }
 
-       if (req->r_trail)
-               data_len += req->r_trail->length;
-
-       if (snapc) {
-               head->snap_seq = cpu_to_le64(snapc->seq);
-               head->num_snaps = cpu_to_le32(snapc->num_snaps);
+       /* snaps */
+       ceph_encode_64(&p, req->r_snapid);
+       ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
+       ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
+       if (req->r_snapc) {
                for (i = 0; i < snapc->num_snaps; i++) {
-                       put_unaligned_le64(snapc->snaps[i], p);
-                       p += sizeof(u64);
+                       ceph_encode_64(&p, req->r_snapc->snaps[i]);
                }
        }
 
+       req->r_request_attempts = p;
+       p += 4;
+
+       data_len = req->r_trail.length;
        if (flags & CEPH_OSD_FLAG_WRITE) {
                req->r_request->hdr.data_off = cpu_to_le16(off);
-               req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
-       } else if (data_len) {
-               req->r_request->hdr.data_off = 0;
-               req->r_request->hdr.data_len = cpu_to_le32(data_len);
+               data_len += len;
        }
-
+       req->r_request->hdr.data_len = cpu_to_le32(data_len);
        req->r_request->page_alignment = req->r_page_alignment;
 
        BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
        msg_size = p - msg->front.iov_base;
        msg->front.iov_len = msg_size;
        msg->hdr.front_len = cpu_to_le32(msg_size);
+
+       dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
+            num_ops);
        return;
 }
 EXPORT_SYMBOL(ceph_osdc_build_request);
@@ -459,34 +443,33 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               u32 truncate_seq,
                                               u64 truncate_size,
                                               struct timespec *mtime,
-                                              bool use_mempool, int num_reply,
+                                              bool use_mempool,
                                               int page_align)
 {
-       struct ceph_osd_req_op ops[3];
+       struct ceph_osd_req_op ops[2];
        struct ceph_osd_request *req;
+       unsigned int num_op = 1;
        int r;
 
+       memset(&ops, 0, sizeof ops);
+
        ops[0].op = opcode;
        ops[0].extent.truncate_seq = truncate_seq;
        ops[0].extent.truncate_size = truncate_size;
-       ops[0].payload_len = 0;
 
        if (do_sync) {
                ops[1].op = CEPH_OSD_OP_STARTSYNC;
-               ops[1].payload_len = 0;
-               ops[2].op = 0;
-       } else
-               ops[1].op = 0;
-
-       req = ceph_osdc_alloc_request(osdc, flags,
-                                        snapc, ops,
-                                        use_mempool,
-                                        GFP_NOFS, NULL, NULL);
+               num_op++;
+       }
+
+       req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
+                                       GFP_NOFS);
        if (!req)
                return ERR_PTR(-ENOMEM);
+       req->r_flags = flags;
 
        /* calculate max write size */
-       r = calc_layout(osdc, vino, layout, off, plen, req, ops);
+       r = calc_layout(vino, layout, off, plen, req, ops);
        if (r < 0)
                return ERR_PTR(r);
        req->r_file_layout = *layout;  /* keep a copy */
@@ -496,10 +479,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        req->r_num_pages = calc_pages_for(page_align, *plen);
        req->r_page_alignment = page_align;
 
-       ceph_osdc_build_request(req, off, plen, ops,
-                               snapc,
-                               mtime,
-                               req->r_oid, req->r_oid_len);
+       ceph_osdc_build_request(req, off, *plen, num_op, ops,
+                               snapc, vino.snap, mtime);
 
        return req;
 }
@@ -623,8 +604,8 @@ static void osd_reset(struct ceph_connection *con)
        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
        __kick_osd_requests(osdc, osd);
+       __send_queued(osdc);
        mutex_unlock(&osdc->request_mutex);
-       send_queued(osdc);
        up_read(&osdc->map_sem);
 }
 
@@ -739,31 +720,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
  */
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
-       struct ceph_osd_request *req;
-       int ret = 0;
+       struct ceph_entity_addr *peer_addr;
 
        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);
-               ret = -ENODEV;
-       } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
-                         &osd->o_con.peer_addr,
-                         sizeof(osd->o_con.peer_addr)) == 0 &&
-                  !ceph_con_opened(&osd->o_con)) {
+
+               return -ENODEV;
+       }
+
+       peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+       if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
+                       !ceph_con_opened(&osd->o_con)) {
+               struct ceph_osd_request *req;
+
                dout(" osd addr hasn't changed and connection never opened,"
                     " letting msgr retry");
                /* touch each r_stamp for handle_timeout()'s benfit */
                list_for_each_entry(req, &osd->o_requests, r_osd_item)
                        req->r_stamp = jiffies;
-               ret = -EAGAIN;
-       } else {
-               ceph_con_close(&osd->o_con);
-               ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
-                             &osdc->osdmap->osd_addr[osd->o_osd]);
-               osd->o_incarnation++;
+
+               return -EAGAIN;
        }
-       return ret;
+
+       ceph_con_close(&osd->o_con);
+       ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
+       osd->o_incarnation++;
+
+       return 0;
 }
 
 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -961,20 +946,18 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger);
 static int __map_request(struct ceph_osd_client *osdc,
                         struct ceph_osd_request *req, int force_resend)
 {
-       struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        struct ceph_pg pgid;
        int acting[CEPH_PG_MAX_SIZE];
        int o = -1, num = 0;
        int err;
 
        dout("map_request %p tid %lld\n", req, req->r_tid);
-       err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
+       err = ceph_calc_object_layout(&pgid, req->r_oid,
                                      &req->r_file_layout, osdc->osdmap);
        if (err) {
                list_move(&req->r_req_lru_item, &osdc->req_notarget);
                return err;
        }
-       pgid = reqhead->layout.ol_pgid;
        req->r_pgid = pgid;
 
        err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
@@ -991,8 +974,8 @@ static int __map_request(struct ceph_osd_client *osdc,
            (req->r_osd == NULL && o == -1))
                return 0;  /* no change */
 
-       dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
-            req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
+       dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
+            req->r_tid, pgid.pool, pgid.seed, o,
             req->r_osd ? req->r_osd->o_osd : -1);
 
        /* record full pg acting set */
@@ -1041,15 +1024,22 @@ out:
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req)
 {
-       struct ceph_osd_request_head *reqhead;
-
-       dout("send_request %p tid %llu to osd%d flags %d\n",
-            req, req->r_tid, req->r_osd->o_osd, req->r_flags);
+       void *p;
 
-       reqhead = req->r_request->front.iov_base;
-       reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
-       reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
-       reqhead->reassert_version = req->r_reassert_version;
+       dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
+            req, req->r_tid, req->r_osd->o_osd, req->r_flags,
+            (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
+
+       /* fill in message content that changes each time we send it */
+       put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
+       put_unaligned_le32(req->r_flags, req->r_request_flags);
+       put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
+       p = req->r_request_pgid;
+       ceph_encode_64(&p, req->r_pgid.pool);
+       ceph_encode_32(&p, req->r_pgid.seed);
+       put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
+       memcpy(req->r_request_reassert_version, &req->r_reassert_version,
+              sizeof(req->r_reassert_version));
 
        req->r_stamp = jiffies;
        list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
@@ -1062,16 +1052,13 @@ static void __send_request(struct ceph_osd_client *osdc,
 /*
  * Send any requests in the queue (req_unsent).
  */
-static void send_queued(struct ceph_osd_client *osdc)
+static void __send_queued(struct ceph_osd_client *osdc)
 {
        struct ceph_osd_request *req, *tmp;
 
-       dout("send_queued\n");
-       mutex_lock(&osdc->request_mutex);
-       list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
+       dout("__send_queued\n");
+       list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
                __send_request(osdc, req);
-       }
-       mutex_unlock(&osdc->request_mutex);
 }
 
 /*
@@ -1123,8 +1110,8 @@ static void handle_timeout(struct work_struct *work)
        }
 
        __schedule_osd_timeout(osdc);
+       __send_queued(osdc);
        mutex_unlock(&osdc->request_mutex);
-       send_queued(osdc);
        up_read(&osdc->map_sem);
 }
 
@@ -1152,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req)
        complete_all(&req->r_safe_completion);  /* fsync waiter */
 }
 
+static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
+{
+       __u8 v;
+
+       ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
+       v = ceph_decode_8(p);
+       if (v > 1) {
+               pr_warning("do not understand pg encoding %d > 1", v);
+               return -EINVAL;
+       }
+       pgid->pool = ceph_decode_64(p);
+       pgid->seed = ceph_decode_32(p);
+       *p += 4;
+       return 0;
+
+bad:
+       pr_warning("incomplete pg encoding");
+       return -EINVAL;
+}
+
 /*
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
@@ -1159,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req)
 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                         struct ceph_connection *con)
 {
-       struct ceph_osd_reply_head *rhead = msg->front.iov_base;
+       void *p, *end;
        struct ceph_osd_request *req;
        u64 tid;
-       int numops, object_len, flags;
+       int object_len;
+       int numops, payload_len, flags;
        s32 result;
+       s32 retry_attempt;
+       struct ceph_pg pg;
+       int err;
+       u32 reassert_epoch;
+       u64 reassert_version;
+       u32 osdmap_epoch;
+       int i;
 
        tid = le64_to_cpu(msg->hdr.tid);
-       if (msg->front.iov_len < sizeof(*rhead))
-               goto bad;
-       numops = le32_to_cpu(rhead->num_ops);
-       object_len = le32_to_cpu(rhead->object_len);
-       result = le32_to_cpu(rhead->result);
-       if (msg->front.iov_len != sizeof(*rhead) + object_len +
-           numops * sizeof(struct ceph_osd_op))
+       dout("handle_reply %p tid %llu\n", msg, tid);
+
+       p = msg->front.iov_base;
+       end = p + msg->front.iov_len;
+
+       ceph_decode_need(&p, end, 4, bad);
+       object_len = ceph_decode_32(&p);
+       ceph_decode_need(&p, end, object_len, bad);
+       p += object_len;
+
+       err = __decode_pgid(&p, end, &pg);
+       if (err)
                goto bad;
-       dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
+
+       ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
+       flags = ceph_decode_64(&p);
+       result = ceph_decode_32(&p);
+       reassert_epoch = ceph_decode_32(&p);
+       reassert_version = ceph_decode_64(&p);
+       osdmap_epoch = ceph_decode_32(&p);
+
        /* lookup */
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
@@ -1184,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                return;
        }
        ceph_osdc_get_request(req);
-       flags = le32_to_cpu(rhead->flags);
+
+       dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
+            req, result);
+
+       ceph_decode_need(&p, end, 4, bad);
+       numops = ceph_decode_32(&p);
+       if (numops > CEPH_OSD_MAX_OP)
+               goto bad_put;
+       if (numops != req->r_num_ops)
+               goto bad_put;
+       payload_len = 0;
+       ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
+       for (i = 0; i < numops; i++) {
+               struct ceph_osd_op *op = p;
+               int len;
+
+               len = le32_to_cpu(op->payload_len);
+               req->r_reply_op_len[i] = len;
+               dout(" op %d has %d bytes\n", i, len);
+               payload_len += len;
+               p += sizeof(*op);
+       }
+       if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
+               pr_warning("sum of op payload lens %d != data_len %d",
+                          payload_len, le32_to_cpu(msg->hdr.data_len));
+               goto bad_put;
+       }
+
+       ceph_decode_need(&p, end, 4 + numops * 4, bad);
+       retry_attempt = ceph_decode_32(&p);
+       for (i = 0; i < numops; i++)
+               req->r_reply_op_result[i] = ceph_decode_32(&p);
 
        /*
         * if this connection filled our message, drop our reference now, to
@@ -1199,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        if (!req->r_got_reply) {
                unsigned int bytes;
 
-               req->r_result = le32_to_cpu(rhead->result);
+               req->r_result = result;
                bytes = le32_to_cpu(msg->hdr.data_len);
                dout("handle_reply result %d bytes %d\n", req->r_result,
                     bytes);
@@ -1207,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                        req->r_result = bytes;
 
                /* in case this is a write and we need to replay, */
-               req->r_reassert_version = rhead->reassert_version;
+               req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
+               req->r_reassert_version.version = cpu_to_le64(reassert_version);
 
                req->r_got_reply = 1;
        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -1242,10 +1301,11 @@ done:
        ceph_osdc_put_request(req);
        return;
 
+bad_put:
+       ceph_osdc_put_request(req);
 bad:
-       pr_err("corrupt osd_op_reply got %d %d expected %d\n",
-              (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
-              (int)sizeof(*rhead));
+       pr_err("corrupt osd_op_reply got %d %d\n",
+              (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
        ceph_msg_dump(msg);
 }
 
@@ -1462,7 +1522,9 @@ done:
        if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
                ceph_monc_request_next_osdmap(&osdc->client->monc);
 
-       send_queued(osdc);
+       mutex_lock(&osdc->request_mutex);
+       __send_queued(osdc);
+       mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
        wake_up_all(&osdc->client->auth_wq);
        return;
@@ -1556,8 +1618,7 @@ static void __remove_event(struct ceph_osd_event *event)
 
 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
                           void (*event_cb)(u64, u64, u8, void *),
-                          int one_shot, void *data,
-                          struct ceph_osd_event **pevent)
+                          void *data, struct ceph_osd_event **pevent)
 {
        struct ceph_osd_event *event;
 
@@ -1567,14 +1628,13 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
 
        dout("create_event %p\n", event);
        event->cb = event_cb;
-       event->one_shot = one_shot;
+       event->one_shot = 0;
        event->data = data;
        event->osdc = osdc;
        INIT_LIST_HEAD(&event->osd_node);
        RB_CLEAR_NODE(&event->node);
        kref_init(&event->kref);   /* one ref for us */
        kref_get(&event->kref);    /* one ref for the caller */
-       init_completion(&event->completion);
 
        spin_lock(&osdc->event_lock);
        event->cookie = ++osdc->event_count;
@@ -1610,7 +1670,6 @@ static void do_event_work(struct work_struct *work)
 
        dout("do_event_work completing %p\n", event);
        event->cb(ver, notify_id, opcode, event->data);
-       complete(&event->completion);
        dout("do_event_work completed %p\n", event);
        ceph_osdc_put_event(event);
        kfree(event_work);
@@ -1620,7 +1679,8 @@ static void do_event_work(struct work_struct *work)
 /*
  * Process osd watch notifications
  */
-void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_watch_notify(struct ceph_osd_client *osdc,
+                               struct ceph_msg *msg)
 {
        void *p, *end;
        u8 proto_ver;
@@ -1641,9 +1701,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        spin_lock(&osdc->event_lock);
        event = __find_event(osdc, cookie);
        if (event) {
+               BUG_ON(event->one_shot);
                get_event(event);
-               if (event->one_shot)
-                       __remove_event(event);
        }
        spin_unlock(&osdc->event_lock);
        dout("handle_watch_notify cookie %lld ver %lld event %p\n",
@@ -1668,7 +1727,6 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        return;
 
 done_err:
-       complete(&event->completion);
        ceph_osdc_put_event(event);
        return;
 
@@ -1677,21 +1735,6 @@ bad:
        return;
 }
 
-int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
-{
-       int err;
-
-       dout("wait_event %p\n", event);
-       err = wait_for_completion_interruptible_timeout(&event->completion,
-                                                       timeout * HZ);
-       ceph_osdc_put_event(event);
-       if (err > 0)
-               err = 0;
-       dout("wait_event %p returns %d\n", event, err);
-       return err;
-}
-EXPORT_SYMBOL(ceph_osdc_wait_event);
-
 /*
  * Register request, send initial attempt.
  */
@@ -1706,7 +1749,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 #ifdef CONFIG_BLOCK
        req->r_request->bio = req->r_bio;
 #endif
-       req->r_request->trail = req->r_trail;
+       req->r_request->trail = &req->r_trail;
 
        register_request(osdc, req);
 
@@ -1865,7 +1908,6 @@ out_mempool:
 out:
        return err;
 }
-EXPORT_SYMBOL(ceph_osdc_init);
 
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {
@@ -1882,7 +1924,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
        ceph_msgpool_destroy(&osdc->msgpool_op);
        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 }
-EXPORT_SYMBOL(ceph_osdc_stop);
 
 /*
  * Read some contiguous pages.  If we cross a stripe boundary, shorten
@@ -1902,7 +1943,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
        req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
                                    NULL, 0, truncate_seq, truncate_size, NULL,
-                                   false, 1, page_align);
+                                   false, page_align);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
@@ -1931,8 +1972,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         u64 off, u64 len,
                         u32 truncate_seq, u64 truncate_size,
                         struct timespec *mtime,
-                        struct page **pages, int num_pages,
-                        int flags, int do_sync, bool nofail)
+                        struct page **pages, int num_pages)
 {
        struct ceph_osd_request *req;
        int rc = 0;
@@ -1941,11 +1981,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        BUG_ON(vino.snap != CEPH_NOSNAP);
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
                                    CEPH_OSD_OP_WRITE,
-                                   flags | CEPH_OSD_FLAG_ONDISK |
-                                           CEPH_OSD_FLAG_WRITE,
-                                   snapc, do_sync,
+                                   CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+                                   snapc, 0,
                                    truncate_seq, truncate_size, mtime,
-                                   nofail, 1, page_align);
+                                   true, page_align);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
@@ -1954,7 +1993,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        dout("writepages %llu~%llu (%d pages)\n", off, len,
             req->r_num_pages);
 
-       rc = ceph_osdc_start_request(osdc, req, nofail);
+       rc = ceph_osdc_start_request(osdc, req, true);
        if (!rc)
                rc = ceph_osdc_wait_request(osdc, req);
 
@@ -2047,7 +2086,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        if (data_len > 0) {
                int want = calc_pages_for(req->r_page_alignment, data_len);
 
-               if (unlikely(req->r_num_pages < want)) {
+               if (req->r_pages && unlikely(req->r_num_pages < want)) {
                        pr_warning("tid %lld reply has %d bytes %d pages, we"
                                   " had only %d pages ready\n", tid, data_len,
                                   want, req->r_num_pages);