a27167942a9285d8d842049cdf629cd3152ad77a
[linux-2.6-block.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         u64 snap_names_len;
85         u32 total_snaps;
86
87         char *snap_names;
88         u64 *snap_sizes;
89
90         u64 obj_version;
91 };
92
/* rbd-specific mount options (see rbd_opts_tokens below). */
struct rbd_options {
        int     notify_timeout;         /* watch/notify timeout, seconds */
};
96
97 /*
98  * an instance of the client.  multiple devices may share an rbd client.
99  */
100 struct rbd_client {
101         struct ceph_client      *client;
102         struct rbd_options      *rbd_opts;
103         struct kref             kref;
104         struct list_head        node;
105 };
106
107 /*
108  * a request completion status
109  */
110 struct rbd_req_status {
111         int done;
112         int rc;
113         u64 bytes;
114 };
115
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120         int                     total;
121         int                     num_done;
122         struct kref             kref;
123         struct rbd_req_status   status[0];
124 };
125
126 /*
127  * a single io request
128  */
129 struct rbd_request {
130         struct request          *rq;            /* blk layer request */
131         struct bio              *bio;           /* cloned bio */
132         struct page             **pages;        /* list of used pages */
133         u64                     len;
134         int                     coll_index;
135         struct rbd_req_coll     *coll;
136 };
137
/* In-memory representation of one snapshot, exposed through sysfs. */
struct rbd_snap {
        struct  device          dev;            /* sysfs device node */
        const char              *name;          /* snapshot name */
        u64                     size;           /* image size at snap time */
        struct list_head        node;           /* entry in rbd_dev->snaps */
        u64                     id;             /* snapshot id */
};
145
146 /*
147  * a single device
148  */
149 struct rbd_device {
150         int                     dev_id;         /* blkdev unique id */
151
152         int                     major;          /* blkdev assigned major */
153         struct gendisk          *disk;          /* blkdev's gendisk and rq */
154         struct request_queue    *q;
155
156         struct rbd_client       *rbd_client;
157
158         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159
160         spinlock_t              lock;           /* queue lock */
161
162         struct rbd_image_header header;
163         char                    *image_name;
164         size_t                  image_name_len;
165         char                    *header_name;
166         char                    *pool_name;
167         int                     pool_id;
168
169         struct ceph_osd_event   *watch_event;
170         struct ceph_osd_request *watch_request;
171
172         /* protects updating the header */
173         struct rw_semaphore     header_rwsem;
174         /* name of the snapshot this device reads from */
175         char                    *snap_name;
176         /* id of the snapshot this device reads from */
177         u64                     snap_id;        /* current snapshot id */
178         /* whether the snap_id this device reads from still exists */
179         bool                    snap_exists;
180         int                     read_only;
181
182         struct list_head        node;
183
184         /* list of snapshots */
185         struct list_head        snaps;
186
187         /* sysfs related */
188         struct device           dev;
189 };
190
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

/* All mapped devices, protected by rbd_dev_list_lock. */
static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* All (possibly shared) clients, protected by rbd_client_list_lock. */
static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations for routines defined later in this file. */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
                            struct device_attribute *attr,
                            const char *buf,
                            size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

/* Bus-level "add"/"remove" attribute handlers (map/unmap an image). */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
211
/* /sys/bus/rbd attributes: write-only "add" and "remove" controls. */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

/* Pseudo bus all rbd devices hang off of. */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
222
/*
 * Release callback for rbd_root_dev.  Intentionally empty: the root
 * device is statically allocated, so there is nothing to free, but
 * the driver core requires a release method.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Parent device for all rbd devices in the sysfs hierarchy. */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
