libceph: enable large, variable-sized OSD requests
[linux-2.6-block.git] / net / ceph / osd_client.c
index f93d0e893f3f50ff777e426de26374e07035ba24..ccd4e031fa3f20ea9200395f16b620e4357420a8 100644 (file)
@@ -338,9 +338,10 @@ static void ceph_osdc_release_request(struct kref *kref)
        ceph_put_snap_context(req->r_snapc);
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
-       else
+       else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
                kmem_cache_free(ceph_osd_request_cache, req);
-
+       else
+               kfree(req);
 }
 
 void ceph_osdc_get_request(struct ceph_osd_request *req)
@@ -369,18 +370,22 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        struct ceph_msg *msg;
        size_t msg_size;
 
-       BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
-       BUG_ON(num_ops > CEPH_OSD_MAX_OP);
-
        if (use_mempool) {
+               BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
                req = mempool_alloc(osdc->req_mempool, gfp_flags);
-               memset(req, 0, sizeof(*req));
+       } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
+               req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
        } else {
-               req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
+               BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
+               req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
+                             gfp_flags);
        }
-       if (req == NULL)
+       if (unlikely(!req))
                return NULL;
 
+       /* req only, each op is zeroed in _osd_req_op_init() */
+       memset(req, 0, sizeof(*req));
+
        req->r_osdc = osdc;
        req->r_mempool = use_mempool;
        req->r_num_ops = num_ops;
@@ -398,12 +403,19 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        req->r_base_oloc.pool = -1;
        req->r_target_oloc.pool = -1;
 
+       msg_size = OSD_OPREPLY_FRONT_LEN;
+       if (num_ops > CEPH_OSD_SLAB_OPS) {
+               /* ceph_osd_op and rval */
+               msg_size += (num_ops - CEPH_OSD_SLAB_OPS) *
+                           (sizeof(struct ceph_osd_op) + 4);
+       }
+
        /* create reply message */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
        else
-               msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
-                                  OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
+               msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size,
+                                  gfp_flags, true);
        if (!msg) {
                ceph_osdc_put_request(req);
                return NULL;
@@ -1811,7 +1823,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 
        ceph_decode_need(&p, end, 4, bad_put);
        numops = ceph_decode_32(&p);
-       if (numops > CEPH_OSD_MAX_OP)
+       if (numops > CEPH_OSD_MAX_OPS)
                goto bad_put;
        if (numops != req->r_num_ops)
                goto bad_put;
@@ -2784,11 +2796,12 @@ EXPORT_SYMBOL(ceph_osdc_writepages);
 
 int ceph_osdc_setup(void)
 {
+       size_t size = sizeof(struct ceph_osd_request) +
+           CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
+
        BUG_ON(ceph_osd_request_cache);
-       ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
-                                       sizeof (struct ceph_osd_request),
-                                       __alignof__(struct ceph_osd_request),
-                                       0, NULL);
+       ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
+                                                  0, 0, NULL);
 
        return ceph_osd_request_cache ? 0 : -ENOMEM;
 }