231
232
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235         return get_device(&rbd_dev->dev);
236 }
237
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240         put_device(&rbd_dev->dev);
241 }
242
/* Re-read the image header from the osd; forward declaration. */
static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
250                 return -EROFS;
251
252         rbd_get_dev(rbd_dev);
253         set_device_ro(bdev, rbd_dev->read_only);
254
255         return 0;
256 }
257
258 static int rbd_release(struct gendisk *disk, fmode_t mode)
259 {
260         struct rbd_device *rbd_dev = disk->private_data;
261
262         rbd_put_dev(rbd_dev);
263
264         return 0;
265 }
266
/* Block layer entry points for rbd devices. */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
272
273 /*
274  * Initialize an rbd client instance.
275  * We own *ceph_opts.
276  */
277 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
278                                             struct rbd_options *rbd_opts)
279 {
280         struct rbd_client *rbdc;
281         int ret = -ENOMEM;
282
283         dout("rbd_client_create\n");
284         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
285         if (!rbdc)
286                 goto out_opt;
287
288         kref_init(&rbdc->kref);
289         INIT_LIST_HEAD(&rbdc->node);
290
291         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292
293         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
294         if (IS_ERR(rbdc->client))
295                 goto out_mutex;
296         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
297
298         ret = ceph_open_session(rbdc->client);
299         if (ret < 0)
300                 goto out_err;
301
302         rbdc->rbd_opts = rbd_opts;
303
304         spin_lock(&rbd_client_list_lock);
305         list_add_tail(&rbdc->node, &rbd_client_list);
306         spin_unlock(&rbd_client_list_lock);
307
308         mutex_unlock(&ctl_mutex);
309
310         dout("rbd_client_create created %p\n", rbdc);
311         return rbdc;
312
313 out_err:
314         ceph_destroy_client(rbdc->client);
315 out_mutex:
316         mutex_unlock(&ctl_mutex);
317         kfree(rbdc);
318 out_opt:
319         if (ceph_opts)
320                 ceph_destroy_options(ceph_opts);
321         return ERR_PTR(ret);
322 }
323
324 /*
325  * Find a ceph client with specific addr and configuration.  If
326  * found, bump its reference count.
327  */
328 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
329 {
330         struct rbd_client *client_node;
331         bool found = false;
332
333         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
334                 return NULL;
335
336         spin_lock(&rbd_client_list_lock);
337         list_for_each_entry(client_node, &rbd_client_list, node) {
338                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
339                         kref_get(&client_node->kref);
340                         found = true;
341                         break;
342                 }
343         }
344         spin_unlock(&rbd_client_list_lock);
345
346         return found ? client_node : NULL;
347 }
348
349 /*
350  * mount options
351  */
352 enum {
353         Opt_notify_timeout,
354         Opt_last_int,
355         /* int args above */
356         Opt_last_string,
357         /* string args above */
358 };
359
360 static match_table_t rbd_opts_tokens = {
361         {Opt_notify_timeout, "notify_timeout=%d"},
362         /* int args above */
363         /* string args above */
364         {-1, NULL}
365 };
366
367 static int parse_rbd_opts_token(char *c, void *private)
368 {
369         struct rbd_options *rbd_opts = private;
370         substring_t argstr[MAX_OPT_ARGS];
371         int token, intval, ret;
372
373         token = match_token(c, rbd_opts_tokens, argstr);
374         if (token < 0)
375                 return -EINVAL;
376
377         if (token < Opt_last_int) {
378                 ret = match_int(&argstr[0], &intval);
379                 if (ret < 0) {
380                         pr_err("bad mount option arg (not int) "
381                                "at '%s'\n", c);
382                         return ret;
383                 }
384                 dout("got int token %d val %d\n", token, intval);
385         } else if (token > Opt_last_int && token < Opt_last_string) {
386                 dout("got string token %d val %s\n", token,
387                      argstr[0].from);
388         } else {
389                 dout("got token %d\n", token);
390         }
391
392         switch (token) {
393         case Opt_notify_timeout:
394                 rbd_opts->notify_timeout = intval;
395                 break;
396         default:
397                 BUG_ON(token);
398         }
399         return 0;
400 }
401
402 /*
403  * Get a ceph client with specific addr and configuration, if one does
404  * not exist create it.
405  */
406 static struct rbd_client *rbd_get_client(const char *mon_addr,
407                                          size_t mon_addr_len,
408                                          char *options)
409 {
410         struct rbd_client *rbdc;
411         struct ceph_options *ceph_opts;
412         struct rbd_options *rbd_opts;
413
414         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
415         if (!rbd_opts)
416                 return ERR_PTR(-ENOMEM);
417
418         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
419
420         ceph_opts = ceph_parse_options(options, mon_addr,
421                                         mon_addr + mon_addr_len,
422                                         parse_rbd_opts_token, rbd_opts);
423         if (IS_ERR(ceph_opts)) {
424                 kfree(rbd_opts);
425                 return ERR_CAST(ceph_opts);
426         }
427
428         rbdc = rbd_client_find(ceph_opts);
429         if (rbdc) {
430                 /* using an existing client */
431                 ceph_destroy_options(ceph_opts);
432                 kfree(rbd_opts);
433
434                 return rbdc;
435         }
436
437         rbdc = rbd_client_create(ceph_opts, rbd_opts);
438         if (IS_ERR(rbdc))
439                 kfree(rbd_opts);
440
441         return rbdc;
442 }
443
444 /*
445  * Destroy ceph client
446  *
447  * Caller must hold rbd_client_list_lock.
448  */
449 static void rbd_client_release(struct kref *kref)
450 {
451         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
452
453         dout("rbd_release_client %p\n", rbdc);
454         spin_lock(&rbd_client_list_lock);
455         list_del(&rbdc->node);
456         spin_unlock(&rbd_client_list_lock);
457
458         ceph_destroy_client(rbdc->client);
459         kfree(rbdc->rbd_opts);
460         kfree(rbdc);
461 }
462
463 /*
464  * Drop reference to ceph client node. If it's not referenced anymore, release
465  * it.
466  */
467 static void rbd_put_client(struct rbd_device *rbd_dev)
468 {
469         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
470         rbd_dev->rbd_client = NULL;
471 }
472
473 /*
474  * Destroy requests collection
475  */
476 static void rbd_coll_release(struct kref *kref)
477 {
478         struct rbd_req_coll *coll =
479                 container_of(kref, struct rbd_req_coll, kref);
480
481         dout("rbd_coll_release %p\n", coll);
482         kfree(coll);
483 }
484
485 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
486 {
487         size_t size;
488         u32 snap_count;
489
490         /* The header has to start with the magic rbd header text */
491         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
492                 return false;
493
494         /*
495          * The size of a snapshot header has to fit in a size_t, and
496          * that limits the number of snapshots.
497          */
498         snap_count = le32_to_cpu(ondisk->snap_count);
499         size = SIZE_MAX - sizeof (struct ceph_snap_context);
500         if (snap_count > size / sizeof (__le64))
501                 return false;
502
503         /*
504          * Not only that, but the size of the entire the snapshot
505          * header must also be representable in a size_t.
506          */
507         size -= snap_count * sizeof (__le64);
508         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
509                 return false;
510
511         return true;
512 }
513
514 /*
515  * Create a new header structure, translate header format from the on-disk
516  * header.
517  */
518 static int rbd_header_from_disk(struct rbd_image_header *header,
519                                  struct rbd_image_header_ondisk *ondisk)
520 {
521         u32 snap_count;
522         size_t len;
523         size_t size;
524
525         memset(header, 0, sizeof (*header));
526
527         snap_count = le32_to_cpu(ondisk->snap_count);
528
529         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
530         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
531         if (!header->object_prefix)
532                 return -ENOMEM;
533         memcpy(header->object_prefix, ondisk->object_prefix, len);
534         header->object_prefix[len] = '\0';
535
536         if (snap_count) {
537                 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
538                 BUG_ON(header->snap_names_len > (u64) SIZE_MAX);
539                 header->snap_names = kmalloc(header->snap_names_len,
540                                              GFP_KERNEL);
541                 if (!header->snap_names)
542                         goto out_err;
543
544                 size = snap_count * sizeof (*header->snap_sizes);
545                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
546                 if (!header->snap_sizes)
547                         goto out_err;
548         } else {
549                 WARN_ON(ondisk->snap_names_len);
550                 header->snap_names_len = 0;
551                 header->snap_names = NULL;
552                 header->snap_sizes = NULL;
553         }
554
555         header->image_size = le64_to_cpu(ondisk->image_size);
556         header->obj_order = ondisk->options.order;
557         header->crypt_type = ondisk->options.crypt_type;
558         header->comp_type = ondisk->options.comp_type;
559         header->total_snaps = snap_count;
560
561         size = sizeof (struct ceph_snap_context);
562         size += snap_count * sizeof (header->snapc->snaps[0]);
563         header->snapc = kzalloc(size, GFP_KERNEL);
564         if (!header->snapc)
565                 goto out_err;
566
567         atomic_set(&header->snapc->nref, 1);
568         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
569         header->snapc->num_snaps = snap_count;
570
571         /* Fill in the snapshot information */
572
573         if (snap_count) {
574                 u32 i;
575
576                 for (i = 0; i < snap_count; i++) {
577                         header->snapc->snaps[i] =
578                                 le64_to_cpu(ondisk->snaps[i].id);
579                         header->snap_sizes[i] =
580                                 le64_to_cpu(ondisk->snaps[i].image_size);
581                 }
582
583                 /* copy snapshot names */
584                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
585                         header->snap_names_len);
586         }
587
588         return 0;
589
590 out_err:
591         kfree(header->snap_sizes);
592         header->snap_sizes = NULL;
593         kfree(header->snap_names);
594         header->snap_names = NULL;
595         header->snap_names_len = 0;
596         kfree(header->object_prefix);
597         header->object_prefix = NULL;
598
599         return -ENOMEM;
600 }
601
602 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
603                         u64 *seq, u64 *size)
604 {
605         int i;
606         char *p = header->snap_names;
607
608         for (i = 0; i < header->total_snaps; i++) {
609                 if (!strcmp(snap_name, p)) {
610
611                         /* Found it.  Pass back its id and/or size */
612
613                         if (seq)
614                                 *seq = header->snapc->snaps[i];
615                         if (size)
616                                 *size = header->snap_sizes[i];
617                         return i;
618                 }
619                 p += strlen(p) + 1;     /* Skip ahead to the next name */
620         }
621         return -ENOENT;
622 }
623
/*
 * Point the device at the snapshot named by rbd_dev->snap_name,
 * updating snap_id/snap_exists/read_only accordingly.  The special
 * name RBD_SNAP_HEAD_NAME selects the writable base image.  If @size
 * is non-NULL the selected image/snapshot size is passed back.
 * Takes header_rwsem for writing.  Returns 0 or -ENOENT if the named
 * snapshot does not exist.
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
        int ret;

        down_write(&rbd_dev->header_rwsem);

        if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                /* Mapping the base image: writable, no snapshot id. */
                rbd_dev->snap_id = CEPH_NOSNAP;
                rbd_dev->snap_exists = false;
                rbd_dev->read_only = 0;
                if (size)
                        *size = rbd_dev->header.image_size;
        } else {
                u64 snap_id = 0;

                ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
                                        &snap_id, size);
                if (ret < 0)
                        goto done;
                /* Snapshots are always mapped read-only. */
                rbd_dev->snap_id = snap_id;
                rbd_dev->snap_exists = true;
                rbd_dev->read_only = 1;
        }

        ret = 0;
done:
        up_write(&rbd_dev->header_rwsem);
        return ret;
}
654
655 static void rbd_header_free(struct rbd_image_header *header)
656 {
657         kfree(header->object_prefix);
658         header->object_prefix = NULL;
659         kfree(header->snap_sizes);
660         header->snap_sizes = NULL;
661         kfree(header->snap_names);
662         header->snap_names = NULL;
663         header->snap_names_len = 0;
664         ceph_put_snap_context(header->snapc);
665         header->snapc = NULL;
666 }
667
668 /*
669  * get the actual striped segment name, offset and length
670  */
671 static u64 rbd_get_segment(struct rbd_image_header *header,
672                            const char *object_prefix,
673                            u64 ofs, u64 len,
674                            char *seg_name, u64 *segofs)
675 {
676         u64 seg = ofs >> header->obj_order;
677
678         if (seg_name)
679                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
680                          "%s.%012llx", object_prefix, seg);
681
682         ofs = ofs & ((1 << header->obj_order) - 1);
683         len = min_t(u64, len, (1 << header->obj_order) - ofs);
684
685         if (segofs)
686                 *segofs = ofs;
687
688         return len;
689 }
690
691 static int rbd_get_num_segments(struct rbd_image_header *header,
692                                 u64 ofs, u64 len)
693 {
694         u64 start_seg = ofs >> header->obj_order;
695         u64 end_seg = (ofs + len - 1) >> header->obj_order;
696         return end_seg - start_seg + 1;
697 }
698
699 /*
700  * returns the size of an object in the image
701  */
702 static u64 rbd_obj_bytes(struct rbd_image_header *header)
703 {
704         return 1 << header->obj_order;
705 }
706
707 /*
708  * bio helpers
709  */
710
711 static void bio_chain_put(struct bio *chain)
712 {
713         struct bio *tmp;
714
715         while (chain) {
716                 tmp = chain;
717                 chain = chain->bi_next;
718                 bio_put(tmp);
719         }
720 }
721
722 /*
723  * zeros a bio chain, starting at specific offset
724  */
725 static void zero_bio_chain(struct bio *chain, int start_ofs)
726 {
727         struct bio_vec *bv;
728         unsigned long flags;
729         void *buf;
730         int i;
731         int pos = 0;
732
733         while (chain) {
734                 bio_for_each_segment(bv, chain, i) {
735                         if (pos + bv->bv_len > start_ofs) {
736                                 int remainder = max(start_ofs - pos, 0);
737                                 buf = bvec_kmap_irq(bv, &flags);
738                                 memset(buf + remainder, 0,
739                                        bv->bv_len - remainder);
740                                 bvec_kunmap_irq(buf, &flags);
741                         }
742                         pos += bv->bv_len;
743                 }
744
745                 chain = chain->bi_next;
746         }
747 }
748
749 /*
750  * bio_chain_clone - clone a chain of bios up to a certain length.
751  * might return a bio_pair that will need to be released.
752  */
753 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
754                                    struct bio_pair **bp,
755                                    int len, gfp_t gfpmask)
756 {
757         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
758         int total = 0;
759
760         if (*bp) {
761                 bio_pair_release(*bp);
762                 *bp = NULL;
763         }
764
765         while (old_chain && (total < len)) {
766                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
767                 if (!tmp)
768                         goto err_out;
769
770                 if (total + old_chain->bi_size > len) {
771                         struct bio_pair *bp;
772
773                         /*
774                          * this split can only happen with a single paged bio,
775                          * split_bio will BUG_ON if this is not the case
776                          */
777                         dout("bio_chain_clone split! total=%d remaining=%d"
778                              "bi_size=%u\n",
779                              total, len - total, old_chain->bi_size);
780
781                         /* split the bio. We'll release it either in the next
782                            call, or it will have to be released outside */
783                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
784                         if (!bp)
785                                 goto err_out;
786
787                         __bio_clone(tmp, &bp->bio1);
788
789                         *next = &bp->bio2;
790                 } else {
791                         __bio_clone(tmp, old_chain);
792                         *next = old_chain->bi_next;
793                 }
794
795                 tmp->bi_bdev = NULL;
796                 gfpmask &= ~__GFP_WAIT;
797                 tmp->bi_next = NULL;
798
799                 if (!new_chain) {
800                         new_chain = tail = tmp;
801                 } else {
802                         tail->bi_next = tmp;
803                         tail = tmp;
804                 }
805                 old_chain = old_chain->bi_next;
806
807                 total += tmp->bi_size;
808         }
809
810         BUG_ON(total < len);
811
812         if (tail)
813                 tail->bi_next = NULL;
814
815         *old = old_chain;
816
817         return new_chain;
818
819 err_out:
820         dout("bio_chain_clone with err\n");
821         bio_chain_put(new_chain);
822         return NULL;
823 }
824
825 /*
826  * helpers for osd request op vectors.
827  */
828 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
829                                         int opcode, u32 payload_len)
830 {
831         struct ceph_osd_req_op *ops;
832
833         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
834         if (!ops)
835                 return NULL;
836
837         ops[0].op = opcode;
838
839         /*
840          * op extent offset and length will be set later on
841          * in calc_raw_layout()
842          */
843         ops[0].payload_len = payload_len;
844
845         return ops;
846 }
847
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
852
/*
 * Record completion of one sub-request in a collection and complete,
 * in order, any prefix of the block request that is now fully done.
 * With no collection the whole request is completed at once.  One
 * collection reference is dropped per slot completed here.
 */
static void rbd_coll_end_req_index(struct request *rq,
                                   struct rbd_req_coll *coll,
                                   int index,
                                   int ret, u64 len)
{
        struct request_queue *q;
        int min, max, i;

        dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
             coll, index, ret, (unsigned long long) len);

        if (!rq)
                return;

        if (!coll) {
                /* Single-segment request: complete it directly. */
                blk_end_request(rq, ret, len);
                return;
        }

        q = rq->q;

        /* The queue lock serializes updates to the collection status. */
        spin_lock_irq(q->queue_lock);
        coll->status[index].done = 1;
        coll->status[index].rc = ret;
        coll->status[index].bytes = len;
        /* Find how far the contiguous run of completed slots extends. */
        max = min = coll->num_done;
        while (max < coll->total && coll->status[max].done)
                max++;

        for (i = min; i<max; i++) {
                __blk_end_request(rq, coll->status[i].rc,
                                  coll->status[i].bytes);
                coll->num_done++;
                kref_put(&coll->kref, rbd_coll_release);
        }
        spin_unlock_irq(q->queue_lock);
}
890
891 static void rbd_coll_end_req(struct rbd_request *req,
892                              int ret, u64 len)
893 {
894         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
895 }
896
897 /*
898  * Send ceph osd request
899  */
900 static int rbd_do_request(struct request *rq,
901                           struct rbd_device *rbd_dev,
902                           struct ceph_snap_context *snapc,
903                           u64 snapid,
904                           const char *object_name, u64 ofs, u64 len,
905                           struct bio *bio,
906                           struct page **pages,
907                           int num_pages,
908                           int flags,
909                           struct ceph_osd_req_op *ops,
910                           struct rbd_req_coll *coll,
911                           int coll_index,
912                           void (*rbd_cb)(struct ceph_osd_request *req,
913                                          struct ceph_msg *msg),
914                           struct ceph_osd_request **linger_req,
915                           u64 *ver)
916 {
917         struct ceph_osd_request *req;
918         struct ceph_file_layout *layout;
919         int ret;
920         u64 bno;
921         struct timespec mtime = CURRENT_TIME;
922         struct rbd_request *req_data;
923         struct ceph_osd_request_head *reqhead;
924         struct ceph_osd_client *osdc;
925
926         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
927         if (!req_data) {
928                 if (coll)
929                         rbd_coll_end_req_index(rq, coll, coll_index,
930                                                -ENOMEM, len);
931                 return -ENOMEM;
932         }
933
934         if (coll) {
935                 req_data->coll = coll;
936                 req_data->coll_index = coll_index;
937         }
938
939         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
940                 (unsigned long long) ofs, (unsigned long long) len);
941
942         osdc = &rbd_dev->rbd_client->client->osdc;
943         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
944                                         false, GFP_NOIO, pages, bio);
945         if (!req) {
946                 ret = -ENOMEM;
947                 goto done_pages;
948         }
949
950         req->r_callback = rbd_cb;
951
952         req_data->rq = rq;
953         req_data->bio = bio;
954         req_data->pages = pages;
955         req_data->len = len;
956
957         req->r_priv = req_data;
958
959         reqhead = req->r_request->front.iov_base;
960         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
961
962         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
963         req->r_oid_len = strlen(req->r_oid);
964
965         layout = &req->r_file_layout;
966         memset(layout, 0, sizeof(*layout));
967         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
968         layout->fl_stripe_count = cpu_to_le32(1);
969         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
970         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
971         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
972                                 req, ops);
973
974         ceph_osdc_build_request(req, ofs, &len,
975                                 ops,
976                                 snapc,
977                                 &mtime,
978                                 req->r_oid, req->r_oid_len);
979
980         if (linger_req) {
981                 ceph_osdc_set_request_linger(osdc, req);
982                 *linger_req = req;
983         }
984
985         ret = ceph_osdc_start_request(osdc, req, false);
986         if (ret < 0)
987                 goto done_err;
988
989         if (!rbd_cb) {
990                 ret = ceph_osdc_wait_request(osdc, req);
991                 if (ver)
992                         *ver = le64_to_cpu(req->r_reassert_version.version);
993                 dout("reassert_ver=%llu\n",
994                         (unsigned long long)
995                                 le64_to_cpu(req->r_reassert_version.version));
996                 ceph_osdc_put_request(req);
997         }
998         return ret;
999
1000 done_err:
1001         bio_chain_put(req_data->bio);
1002         ceph_osdc_put_request(req);
1003 done_pages:
1004         rbd_coll_end_req(req_data, ret, len);
1005         kfree(req_data);
1006         return ret;
1007 }
1008
/*
 * Ceph osd op callback
 *
 * Completion handler for async requests issued via rbd_do_request().
 * Parses the osd reply, converts read "holes" and short reads into
 * zero-filled data, reports completion through the request
 * collection, and frees the per-request bookkeeping.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);	/* first op follows the reply header */
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	/*
	 * A read of a non-existent object is not an error at the block
	 * layer: it reads as zeroes.  A successful short read is padded
	 * with zeroes up to the requested length.
	 */
	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1048
/* Minimal completion callback: just drop the osd request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1053
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector large enough for @len bytes at @ofs, issues
 * the request via rbd_do_request() with a NULL callback (which makes
 * it wait for completion), and for reads copies the returned data
 * into @buf.  The page vector is always released before returning.
 *
 * Returns a negative errno on failure; on success, the value
 * propagated from the osd request (for reads, the byte count).
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	BUG_ON(ops == NULL);

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	/* NULL callback => rbd_do_request() waits for the reply */
	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, len, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done;

	/* for reads, ret is used as the number of bytes to copy out */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1097
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image extent [@ofs, @ofs + @len) onto a single rbd object
 * segment and issues @opcode against it.  Completion is reported via
 * rbd_req_cb() into slot @coll_index of @coll.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	/* translate image (ofs, len) into (object name, offset, length) */
	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	/* only writes carry an outbound data payload */
	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1153
/*
 * Request async osd write
 *
 * Issue an asynchronous WRITE of [@ofs, @ofs + @len) against the
 * head revision (CEPH_NOSNAP), carrying the given snap context.
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 ofs, len, bio, coll, coll_index);
}
1170
/*
 * Request async osd read
 *
 * Issue an asynchronous READ of [@ofs, @ofs + @len) at snapshot
 * @snapid.  Reads need no snap context, hence the NULL.
 */
static int rbd_req_read(struct request *rq,
			 struct rbd_device *rbd_dev,
			 u64 snapid,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 ofs, len, bio, coll, coll_index);
}
1188
/*
 * Request sync osd read
 *
 * Synchronously read @len bytes at @ofs of @object_name into @buf,
 * optionally returning the object version in @ver.
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			  u64 snapid,
			  const char *object_name,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       snapid,
			       CEPH_OSD_FLAG_READ,
			       ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}
1214
/*
 * Acknowledge a notification received on the header-object watch.
 *
 * Sends a NOTIFY_ACK op back to the osd; the completion callback is
 * rbd_simple_req_cb, so the request is fire-and-forget.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	/*
	 * NOTE(review): cookie is assigned without cpu_to_le64() while
	 * ver above is converted -- presumably notify_id arrives from
	 * the osd client already little-endian; confirm against the
	 * watch callback's delivery path.
	 */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1244
1245 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1246 {
1247         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1248         u64 hver;
1249         int rc;
1250
1251         if (!rbd_dev)
1252                 return;
1253
1254         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1255                 rbd_dev->header_name, (unsigned long long) notify_id,
1256                 (unsigned int) opcode);
1257         rc = rbd_refresh_header(rbd_dev, &hver);
1258         if (rc)
1259                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1260                            " update snaps: %d\n", rbd_dev->major, rc);
1261
1262         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1263 }
1264
/*
 * Request sync osd watch
 *
 * Establish a watch on the header object: register an osd event
 * whose callback is rbd_watch_cb(), then send a WATCH op (flag == 1)
 * as a lingering request whose handle is kept in
 * rbd_dev->watch_request.  On failure the event is cancelled and
 * watch_event is cleared.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == establish; 0 would tear down */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1308
/*
 * Request sync osd unwatch
 *
 * Undo rbd_req_sync_watch(): send a WATCH op with flag == 0 to drop
 * the watch on the header object, then cancel and clear the local
 * watch event.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 == tear the watch down */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1338
/* Opaque context handed to rbd_notify_cb() via the osd event data. */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;	/* device whose header was notified */
};
1342
1343 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1344 {
1345         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1346         if (!rbd_dev)
1347                 return;
1348
1349         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1350                         rbd_dev->header_name, (unsigned long long) notify_id,
1351                         (unsigned int) opcode);
1352 }
1353
/*
 * Request sync osd notify
 *
 * Send a NOTIFY op on the header object (so that other watchers
 * refresh their headers) and wait, with a timeout, for the
 * notification round to complete via the one-shot osd event.
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	int payload_len = sizeof(u32) + sizeof(u32);	/* room for two u32s */
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (!ops)
		return -ENOMEM;

	info.rbd_dev = rbd_dev;

	/* one_shot == 1: the event completes after a single delivery */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	/*
	 * NOTE(review): ver/cookie here are not converted with
	 * cpu_to_le64() as they are in rbd_req_sync_watch() -- verify
	 * the byte order these watch fields are expected in.
	 */
	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;	/* presumably seconds -- TODO confirm */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       rbd_dev->header_name,
			       0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	/* wait for all watchers to ack (or the timeout to expire) */
	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1403
/*
 * Synchronously execute an osd object-class method (a CALL op):
 * invokes @class_name.@method_name on @object_name with @data
 * (@len bytes) as input.  (The old comment said "sync osd read",
 * which was wrong.)
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int ret;

	/* payload carries class name + method name + input data */
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
				    class_name_len + method_name_len + len);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1444
1445 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1446 {
1447         struct rbd_req_coll *coll =
1448                         kzalloc(sizeof(struct rbd_req_coll) +
1449                                 sizeof(struct rbd_req_status) * num_reqs,
1450                                 GFP_ATOMIC);
1451
1452         if (!coll)
1453                 return NULL;
1454         coll->total = num_reqs;
1455         kref_init(&coll->kref);
1456         return coll;
1457 }
1458
/*
 * block device queue callback
 *
 * Drain the request queue: each filesystem request is split along
 * rbd object boundaries into cloned bios, each issued as an async
 * osd read or write.  A request collection (coll) gathers the
 * per-segment completions so the blk request is ended exactly once,
 * after all segments finish.  The queue lock is dropped while
 * submitting and re-taken before fetching the next request.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* drop the queue lock while we talk to the osds */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* the mapped snapshot may have been deleted under us */
		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* hold a snap context ref for the life of this request */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.object_prefix,
						  ofs, size,
						  NULL, NULL);
			/* one coll reference per segment issued */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* record the failure for this segment but
				   keep going with the remaining ones */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the initial reference taken at allocation */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1578
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;	/* sectors per rbd object */
	sector_t sector;		/* absolute start sector of the bio */
	unsigned int bio_sectors;	/* sectors already in the bio */
	int max;

	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* bytes left in the object the bio currently ends in */
	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	/* an empty bio may take one bvec even across the boundary
	   (single-page case handled later by bio_chain_clone) */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1605
1606 static void rbd_free_disk(struct rbd_device *rbd_dev)
1607 {
1608         struct gendisk *disk = rbd_dev->disk;
1609
1610         if (!disk)
1611                 return;
1612
1613         rbd_header_free(&rbd_dev->header);
1614
1615         if (disk->flags & GENHD_FL_UP)
1616                 del_gendisk(disk);
1617         if (disk->queue)
1618                 blk_cleanup_queue(disk->queue);
1619         put_disk(disk);
1620 }
1621
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);	/* free the previous, wrongly-sized buffer */

		/* size the buffer from the last-seen snapshot counts */
		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->image_name);
			goto out_err;
		}

		/* loop again if the snapshot count changed meanwhile */
		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
1693
/*
 * Reload the ondisk header into the in-core @header.
 *
 * On success header->obj_version is set to the version of the header
 * object that was read.  Returns 0 or a negative errno.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	kfree(ondisk);	/* the in-core header does not keep this buffer */

	return ret;
}
1714
1715 /*
1716  * create a snapshot
1717  */
1718 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1719                                const char *snap_name,
1720                                gfp_t gfp_flags)
1721 {
1722         int name_len = strlen(snap_name);
1723         u64 new_snapid;
1724         int ret;
1725         void *data, *p, *e;
1726         struct ceph_mon_client *monc;
1727
1728         /* we should create a snapshot only if we're pointing at the head */
1729         if (rbd_dev->snap_id != CEPH_NOSNAP)
1730                 return -EINVAL;
1731
1732         monc = &rbd_dev->rbd_client->client->monc;
1733         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1734         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1735         if (ret < 0)
1736                 return ret;
1737
1738         data = kmalloc(name_len + 16, gfp_flags);
1739         if (!data)
1740                 return -ENOMEM;
1741
1742         p = data;
1743         e = data + name_len + 16;
1744
1745         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1746         ceph_encode_64_safe(&p, e, new_snapid, bad);
1747
1748         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1749                                 "rbd", "snap_add",
1750                                 data, p - data, NULL);
1751
1752         kfree(data);
1753
1754         return ret < 0 ? ret : 0;
1755 bad:
1756         return -ERANGE;
1757 }
1758
/*
 * Remove the device entry for every snapshot on rbd_dev's snaps
 * list.  Uses the _safe iterator since entries go away as we walk.
 */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}
1767
/*
 * Re-read the header object and fold the result into
 * rbd_dev->header under header_rwsem, updating disk capacity and
 * rebuilding the snapshot device list.  The old comment claimed only
 * part of the header is read; the code reads and converts all of it.
 * Serialized by ctl_mutex via rbd_refresh_header().
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	/* bring the snapshot device list in line with the new header */
	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1815
/*
 * Refresh the header with ctl_mutex held.  SINGLE_DEPTH_NESTING is
 * the lockdep annotation for taking the mutex one level nested --
 * NOTE(review): confirm which caller holds ctl_mutex when this runs.
 */
static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	ret = __rbd_refresh_header(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}
1826
1827 static int rbd_init_disk(struct rbd_device *rbd_dev)
1828 {
1829         struct gendisk *disk;
1830         struct request_queue *q;
1831         int rc;
1832         u64 segment_size;
1833         u64 total_size = 0;
1834
1835         /* contact OSD, request size info about the object being mapped */
1836         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1837         if (rc)
1838                 return rc;
1839
1840         /* no need to lock here, as rbd_dev is not registered yet */
1841         rc = __rbd_init_snaps_header(rbd_dev);
1842         if (rc)
1843                 return rc;
1844
1845         rc = rbd_header_set_snap(rbd_dev, &total_size);
1846         if (rc)
1847                 return rc;
1848
1849         /* create gendisk info */
1850         rc = -ENOMEM;
1851         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1852         if (!disk)
1853                 goto out;
1854
1855         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1856                  rbd_dev->dev_id);
1857         disk->major = rbd_dev->major;
1858         disk->first_minor = 0;
1859         disk->fops = &rbd_bd_ops;
1860         disk->private_data = rbd_dev;
1861
1862         /* init rq */
1863         rc = -ENOMEM;
1864         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1865         if (!q)
1866                 goto out_disk;
1867
1868         /* We use the default size, but let's be explicit about it. */
1869         blk_queue_physical_block_size(q, SECTOR_SIZE);
1870
1871         /* set io sizes to object size */
1872         segment_size = rbd_obj_bytes(&rbd_dev->header);
1873         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1874         blk_queue_max_segment_size(q, segment_size);
1875         blk_queue_io_min(q, segment_size);
1876         blk_queue_io_opt(q, segment_size);
1877
1878         blk_queue_merge_bvec(q, rbd_merge_bvec);
1879         disk->queue = q;
1880
1881         q->queuedata = rbd_dev;
1882
1883         rbd_dev->disk = disk;
1884         rbd_dev->q = q;
1885
1886         /* finally, announce the disk to the world */
1887         set_capacity(disk, total_size / SECTOR_SIZE);
1888         add_disk(disk);
1889
1890         pr_info("%s: added with size 0x%llx\n",
1891                 disk->disk_name, (unsigned long long)total_size);
1892         return 0;
1893
1894 out_disk:
1895         put_disk(disk);
1896 out:
1897         return rc;
1898 }
1899
1900 /*
1901   sysfs
1902 */
1903
/* Map a sysfs struct device back to the rbd_device that embeds it. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1908
1909 static ssize_t rbd_size_show(struct device *dev,
1910                              struct device_attribute *attr, char *buf)
1911 {
1912         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1913         sector_t size;
1914
1915         down_read(&rbd_dev->header_rwsem);
1916         size = get_capacity(rbd_dev->disk);
1917         up_read(&rbd_dev->header_rwsem);
1918
1919         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1920 }
1921
1922 static ssize_t rbd_major_show(struct device *dev,
1923                               struct device_attribute *attr, char *buf)
1924 {
1925         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1926
1927         return sprintf(buf, "%d\n", rbd_dev->major);
1928 }
1929
1930 static ssize_t rbd_client_id_show(struct device *dev,
1931                                   struct device_attribute *attr, char *buf)
1932 {
1933         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934
1935         return sprintf(buf, "client%lld\n",
1936                         ceph_client_id(rbd_dev->rbd_client->client));
1937 }
1938
1939 static ssize_t rbd_pool_show(struct device *dev,
1940                              struct device_attribute *attr, char *buf)
1941 {
1942         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1943
1944         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1945 }
1946
1947 static ssize_t rbd_pool_id_show(struct device *dev,
1948                              struct device_attribute *attr, char *buf)
1949 {
1950         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1951
1952         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1953 }
1954
1955 static ssize_t rbd_name_show(struct device *dev,
1956                              struct device_attribute *attr, char *buf)
1957 {
1958         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1959
1960         return sprintf(buf, "%s\n", rbd_dev->image_name);
1961 }
1962
1963 static ssize_t rbd_snap_show(struct device *dev,
1964                              struct device_attribute *attr,
1965                              char *buf)
1966 {
1967         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1968
1969         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1970 }
1971
1972 static ssize_t rbd_image_refresh(struct device *dev,
1973                                  struct device_attribute *attr,
1974                                  const char *buf,
1975                                  size_t size)
1976 {
1977         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1978         int ret;
1979
1980         ret = rbd_refresh_header(rbd_dev, NULL);
1981
1982         return ret < 0 ? ret : size;
1983 }
1984
/*
 * Per-device attribute files under /sys/bus/rbd/devices/<id>/.
 * The read-only ones report mapping state; "refresh" forces a header
 * re-read and "create_snap" takes a new snapshot (see
 * Documentation/ABI/testing/sysfs-bus-rbd).
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1994
/* All per-device attributes, NULL-terminated as sysfs requires. */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

/* Attribute group attached to every rbd device via rbd_device_type. */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list, the shape device_type.groups expects. */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
2016
/*
 * Fallback release for rbd_device_type.  Intentionally empty:
 * rbd_bus_add_dev() installs rbd_dev_release() as dev->release, and
 * the driver core prefers a device's own release callback over the
 * type's, so the real teardown happens there.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

/* Device type for mapped images; wires up the sysfs attribute groups. */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2026
2027
2028 /*
2029   sysfs - snapshots
2030 */
2031
2032 static ssize_t rbd_snap_size_show(struct device *dev,
2033                                   struct device_attribute *attr,
2034                                   char *buf)
2035 {
2036         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2037
2038         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2039 }
2040
2041 static ssize_t rbd_snap_id_show(struct device *dev,
2042                                 struct device_attribute *attr,
2043                                 char *buf)
2044 {
2045         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2046
2047         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2048 }
2049
/* Per-snapshot read-only attributes: size in bytes and snapshot id. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2062
/*
 * Release callback for snapshot devices: runs once the last reference
 * to snap->dev is dropped (after device_unregister()), freeing the
 * rbd_snap along with the name duplicated in __rbd_add_snap_dev().
 */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}
2069
/* NULL-terminated group list for snapshot sysfs devices. */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* Unnamed device type shared by all snapshot sysfs devices. */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2079
/*
 * Unlink a snapshot from its device's snap list and unregister its
 * sysfs device.  The rbd_snap itself is freed later, by
 * rbd_snap_dev_release(), when the device reference count drops to 0.
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2085
2086 static int rbd_register_snap_dev(struct rbd_snap *snap,
2087                                   struct device *parent)
2088 {
2089         struct device *dev = &snap->dev;
2090         int ret;
2091
2092         dev->type = &rbd_snap_device_type;
2093         dev->parent = parent;
2094         dev->release = rbd_snap_dev_release;
2095         dev_set_name(dev, "snap_%s", snap->name);
2096         ret = device_register(dev);
2097
2098         return ret;
2099 }
2100
2101 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2102                                               int i, const char *name)
2103 {
2104         struct rbd_snap *snap;
2105         int ret;
2106
2107         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2108         if (!snap)
2109                 return ERR_PTR(-ENOMEM);
2110
2111         ret = -ENOMEM;
2112         snap->name = kstrdup(name, GFP_KERNEL);
2113         if (!snap->name)
2114                 goto err;
2115
2116         snap->size = rbd_dev->header.snap_sizes[i];
2117         snap->id = rbd_dev->header.snapc->snaps[i];
2118         if (device_is_registered(&rbd_dev->dev)) {
2119                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2120                 if (ret < 0)
2121                         goto err;
2122         }
2123
2124         return snap;
2125
2126 err:
2127         kfree(snap->name);
2128         kfree(snap);
2129
2130         return ERR_PTR(ret);
2131 }
2132
2133 /*
2134  * Scan the rbd device's current snapshot list and compare it to the
2135  * newly-received snapshot context.  Remove any existing snapshots
2136  * not present in the new snapshot context.  Add a new snapshot for
2137  * any snaphots in the snapshot context not in the current list.
2138  * And verify there are no changes to snapshots we already know
2139  * about.
2140  *
2141  * Assumes the snapshots in the snapshot context are sorted by
2142  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2143  * are also maintained in that order.)
2144  */
2145 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2146 {
2147         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2148         const u32 snap_count = snapc->num_snaps;
2149         char *snap_name = rbd_dev->header.snap_names;
2150         struct list_head *head = &rbd_dev->snaps;
2151         struct list_head *links = head->next;
2152         u32 index = 0;
2153
2154         while (index < snap_count || links != head) {
2155                 u64 snap_id;
2156                 struct rbd_snap *snap;
2157
2158                 snap_id = index < snap_count ? snapc->snaps[index]
2159                                              : CEPH_NOSNAP;
2160                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2161                                      : NULL;
2162                 BUG_ON(snap && snap->id == CEPH_NOSNAP);
2163
2164                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2165                         struct list_head *next = links->next;
2166
2167                         /* Existing snapshot not in the new snap context */
2168
2169                         if (rbd_dev->snap_id == snap->id)
2170                                 rbd_dev->snap_exists = false;
2171                         __rbd_remove_snap_dev(snap);
2172
2173                         /* Done with this list entry; advance */
2174
2175                         links = next;
2176                         continue;
2177                 }
2178
2179                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2180                         struct rbd_snap *new_snap;
2181
2182                         /* We haven't seen this snapshot before */
2183
2184                         new_snap = __rbd_add_snap_dev(rbd_dev, index,
2185                                                         snap_name);
2186                         if (IS_ERR(new_snap))
2187                                 return PTR_ERR(new_snap);
2188
2189                         /* New goes before existing, or at end of list */
2190
2191                         if (snap)
2192                                 list_add_tail(&new_snap->node, &snap->node);
2193                         else
2194                                 list_add_tail(&new_snap->node, head);
2195                 } else {
2196                         /* Already have this one */
2197
2198                         BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2199                         BUG_ON(strcmp(snap->name, snap_name));
2200
2201                         /* Done with this list entry; advance */
2202
2203                         links = links->next;
2204                 }
2205
2206                 /* Advance to the next entry in the snapshot context */
2207
2208                 index++;
2209                 snap_name += strlen(snap_name) + 1;
2210         }
2211
2212         return 0;
2213 }
2214
/*
 * Register the rbd device on the rbd bus, then register a sysfs
 * device for each snapshot already on its list.  Serialized against
 * other control operations by ctl_mutex.  Returns 0 or the first
 * registration error.
 *
 * NOTE(review): if registering a snapshot device fails, the parent
 * device remains registered while the error is returned; the caller
 * (rbd_add()) then jumps to err_out_blkdev without unregistering it,
 * which looks like a leaked device registration -- confirm.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	/* real teardown lives here, not in rbd_sysfs_dev_release() */
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2242
/*
 * Remove an rbd device from the bus; once its reference count drops,
 * rbd_dev_release() performs the actual teardown (unwatch, disk
 * cleanup, freeing the rbd_dev).
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2247
2248 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2249 {
2250         int ret, rc;
2251
2252         do {
2253                 ret = rbd_req_sync_watch(rbd_dev);
2254                 if (ret == -ERANGE) {
2255                         rc = rbd_refresh_header(rbd_dev, NULL);
2256                         if (rc < 0)
2257                                 return rc;
2258                 }
2259         } while (ret == -ERANGE);
2260
2261         return ret;
2262 }
2263
/* Highest device id handed out so far; ids start at 1 (see rbd_id_get()). */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
2278
2279 /*
2280  * Remove an rbd_dev from the global list, and record that its
2281  * identifier is no longer in use.
2282  */
2283 static void rbd_id_put(struct rbd_device *rbd_dev)
2284 {
2285         struct list_head *tmp;
2286         int rbd_id = rbd_dev->dev_id;
2287         int max_id;
2288
2289         BUG_ON(rbd_id < 1);
2290
2291         spin_lock(&rbd_dev_list_lock);
2292         list_del_init(&rbd_dev->node);
2293
2294         /*
2295          * If the id being "put" is not the current maximum, there
2296          * is nothing special we need to do.
2297          */
2298         if (rbd_id != atomic64_read(&rbd_id_max)) {
2299                 spin_unlock(&rbd_dev_list_lock);
2300                 return;
2301         }
2302
2303         /*
2304          * We need to update the current maximum id.  Search the
2305          * list to find out what it is.  We're more likely to find
2306          * the maximum at the end, so search the list backward.
2307          */
2308         max_id = 0;
2309         list_for_each_prev(tmp, &rbd_dev_list) {
2310                 struct rbd_device *rbd_dev;
2311
2312                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2313                 if (rbd_id > max_id)
2314                         max_id = rbd_id;
2315         }
2316         spin_unlock(&rbd_dev_list_lock);
2317
2318         /*
2319          * The max id could have been updated by rbd_id_get(), in
2320          * which case it now accurately reflects the new maximum.
2321          * Be careful not to overwrite the maximum value in that
2322          * case.
2323          */
2324         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2325 }
2326
2327 /*
2328  * Skips over white space at *buf, and updates *buf to point to the
2329  * first found non-space character (if any). Returns the length of
2330  * the token (string of non-white space characters) found.  Note
2331  * that *buf must be terminated with '\0'.
2332  */
2333 static inline size_t next_token(const char **buf)
2334 {
2335         /*
2336         * These are the characters that produce nonzero for
2337         * isspace() in the "C" and "POSIX" locales.
2338         */
2339         const char *spaces = " \f\n\r\t\v";
2340
2341         *buf += strspn(*buf, spaces);   /* Find start of token */
2342
2343         return strcspn(*buf, spaces);   /* Return token length */
2344 }
2345
2346 /*
2347  * Finds the next token in *buf, and if the provided token buffer is
2348  * big enough, copies the found token into it.  The result, if
2349  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2350  * must be terminated with '\0' on entry.
2351  *
2352  * Returns the length of the token found (not including the '\0').
2353  * Return value will be 0 if no token is found, and it will be >=
2354  * token_size if the token would not fit.
2355  *
2356  * The *buf pointer will be updated to point beyond the end of the
2357  * found token.  Note that this occurs even if the token buffer is
2358  * too small to hold it.
2359  */
2360 static inline size_t copy_token(const char **buf,
2361                                 char *token,
2362                                 size_t token_size)
2363 {
2364         size_t len;
2365
2366         len = next_token(buf);
2367         if (len < token_size) {
2368                 memcpy(token, *buf, len);
2369                 *(token + len) = '\0';
2370         }
2371         *buf += len;
2372
2373         return len;
2374 }
2375
2376 /*
2377  * Finds the next token in *buf, dynamically allocates a buffer big
2378  * enough to hold a copy of it, and copies the token into the new
2379  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2380  * that a duplicate buffer is created even for a zero-length token.
2381  *
2382  * Returns a pointer to the newly-allocated duplicate, or a null
2383  * pointer if memory for the duplicate was not available.  If
2384  * the lenp argument is a non-null pointer, the length of the token
2385  * (not including the '\0') is returned in *lenp.
2386  *
2387  * If successful, the *buf pointer will be updated to point beyond
2388  * the end of the found token.
2389  *
2390  * Note: uses GFP_KERNEL for allocation.
2391  */
2392 static inline char *dup_token(const char **buf, size_t *lenp)
2393 {
2394         char *dup;
2395         size_t len;
2396
2397         len = next_token(buf);
2398         dup = kmalloc(len + 1, GFP_KERNEL);
2399         if (!dup)
2400                 return NULL;
2401
2402         memcpy(dup, *buf, len);
2403         *(dup + len) = '\0';
2404         *buf += len;
2405
2406         if (lenp)
2407                 *lenp = len;
2408
2409         return dup;
2410 }
2411
2412 /*
2413  * This fills in the pool_name, image_name, image_name_len, snap_name,
2414  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2415  * on the list of monitor addresses and other options provided via
2416  * /sys/bus/rbd/add.
2417  *
2418  * Note: rbd_dev is assumed to have been initially zero-filled.
2419  */
2420 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2421                               const char *buf,
2422                               const char **mon_addrs,
2423                               size_t *mon_addrs_size,
2424                               char *options,
2425                              size_t options_size)
2426 {
2427         size_t len;
2428         int ret;
2429
2430         /* The first four tokens are required */
2431
2432         len = next_token(&buf);
2433         if (!len)
2434                 return -EINVAL;
2435         *mon_addrs_size = len + 1;
2436         *mon_addrs = buf;
2437
2438         buf += len;
2439
2440         len = copy_token(&buf, options, options_size);
2441         if (!len || len >= options_size)
2442                 return -EINVAL;
2443
2444         ret = -ENOMEM;
2445         rbd_dev->pool_name = dup_token(&buf, NULL);
2446         if (!rbd_dev->pool_name)
2447                 goto out_err;
2448
2449         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2450         if (!rbd_dev->image_name)
2451                 goto out_err;
2452
2453         /* Create the name of the header object */
2454
2455         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2456                                                 + sizeof (RBD_SUFFIX),
2457                                         GFP_KERNEL);
2458         if (!rbd_dev->header_name)
2459                 goto out_err;
2460         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2461
2462         /*
2463          * The snapshot name is optional.  If none is is supplied,
2464          * we use the default value.
2465          */
2466         rbd_dev->snap_name = dup_token(&buf, &len);
2467         if (!rbd_dev->snap_name)
2468                 goto out_err;
2469         if (!len) {
2470                 /* Replace the empty name with the default */
2471                 kfree(rbd_dev->snap_name);
2472                 rbd_dev->snap_name
2473                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2474                 if (!rbd_dev->snap_name)
2475                         goto out_err;
2476
2477                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2478                         sizeof (RBD_SNAP_HEAD_NAME));
2479         }
2480
2481         return 0;
2482
2483 out_err:
2484         kfree(rbd_dev->header_name);
2485         rbd_dev->header_name = NULL;
2486         kfree(rbd_dev->image_name);
2487         rbd_dev->image_name = NULL;
2488         rbd_dev->image_name_len = 0;
2489         kfree(rbd_dev->pool_name);
2490         rbd_dev->pool_name = NULL;
2491
2492         return ret;
2493 }
2494
/*
 * sysfs /sys/bus/rbd/add store: parse the user-supplied mapping
 * specification, connect to the cluster, and bring up the block
 * device.  Returns the byte count consumed on success or a negative
 * errno.  Holds a module reference for the lifetime of the mapping;
 * it is dropped in rbd_dev_release().
 *
 * NOTE(review): on the success path (return count) the "options"
 * buffer allocated below is never kfree'd here, and nothing visible
 * in this file retains it -- looks like a memory leak; confirm
 * whether rbd_get_client() takes ownership.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_nomem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_nomem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	/* mon_addrs_size includes the '\0' slot; pass the raw length */
	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
						options);
	if (IS_ERR(rbd_dev->rbd_client)) {
		rc = PTR_ERR(rbd_dev->rbd_client);
		rbd_dev->rbd_client = NULL;
		goto err_put_id;
	}

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device; 0 means dynamic major assignment */
	rc = register_blkdev(0, rbd_dev->name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	/*
	 * this will also clean up rest of rbd_dev stuff: the device
	 * release callback (rbd_dev_release) frees the name strings,
	 * puts the id, drops the module ref and kfrees rbd_dev.
	 */
	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	/* pool_name non-NULL implies parse succeeded: free all strings */
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	}
	rbd_id_put(rbd_dev);
err_nomem:
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
2605
2606 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2607 {
2608         struct list_head *tmp;
2609         struct rbd_device *rbd_dev;
2610
2611         spin_lock(&rbd_dev_list_lock);
2612         list_for_each(tmp, &rbd_dev_list) {
2613                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2614                 if (rbd_dev->dev_id == dev_id) {
2615                         spin_unlock(&rbd_dev_list_lock);
2616                         return rbd_dev;
2617                 }
2618         }
2619         spin_unlock(&rbd_dev_list_lock);
2620         return NULL;
2621 }
2622
/*
 * Device-core release callback for a mapped image (installed as
 * dev->release in rbd_bus_add_dev()).  Runs when the last reference
 * to the device is dropped, after device_unregister(); tears down the
 * watch, the client connection, the block device, and finally the
 * rbd_dev itself.  The teardown order matters: stop watch traffic
 * before dropping the client, and free the disk before the strings
 * and the rbd_dev they live in.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* cancel the lingering watch request, if one was registered */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref (taken in rbd_add()) */
	module_put(THIS_MODULE);
}
2653
2654 static ssize_t rbd_remove(struct bus_type *bus,
2655                           const char *buf,
2656                           size_t count)
2657 {
2658         struct rbd_device *rbd_dev = NULL;
2659         int target_id, rc;
2660         unsigned long ul;
2661         int ret = count;
2662
2663         rc = strict_strtoul(buf, 10, &ul);
2664         if (rc)
2665                 return rc;
2666
2667         /* convert to int; abort if we lost anything in the conversion */
2668         target_id = (int) ul;
2669         if (target_id != ul)
2670                 return -EINVAL;
2671
2672         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2673
2674         rbd_dev = __rbd_get_dev(target_id);
2675         if (!rbd_dev) {
2676                 ret = -ENOENT;
2677                 goto done;
2678         }
2679
2680         __rbd_remove_all_snaps(rbd_dev);
2681         rbd_bus_del_dev(rbd_dev);
2682
2683 done:
2684         mutex_unlock(&ctl_mutex);
2685         return ret;
2686 }
2687
2688 static ssize_t rbd_snap_add(struct device *dev,
2689                             struct device_attribute *attr,
2690                             const char *buf,
2691                             size_t count)
2692 {
2693         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2694         int ret;
2695         char *name = kmalloc(count + 1, GFP_KERNEL);
2696         if (!name)
2697                 return -ENOMEM;
2698
2699         snprintf(name, count, "%s", buf);
2700
2701         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2702
2703         ret = rbd_header_add_snap(rbd_dev,
2704                                   name, GFP_KERNEL);
2705         if (ret < 0)
2706                 goto err_unlock;
2707
2708         ret = __rbd_refresh_header(rbd_dev, NULL);
2709         if (ret < 0)
2710                 goto err_unlock;
2711
2712         /* shouldn't hold ctl_mutex when notifying.. notify might
2713            trigger a watch callback that would need to get that mutex */
2714         mutex_unlock(&ctl_mutex);
2715
2716         /* make a best effort, don't error if failed */
2717         rbd_req_sync_notify(rbd_dev);
2718
2719         ret = count;
2720         kfree(name);
2721         return ret;
2722
2723 err_unlock:
2724         mutex_unlock(&ctl_mutex);
2725         kfree(name);
2726         return ret;
2727 }
2728
2729 /*
2730  * create control files in sysfs
2731  * /sys/bus/rbd/...
2732  */
2733 static int rbd_sysfs_init(void)
2734 {
2735         int ret;
2736
2737         ret = device_register(&rbd_root_dev);
2738         if (ret < 0)
2739                 return ret;
2740
2741         ret = bus_register(&rbd_bus_type);
2742         if (ret < 0)
2743                 device_unregister(&rbd_root_dev);
2744
2745         return ret;
2746 }
2747
/*
 * Tear down /sys/bus/rbd: unregister the bus first, then the root
 * device -- the reverse of rbd_sysfs_init().
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2753
2754 int __init rbd_init(void)
2755 {
2756         int rc;
2757
2758         rc = rbd_sysfs_init();
2759         if (rc)
2760                 return rc;
2761         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2762         return 0;
2763 }
2764
/* Module exit point: remove the sysfs entry points created on init. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2769
2770 module_init(rbd_init);
2771 module_exit(rbd_exit);
2772
2773 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2774 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2775 MODULE_DESCRIPTION("rados block device");
2776
2777 /* following authorship retained from original osdblk.c */
2778 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2779
2780 MODULE_LICENSE("GPL");