rbd: record image-relative offset in object requests
[linux-block.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Block I/O is expressed in sectors.  Several Linux subsystems (blk,
 * bio, genhd) each have their own view of a sector, but the default
 * is universally 512 bytes.  These names are only slightly more
 * meaningful than the bare numbers they stand for.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME		"rbd"
#define RBD_DRV_NAME_LONG	"rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

/* Snapshot device names are this prefix followed by the snapshot name */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

/* Snapshot name that designates the image head (no snapshot) */
#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(0)

/*
 * An RBD device is named "rbd#": the "rbd" is RBD_DRV_NAME above and
 * # is a unique integer identifier.  MAX_INT_FORMAT_WIDTH is used in
 * ensuring DEV_NAME_LEN is big enough to hold every possible device
 * name.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
93
94 /*
95  * block device image metadata (in-memory version)
96  */
97 struct rbd_image_header {
98         /* These four fields never change for a given rbd image */
99         char *object_prefix;
100         u64 features;
101         __u8 obj_order;
102         __u8 crypt_type;
103         __u8 comp_type;
104
105         /* The remaining fields need to be updated occasionally */
106         u64 image_size;
107         struct ceph_snap_context *snapc;
108         char *snap_names;
109         u64 *snap_sizes;
110
111         u64 obj_version;
112 };
113
114 /*
115  * An rbd image specification.
116  *
117  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
118  * identify an image.  Each rbd_dev structure includes a pointer to
119  * an rbd_spec structure that encapsulates this identity.
120  *
121  * Each of the id's in an rbd_spec has an associated name.  For a
122  * user-mapped image, the names are supplied and the id's associated
123  * with them are looked up.  For a layered image, a parent image is
124  * defined by the tuple, and the names are looked up.
125  *
126  * An rbd_dev structure contains a parent_spec pointer which is
127  * non-null if the image it represents is a child in a layered
128  * image.  This pointer will refer to the rbd_spec structure used
129  * by the parent rbd_dev for its own identity (i.e., the structure
130  * is shared between the parent and child).
131  *
132  * Since these structures are populated once, during the discovery
133  * phase of image construction, they are effectively immutable so
134  * we make no effort to synchronize access to them.
135  *
136  * Note that code herein does not assume the image name is known (it
137  * could be a null pointer).
138  */
139 struct rbd_spec {
140         u64             pool_id;
141         char            *pool_name;
142
143         char            *image_id;
144         char            *image_name;
145
146         u64             snap_id;
147         char            *snap_name;
148
149         struct kref     kref;
150 };
151
152 /*
153  * an instance of the client.  multiple devices may share an rbd client.
154  */
155 struct rbd_client {
156         struct ceph_client      *client;
157         struct kref             kref;
158         struct list_head        node;
159 };
160
/* Completion callback types for image and object requests */
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

/* Kind of data buffer (if any) carried by an object request */
enum obj_request_type {
	OBJ_REQUEST_NODATA,
	OBJ_REQUEST_BIO,
	OBJ_REQUEST_PAGES
};
172
173 struct rbd_obj_request {
174         const char              *object_name;
175         u64                     offset;         /* object start byte */
176         u64                     length;         /* bytes from offset */
177
178         struct rbd_img_request  *img_request;
179         u64                     img_offset;     /* image relative offset */
180         struct list_head        links;          /* img_request->obj_requests */
181         u32                     which;          /* posn image request list */
182
183         enum obj_request_type   type;
184         union {
185                 struct bio      *bio_list;
186                 struct {
187                         struct page     **pages;
188                         u32             page_count;
189                 };
190         };
191
192         struct ceph_osd_request *osd_req;
193
194         u64                     xferred;        /* bytes transferred */
195         u64                     version;
196         int                     result;
197         atomic_t                done;
198
199         rbd_obj_callback_t      callback;
200         struct completion       completion;
201
202         struct kref             kref;
203 };
204
205 struct rbd_img_request {
206         struct request          *rq;
207         struct rbd_device       *rbd_dev;
208         u64                     offset; /* starting image byte offset */
209         u64                     length; /* byte count from offset */
210         bool                    write_request;  /* false for read */
211         union {
212                 struct ceph_snap_context *snapc;        /* for writes */
213                 u64             snap_id;                /* for reads */
214         };
215         spinlock_t              completion_lock;/* protects next_completion */
216         u32                     next_completion;
217         rbd_img_callback_t      callback;
218         u64                     xferred;/* aggregate bytes transferred */
219         int                     result; /* first nonzero obj_request result */
220
221         u32                     obj_request_count;
222         struct list_head        obj_requests;   /* rbd_obj_request structs */
223
224         struct kref             kref;
225 };
226
/*
 * Iterators over the object requests linked on an image request.
 *
 *   for_each_obj_request()       walks the whole list, front to back
 *   for_each_obj_request_from()  continues forward from the current oreq
 *   for_each_obj_request_safe()  tolerates removal of oreq while
 *                                iterating; note it walks the list in
 *                                REVERSE order
 */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
233
234 struct rbd_snap {
235         struct  device          dev;
236         const char              *name;
237         u64                     size;
238         struct list_head        node;
239         u64                     id;
240         u64                     features;
241 };
242
243 struct rbd_mapping {
244         u64                     size;
245         u64                     features;
246         bool                    read_only;
247 };
248
249 /*
250  * a single device
251  */
252 struct rbd_device {
253         int                     dev_id;         /* blkdev unique id */
254
255         int                     major;          /* blkdev assigned major */
256         struct gendisk          *disk;          /* blkdev's gendisk and rq */
257
258         u32                     image_format;   /* Either 1 or 2 */
259         struct rbd_client       *rbd_client;
260
261         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
262
263         spinlock_t              lock;           /* queue, flags, open_count */
264
265         struct rbd_image_header header;
266         unsigned long           flags;          /* possibly lock protected */
267         struct rbd_spec         *spec;
268
269         char                    *header_name;
270
271         struct ceph_file_layout layout;
272
273         struct ceph_osd_event   *watch_event;
274         struct rbd_obj_request  *watch_request;
275
276         struct rbd_spec         *parent_spec;
277         u64                     parent_overlap;
278
279         /* protects updating the header */
280         struct rw_semaphore     header_rwsem;
281
282         struct rbd_mapping      mapping;
283
284         struct list_head        node;
285
286         /* list of snapshots */
287         struct list_head        snaps;
288
289         /* sysfs related */
290         struct device           dev;
291         unsigned long           open_count;     /* protected by lock */
292 };
293
/*
 * Bit numbers used in rbd_dev->flags.  Where atomicity is required,
 * access is protected by rbd_dev->lock.
 *
 * At present only the "removing" flag needs atomic access, because it
 * is coupled with the "open_count" field.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
305
306 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
307
308 static LIST_HEAD(rbd_dev_list);    /* devices */
309 static DEFINE_SPINLOCK(rbd_dev_list_lock);
310
311 static LIST_HEAD(rbd_client_list);              /* clients */
312 static DEFINE_SPINLOCK(rbd_client_list_lock);
313
314 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
315 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
316
317 static void rbd_dev_release(struct device *dev);
318 static void rbd_remove_snap_dev(struct rbd_snap *snap);
319
320 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
321                        size_t count);
322 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
323                           size_t count);
324
325 static struct bus_attribute rbd_bus_attrs[] = {
326         __ATTR(add, S_IWUSR, NULL, rbd_add),
327         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
328         __ATTR_NULL
329 };
330
331 static struct bus_type rbd_bus_type = {
332         .name           = "rbd",
333         .bus_attrs      = rbd_bus_attrs,
334 };
335
336 static void rbd_root_dev_release(struct device *dev)
337 {
338 }
339
340 static struct device rbd_root_dev = {
341         .init_name =    "rbd",
342         .release =      rbd_root_dev_release,
343 };
344
345 static __printf(2, 3)
346 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
347 {
348         struct va_format vaf;
349         va_list args;
350
351         va_start(args, fmt);
352         vaf.fmt = fmt;
353         vaf.va = &args;
354
355         if (!rbd_dev)
356                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
357         else if (rbd_dev->disk)
358                 printk(KERN_WARNING "%s: %s: %pV\n",
359                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
360         else if (rbd_dev->spec && rbd_dev->spec->image_name)
361                 printk(KERN_WARNING "%s: image %s: %pV\n",
362                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
363         else if (rbd_dev->spec && rbd_dev->spec->image_id)
364                 printk(KERN_WARNING "%s: id %s: %pV\n",
365                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
366         else    /* punt */
367                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
368                         RBD_DRV_NAME, rbd_dev, &vaf);
369         va_end(args);
370 }
371
/*
 * rbd_assert(expr) - with RBD_DEBUG defined, report a failed
 * assertion (function, line and expression text) and BUG(); otherwise
 * expand to nothing.
 *
 * The debug form is wrapped in do { } while (0) so the macro behaves
 * as a single statement: an unbraced "if (x) rbd_assert(y); else ..."
 * binds the else to the caller's if rather than to the macro's own.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		do {							\
			if (unlikely(!(expr))) {			\
				printk(KERN_ERR "\nAssertion failure in %s() " \
							"at line %d:\n\n" \
						"\trbd_assert(%s);\n\n", \
						__func__, __LINE__, #expr); \
				BUG();					\
			}						\
		} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
384
385 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
386 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
387
388 static int rbd_open(struct block_device *bdev, fmode_t mode)
389 {
390         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
391         bool removing = false;
392
393         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
394                 return -EROFS;
395
396         spin_lock_irq(&rbd_dev->lock);
397         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
398                 removing = true;
399         else
400                 rbd_dev->open_count++;
401         spin_unlock_irq(&rbd_dev->lock);
402         if (removing)
403                 return -ENOENT;
404
405         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
406         (void) get_device(&rbd_dev->dev);
407         set_device_ro(bdev, rbd_dev->mapping.read_only);
408         mutex_unlock(&ctl_mutex);
409
410         return 0;
411 }
412
413 static int rbd_release(struct gendisk *disk, fmode_t mode)
414 {
415         struct rbd_device *rbd_dev = disk->private_data;
416         unsigned long open_count_before;
417
418         spin_lock_irq(&rbd_dev->lock);
419         open_count_before = rbd_dev->open_count--;
420         spin_unlock_irq(&rbd_dev->lock);
421         rbd_assert(open_count_before > 0);
422
423         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
424         put_device(&rbd_dev->dev);
425         mutex_unlock(&ctl_mutex);
426
427         return 0;
428 }
429
430 static const struct block_device_operations rbd_bd_ops = {
431         .owner                  = THIS_MODULE,
432         .open                   = rbd_open,
433         .release                = rbd_release,
434 };
435
436 /*
437  * Initialize an rbd client instance.
438  * We own *ceph_opts.
439  */
440 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
441 {
442         struct rbd_client *rbdc;
443         int ret = -ENOMEM;
444
445         dout("%s:\n", __func__);
446         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
447         if (!rbdc)
448                 goto out_opt;
449
450         kref_init(&rbdc->kref);
451         INIT_LIST_HEAD(&rbdc->node);
452
453         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
454
455         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
456         if (IS_ERR(rbdc->client))
457                 goto out_mutex;
458         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
459
460         ret = ceph_open_session(rbdc->client);
461         if (ret < 0)
462                 goto out_err;
463
464         spin_lock(&rbd_client_list_lock);
465         list_add_tail(&rbdc->node, &rbd_client_list);
466         spin_unlock(&rbd_client_list_lock);
467
468         mutex_unlock(&ctl_mutex);
469         dout("%s: rbdc %p\n", __func__, rbdc);
470
471         return rbdc;
472
473 out_err:
474         ceph_destroy_client(rbdc->client);
475 out_mutex:
476         mutex_unlock(&ctl_mutex);
477         kfree(rbdc);
478 out_opt:
479         if (ceph_opts)
480                 ceph_destroy_options(ceph_opts);
481         dout("%s: error %d\n", __func__, ret);
482
483         return ERR_PTR(ret);
484 }
485
486 /*
487  * Find a ceph client with specific addr and configuration.  If
488  * found, bump its reference count.
489  */
490 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
491 {
492         struct rbd_client *client_node;
493         bool found = false;
494
495         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
496                 return NULL;
497
498         spin_lock(&rbd_client_list_lock);
499         list_for_each_entry(client_node, &rbd_client_list, node) {
500                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
501                         kref_get(&client_node->kref);
502                         found = true;
503                         break;
504                 }
505         }
506         spin_unlock(&rbd_client_list_lock);
507
508         return found ? client_node : NULL;
509 }
510
511 /*
512  * mount options
513  */
514 enum {
515         Opt_last_int,
516         /* int args above */
517         Opt_last_string,
518         /* string args above */
519         Opt_read_only,
520         Opt_read_write,
521         /* Boolean args above */
522         Opt_last_bool,
523 };
524
525 static match_table_t rbd_opts_tokens = {
526         /* int args above */
527         /* string args above */
528         {Opt_read_only, "read_only"},
529         {Opt_read_only, "ro"},          /* Alternate spelling */
530         {Opt_read_write, "read_write"},
531         {Opt_read_write, "rw"},         /* Alternate spelling */
532         /* Boolean args above */
533         {-1, NULL}
534 };
535
536 struct rbd_options {
537         bool    read_only;
538 };
539
540 #define RBD_READ_ONLY_DEFAULT   false
541
542 static int parse_rbd_opts_token(char *c, void *private)
543 {
544         struct rbd_options *rbd_opts = private;
545         substring_t argstr[MAX_OPT_ARGS];
546         int token, intval, ret;
547
548         token = match_token(c, rbd_opts_tokens, argstr);
549         if (token < 0)
550                 return -EINVAL;
551
552         if (token < Opt_last_int) {
553                 ret = match_int(&argstr[0], &intval);
554                 if (ret < 0) {
555                         pr_err("bad mount option arg (not int) "
556                                "at '%s'\n", c);
557                         return ret;
558                 }
559                 dout("got int token %d val %d\n", token, intval);
560         } else if (token > Opt_last_int && token < Opt_last_string) {
561                 dout("got string token %d val %s\n", token,
562                      argstr[0].from);
563         } else if (token > Opt_last_string && token < Opt_last_bool) {
564                 dout("got Boolean token %d\n", token);
565         } else {
566                 dout("got token %d\n", token);
567         }
568
569         switch (token) {
570         case Opt_read_only:
571                 rbd_opts->read_only = true;
572                 break;
573         case Opt_read_write:
574                 rbd_opts->read_only = false;
575                 break;
576         default:
577                 rbd_assert(false);
578                 break;
579         }
580         return 0;
581 }
582
/*
 * Return an rbd client matching the given ceph options: an existing
 * one when available (with a new reference taken), otherwise a newly
 * created one.  Either way the caller no longer owns ceph_opts.  The
 * result of rbd_client_create() is returned as is, so it may be an
 * ERR_PTR().
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc = rbd_client_find(ceph_opts);

	if (!rbdc)
		return rbd_client_create(ceph_opts);

	/* Sharing an existing client; these options are not needed */
	ceph_destroy_options(ceph_opts);

	return rbdc;
}
599
600 /*
601  * Destroy ceph client
602  *
603  * Caller must hold rbd_client_list_lock.
604  */
605 static void rbd_client_release(struct kref *kref)
606 {
607         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
608
609         dout("%s: rbdc %p\n", __func__, rbdc);
610         spin_lock(&rbd_client_list_lock);
611         list_del(&rbdc->node);
612         spin_unlock(&rbd_client_list_lock);
613
614         ceph_destroy_client(rbdc->client);
615         kfree(rbdc);
616 }
617
618 /*
619  * Drop reference to ceph client node. If it's not referenced anymore, release
620  * it.
621  */
622 static void rbd_put_client(struct rbd_client *rbdc)
623 {
624         if (rbdc)
625                 kref_put(&rbdc->kref, rbd_client_release);
626 }
627
628 static bool rbd_image_format_valid(u32 image_format)
629 {
630         return image_format == 1 || image_format == 2;
631 }
632
633 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
634 {
635         size_t size;
636         u32 snap_count;
637
638         /* The header has to start with the magic rbd header text */
639         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
640                 return false;
641
642         /* The bio layer requires at least sector-sized I/O */
643
644         if (ondisk->options.order < SECTOR_SHIFT)
645                 return false;
646
647         /* If we use u64 in a few spots we may be able to loosen this */
648
649         if (ondisk->options.order > 8 * sizeof (int) - 1)
650                 return false;
651
652         /*
653          * The size of a snapshot header has to fit in a size_t, and
654          * that limits the number of snapshots.
655          */
656         snap_count = le32_to_cpu(ondisk->snap_count);
657         size = SIZE_MAX - sizeof (struct ceph_snap_context);
658         if (snap_count > size / sizeof (__le64))
659                 return false;
660
661         /*
662          * Not only that, but the size of the entire the snapshot
663          * header must also be representable in a size_t.
664          */
665         size -= snap_count * sizeof (__le64);
666         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
667                 return false;
668
669         return true;
670 }
671
672 /*
673  * Create a new header structure, translate header format from the on-disk
674  * header.
675  */
676 static int rbd_header_from_disk(struct rbd_image_header *header,
677                                  struct rbd_image_header_ondisk *ondisk)
678 {
679         u32 snap_count;
680         size_t len;
681         size_t size;
682         u32 i;
683
684         memset(header, 0, sizeof (*header));
685
686         snap_count = le32_to_cpu(ondisk->snap_count);
687
688         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
689         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
690         if (!header->object_prefix)
691                 return -ENOMEM;
692         memcpy(header->object_prefix, ondisk->object_prefix, len);
693         header->object_prefix[len] = '\0';
694
695         if (snap_count) {
696                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
697
698                 /* Save a copy of the snapshot names */
699
700                 if (snap_names_len > (u64) SIZE_MAX)
701                         return -EIO;
702                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
703                 if (!header->snap_names)
704                         goto out_err;
705                 /*
706                  * Note that rbd_dev_v1_header_read() guarantees
707                  * the ondisk buffer we're working with has
708                  * snap_names_len bytes beyond the end of the
709                  * snapshot id array, this memcpy() is safe.
710                  */
711                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
712                         snap_names_len);
713
714                 /* Record each snapshot's size */
715
716                 size = snap_count * sizeof (*header->snap_sizes);
717                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
718                 if (!header->snap_sizes)
719                         goto out_err;
720                 for (i = 0; i < snap_count; i++)
721                         header->snap_sizes[i] =
722                                 le64_to_cpu(ondisk->snaps[i].image_size);
723         } else {
724                 WARN_ON(ondisk->snap_names_len);
725                 header->snap_names = NULL;
726                 header->snap_sizes = NULL;
727         }
728
729         header->features = 0;   /* No features support in v1 images */
730         header->obj_order = ondisk->options.order;
731         header->crypt_type = ondisk->options.crypt_type;
732         header->comp_type = ondisk->options.comp_type;
733
734         /* Allocate and fill in the snapshot context */
735
736         header->image_size = le64_to_cpu(ondisk->image_size);
737         size = sizeof (struct ceph_snap_context);
738         size += snap_count * sizeof (header->snapc->snaps[0]);
739         header->snapc = kzalloc(size, GFP_KERNEL);
740         if (!header->snapc)
741                 goto out_err;
742
743         atomic_set(&header->snapc->nref, 1);
744         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
745         header->snapc->num_snaps = snap_count;
746         for (i = 0; i < snap_count; i++)
747                 header->snapc->snaps[i] =
748                         le64_to_cpu(ondisk->snaps[i].id);
749
750         return 0;
751
752 out_err:
753         kfree(header->snap_sizes);
754         header->snap_sizes = NULL;
755         kfree(header->snap_names);
756         header->snap_names = NULL;
757         kfree(header->object_prefix);
758         header->object_prefix = NULL;
759
760         return -ENOMEM;
761 }
762
763 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
764 {
765         struct rbd_snap *snap;
766
767         if (snap_id == CEPH_NOSNAP)
768                 return RBD_SNAP_HEAD_NAME;
769
770         list_for_each_entry(snap, &rbd_dev->snaps, node)
771                 if (snap_id == snap->id)
772                         return snap->name;
773
774         return NULL;
775 }
776
777 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
778 {
779
780         struct rbd_snap *snap;
781
782         list_for_each_entry(snap, &rbd_dev->snaps, node) {
783                 if (!strcmp(snap_name, snap->name)) {
784                         rbd_dev->spec->snap_id = snap->id;
785                         rbd_dev->mapping.size = snap->size;
786                         rbd_dev->mapping.features = snap->features;
787
788                         return 0;
789                 }
790         }
791
792         return -ENOENT;
793 }
794
795 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
796 {
797         int ret;
798
799         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
800                     sizeof (RBD_SNAP_HEAD_NAME))) {
801                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
802                 rbd_dev->mapping.size = rbd_dev->header.image_size;
803                 rbd_dev->mapping.features = rbd_dev->header.features;
804                 ret = 0;
805         } else {
806                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
807                 if (ret < 0)
808                         goto done;
809                 rbd_dev->mapping.read_only = true;
810         }
811         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
812
813 done:
814         return ret;
815 }
816
817 static void rbd_header_free(struct rbd_image_header *header)
818 {
819         kfree(header->object_prefix);
820         header->object_prefix = NULL;
821         kfree(header->snap_sizes);
822         header->snap_sizes = NULL;
823         kfree(header->snap_names);
824         header->snap_names = NULL;
825         ceph_put_snap_context(header->snapc);
826         header->snapc = NULL;
827 }
828
829 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
830 {
831         char *name;
832         u64 segment;
833         int ret;
834
835         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
836         if (!name)
837                 return NULL;
838         segment = offset >> rbd_dev->header.obj_order;
839         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
840                         rbd_dev->header.object_prefix, segment);
841         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
842                 pr_err("error formatting segment name for #%llu (%d)\n",
843                         segment, ret);
844                 kfree(name);
845                 name = NULL;
846         }
847
848         return name;
849 }
850
851 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
852 {
853         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
854
855         return offset & (segment_size - 1);
856 }
857
858 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
859                                 u64 offset, u64 length)
860 {
861         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
862
863         offset &= segment_size - 1;
864
865         rbd_assert(length <= U64_MAX - offset);
866         if (offset + length > segment_size)
867                 length = segment_size - offset;
868
869         return length;
870 }
871
872 /*
873  * returns the size of an object in the image
874  */
875 static u64 rbd_obj_bytes(struct rbd_image_header *header)
876 {
877         return 1 << header->obj_order;
878 }
879
880 /*
881  * bio helpers
882  */
883
884 static void bio_chain_put(struct bio *chain)
885 {
886         struct bio *tmp;
887
888         while (chain) {
889                 tmp = chain;
890                 chain = chain->bi_next;
891                 bio_put(tmp);
892         }
893 }
894
895 /*
896  * zeros a bio chain, starting at specific offset
897  */
898 static void zero_bio_chain(struct bio *chain, int start_ofs)
899 {
900         struct bio_vec *bv;
901         unsigned long flags;
902         void *buf;
903         int i;
904         int pos = 0;
905
906         while (chain) {
907                 bio_for_each_segment(bv, chain, i) {
908                         if (pos + bv->bv_len > start_ofs) {
909                                 int remainder = max(start_ofs - pos, 0);
910                                 buf = bvec_kmap_irq(bv, &flags);
911                                 memset(buf + remainder, 0,
912                                        bv->bv_len - remainder);
913                                 bvec_kunmap_irq(buf, &flags);
914                         }
915                         pos += bv->bv_len;
916                 }
917
918                 chain = chain->bi_next;
919         }
920 }
921
922 /*
923  * Clone a portion of a bio, starting at the given byte offset
924  * and continuing for the number of bytes indicated.
925  */
926 static struct bio *bio_clone_range(struct bio *bio_src,
927                                         unsigned int offset,
928                                         unsigned int len,
929                                         gfp_t gfpmask)
930 {
931         struct bio_vec *bv;
932         unsigned int resid;
933         unsigned short idx;
934         unsigned int voff;
935         unsigned short end_idx;
936         unsigned short vcnt;
937         struct bio *bio;
938
939         /* Handle the easy case for the caller */
940
941         if (!offset && len == bio_src->bi_size)
942                 return bio_clone(bio_src, gfpmask);
943
944         if (WARN_ON_ONCE(!len))
945                 return NULL;
946         if (WARN_ON_ONCE(len > bio_src->bi_size))
947                 return NULL;
948         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
949                 return NULL;
950
951         /* Find first affected segment... */
952
953         resid = offset;
954         __bio_for_each_segment(bv, bio_src, idx, 0) {
955                 if (resid < bv->bv_len)
956                         break;
957                 resid -= bv->bv_len;
958         }
959         voff = resid;
960
961         /* ...and the last affected segment */
962
963         resid += len;
964         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
965                 if (resid <= bv->bv_len)
966                         break;
967                 resid -= bv->bv_len;
968         }
969         vcnt = end_idx - idx + 1;
970
971         /* Build the clone */
972
973         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
974         if (!bio)
975                 return NULL;    /* ENOMEM */
976
977         bio->bi_bdev = bio_src->bi_bdev;
978         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
979         bio->bi_rw = bio_src->bi_rw;
980         bio->bi_flags |= 1 << BIO_CLONED;
981
982         /*
983          * Copy over our part of the bio_vec, then update the first
984          * and last (or only) entries.
985          */
986         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
987                         vcnt * sizeof (struct bio_vec));
988         bio->bi_io_vec[0].bv_offset += voff;
989         if (vcnt > 1) {
990                 bio->bi_io_vec[0].bv_len -= voff;
991                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
992         } else {
993                 bio->bi_io_vec[0].bv_len = len;
994         }
995
996         bio->bi_vcnt = vcnt;
997         bio->bi_size = len;
998         bio->bi_idx = 0;
999
1000         return bio;
1001 }
1002
1003 /*
1004  * Clone a portion of a bio chain, starting at the given byte offset
1005  * into the first bio in the source chain and continuing for the
1006  * number of bytes indicated.  The result is another bio chain of
1007  * exactly the given length, or a null pointer on error.
1008  *
1009  * The bio_src and offset parameters are both in-out.  On entry they
1010  * refer to the first source bio and the offset into that bio where
1011  * the start of data to be cloned is located.
1012  *
1013  * On return, bio_src is updated to refer to the bio in the source
1014  * chain that contains first un-cloned byte, and *offset will
1015  * contain the offset of that byte within that bio.
1016  */
1017 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1018                                         unsigned int *offset,
1019                                         unsigned int len,
1020                                         gfp_t gfpmask)
1021 {
1022         struct bio *bi = *bio_src;
1023         unsigned int off = *offset;
1024         struct bio *chain = NULL;
1025         struct bio **end;
1026
1027         /* Build up a chain of clone bios up to the limit */
1028
1029         if (!bi || off >= bi->bi_size || !len)
1030                 return NULL;            /* Nothing to clone */
1031
1032         end = &chain;
1033         while (len) {
1034                 unsigned int bi_size;
1035                 struct bio *bio;
1036
1037                 if (!bi) {
1038                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1039                         goto out_err;   /* EINVAL; ran out of bio's */
1040                 }
1041                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1042                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1043                 if (!bio)
1044                         goto out_err;   /* ENOMEM */
1045
1046                 *end = bio;
1047                 end = &bio->bi_next;
1048
1049                 off += bi_size;
1050                 if (off == bi->bi_size) {
1051                         bi = bi->bi_next;
1052                         off = 0;
1053                 }
1054                 len -= bi_size;
1055         }
1056         *bio_src = bi;
1057         *offset = off;
1058
1059         return chain;
1060 out_err:
1061         bio_chain_put(chain);
1062
1063         return NULL;
1064 }
1065
1066 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1067 {
1068         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1069                 atomic_read(&obj_request->kref.refcount));
1070         kref_get(&obj_request->kref);
1071 }
1072
1073 static void rbd_obj_request_destroy(struct kref *kref);
1074 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1075 {
1076         rbd_assert(obj_request != NULL);
1077         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1078                 atomic_read(&obj_request->kref.refcount));
1079         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1080 }
1081
1082 static void rbd_img_request_get(struct rbd_img_request *img_request)
1083 {
1084         dout("%s: img %p (was %d)\n", __func__, img_request,
1085                 atomic_read(&img_request->kref.refcount));
1086         kref_get(&img_request->kref);
1087 }
1088
1089 static void rbd_img_request_destroy(struct kref *kref);
1090 static void rbd_img_request_put(struct rbd_img_request *img_request)
1091 {
1092         rbd_assert(img_request != NULL);
1093         dout("%s: img %p (was %d)\n", __func__, img_request,
1094                 atomic_read(&img_request->kref.refcount));
1095         kref_put(&img_request->kref, rbd_img_request_destroy);
1096 }
1097
1098 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1099                                         struct rbd_obj_request *obj_request)
1100 {
1101         rbd_assert(obj_request->img_request == NULL);
1102
1103         rbd_obj_request_get(obj_request);
1104         obj_request->img_request = img_request;
1105         obj_request->which = img_request->obj_request_count;
1106         rbd_assert(obj_request->which != BAD_WHICH);
1107         img_request->obj_request_count++;
1108         list_add_tail(&obj_request->links, &img_request->obj_requests);
1109         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1110                 obj_request->which);
1111 }
1112
1113 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1114                                         struct rbd_obj_request *obj_request)
1115 {
1116         rbd_assert(obj_request->which != BAD_WHICH);
1117
1118         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1119                 obj_request->which);
1120         list_del(&obj_request->links);
1121         rbd_assert(img_request->obj_request_count > 0);
1122         img_request->obj_request_count--;
1123         rbd_assert(obj_request->which == img_request->obj_request_count);
1124         obj_request->which = BAD_WHICH;
1125         rbd_assert(obj_request->img_request == img_request);
1126         obj_request->img_request = NULL;
1127         obj_request->callback = NULL;
1128         rbd_obj_request_put(obj_request);
1129 }
1130
1131 static bool obj_request_type_valid(enum obj_request_type type)
1132 {
1133         switch (type) {
1134         case OBJ_REQUEST_NODATA:
1135         case OBJ_REQUEST_BIO:
1136         case OBJ_REQUEST_PAGES:
1137                 return true;
1138         default:
1139                 return false;
1140         }
1141 }
1142
1143 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1144                                 struct rbd_obj_request *obj_request)
1145 {
1146         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1147
1148         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1149 }
1150
1151 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1152 {
1153
1154         dout("%s: img %p\n", __func__, img_request);
1155
1156         /*
1157          * If no error occurred, compute the aggregate transfer
1158          * count for the image request.  We could instead use
1159          * atomic64_cmpxchg() to update it as each object request
1160          * completes; not clear which way is better off hand.
1161          */
1162         if (!img_request->result) {
1163                 struct rbd_obj_request *obj_request;
1164                 u64 xferred = 0;
1165
1166                 for_each_obj_request(img_request, obj_request)
1167                         xferred += obj_request->xferred;
1168                 img_request->xferred = xferred;
1169         }
1170
1171         if (img_request->callback)
1172                 img_request->callback(img_request);
1173         else
1174                 rbd_img_request_put(img_request);
1175 }
1176
1177 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1178
1179 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1180 {
1181         dout("%s: obj %p\n", __func__, obj_request);
1182
1183         return wait_for_completion_interruptible(&obj_request->completion);
1184 }
1185
1186 static void obj_request_done_init(struct rbd_obj_request *obj_request)
1187 {
1188         atomic_set(&obj_request->done, 0);
1189         smp_wmb();
1190 }
1191
1192 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1193 {
1194         int done;
1195
1196         done = atomic_inc_return(&obj_request->done);
1197         if (done > 1) {
1198                 struct rbd_img_request *img_request = obj_request->img_request;
1199                 struct rbd_device *rbd_dev;
1200
1201                 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1202                 rbd_warn(rbd_dev, "obj_request %p was already done\n",
1203                         obj_request);
1204         }
1205 }
1206
1207 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1208 {
1209         smp_mb();
1210         return atomic_read(&obj_request->done) != 0;
1211 }
1212
1213 static void
1214 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1215 {
1216         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1217                 obj_request, obj_request->img_request, obj_request->result,
1218                 obj_request->xferred, obj_request->length);
1219         /*
1220          * ENOENT means a hole in the image.  We zero-fill the
1221          * entire length of the request.  A short read also implies
1222          * zero-fill to the end of the request.  Either way we
1223          * update the xferred count to indicate the whole request
1224          * was satisfied.
1225          */
1226         BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
1227         if (obj_request->result == -ENOENT) {
1228                 zero_bio_chain(obj_request->bio_list, 0);
1229                 obj_request->result = 0;
1230                 obj_request->xferred = obj_request->length;
1231         } else if (obj_request->xferred < obj_request->length &&
1232                         !obj_request->result) {
1233                 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
1234                 obj_request->xferred = obj_request->length;
1235         }
1236         obj_request_done_set(obj_request);
1237 }
1238
1239 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1240 {
1241         dout("%s: obj %p cb %p\n", __func__, obj_request,
1242                 obj_request->callback);
1243         if (obj_request->callback)
1244                 obj_request->callback(obj_request);
1245         else
1246                 complete_all(&obj_request->completion);
1247 }
1248
/* Nothing to do for these ops beyond marking the request done */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1254
1255 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1256 {
1257         dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
1258                 obj_request->result, obj_request->xferred, obj_request->length);
1259         if (obj_request->img_request)
1260                 rbd_img_obj_request_read_callback(obj_request);
1261         else
1262                 obj_request_done_set(obj_request);
1263 }
1264
1265 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1266 {
1267         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1268                 obj_request->result, obj_request->length);
1269         /*
1270          * There is no such thing as a successful short write.
1271          * Our xferred value is the number of bytes transferred
1272          * back.  Set it to our originally-requested length.
1273          */
1274         obj_request->xferred = obj_request->length;
1275         obj_request_done_set(obj_request);
1276 }
1277
/*
 * Completion handling for a stat.  A simple stat call needs nothing
 * beyond being marked done.  More will be done here when the stat
 * is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1287
1288 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1289                                 struct ceph_msg *msg)
1290 {
1291         struct rbd_obj_request *obj_request = osd_req->r_priv;
1292         u16 opcode;
1293
1294         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1295         rbd_assert(osd_req == obj_request->osd_req);
1296         rbd_assert(!!obj_request->img_request ^
1297                                 (obj_request->which == BAD_WHICH));
1298
1299         if (osd_req->r_result < 0)
1300                 obj_request->result = osd_req->r_result;
1301         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1302
1303         WARN_ON(osd_req->r_num_ops != 1);       /* For now */
1304
1305         /*
1306          * We support a 64-bit length, but ultimately it has to be
1307          * passed to blk_end_request(), which takes an unsigned int.
1308          */
1309         obj_request->xferred = osd_req->r_reply_op_len[0];
1310         rbd_assert(obj_request->xferred < (u64) UINT_MAX);
1311         opcode = osd_req->r_ops[0].op;
1312         switch (opcode) {
1313         case CEPH_OSD_OP_READ:
1314                 rbd_osd_read_callback(obj_request);
1315                 break;
1316         case CEPH_OSD_OP_WRITE:
1317                 rbd_osd_write_callback(obj_request);
1318                 break;
1319         case CEPH_OSD_OP_STAT:
1320                 rbd_osd_stat_callback(obj_request);
1321                 break;
1322         case CEPH_OSD_OP_CALL:
1323         case CEPH_OSD_OP_NOTIFY_ACK:
1324         case CEPH_OSD_OP_WATCH:
1325                 rbd_osd_trivial_callback(obj_request);
1326                 break;
1327         default:
1328                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1329                         obj_request->object_name, (unsigned short) opcode);
1330                 break;
1331         }
1332
1333         if (obj_request_done_test(obj_request))
1334                 rbd_obj_request_complete(obj_request);
1335 }
1336
1337 static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
1338                                         bool write_request)
1339 {
1340         struct rbd_img_request *img_request = obj_request->img_request;
1341         struct ceph_osd_request *osd_req = obj_request->osd_req;
1342         struct ceph_snap_context *snapc = NULL;
1343         u64 snap_id = CEPH_NOSNAP;
1344         struct timespec *mtime = NULL;
1345         struct timespec now;
1346
1347         rbd_assert(osd_req != NULL);
1348
1349         if (write_request) {
1350                 now = CURRENT_TIME;
1351                 mtime = &now;
1352                 if (img_request)
1353                         snapc = img_request->snapc;
1354         } else if (img_request) {
1355                 snap_id = img_request->snap_id;
1356         }
1357         ceph_osdc_build_request(osd_req, obj_request->offset,
1358                         snapc, snap_id, mtime);
1359 }
1360
1361 static struct ceph_osd_request *rbd_osd_req_create(
1362                                         struct rbd_device *rbd_dev,
1363                                         bool write_request,
1364                                         struct rbd_obj_request *obj_request)
1365 {
1366         struct rbd_img_request *img_request = obj_request->img_request;
1367         struct ceph_snap_context *snapc = NULL;
1368         struct ceph_osd_client *osdc;
1369         struct ceph_osd_request *osd_req;
1370
1371         if (img_request) {
1372                 rbd_assert(img_request->write_request == write_request);
1373                 if (img_request->write_request)
1374                         snapc = img_request->snapc;
1375         }
1376
1377         /* Allocate and initialize the request, for the single op */
1378
1379         osdc = &rbd_dev->rbd_client->client->osdc;
1380         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1381         if (!osd_req)
1382                 return NULL;    /* ENOMEM */
1383
1384         if (write_request)
1385                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1386         else
1387                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1388
1389         osd_req->r_callback = rbd_osd_req_callback;
1390         osd_req->r_priv = obj_request;
1391
1392         osd_req->r_oid_len = strlen(obj_request->object_name);
1393         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1394         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1395
1396         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1397
1398         return osd_req;
1399 }
1400
/* Release an osd request by dropping a reference on it */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1405
1406 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1407
1408 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1409                                                 u64 offset, u64 length,
1410                                                 enum obj_request_type type)
1411 {
1412         struct rbd_obj_request *obj_request;
1413         size_t size;
1414         char *name;
1415
1416         rbd_assert(obj_request_type_valid(type));
1417
1418         size = strlen(object_name) + 1;
1419         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1420         if (!obj_request)
1421                 return NULL;
1422
1423         name = (char *)(obj_request + 1);
1424         obj_request->object_name = memcpy(name, object_name, size);
1425         obj_request->offset = offset;
1426         obj_request->length = length;
1427         obj_request->which = BAD_WHICH;
1428         obj_request->type = type;
1429         INIT_LIST_HEAD(&obj_request->links);
1430         obj_request_done_init(obj_request);
1431         init_completion(&obj_request->completion);
1432         kref_init(&obj_request->kref);
1433
1434         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1435                 offset, length, (int)type, obj_request);
1436
1437         return obj_request;
1438 }
1439
1440 static void rbd_obj_request_destroy(struct kref *kref)
1441 {
1442         struct rbd_obj_request *obj_request;
1443
1444         obj_request = container_of(kref, struct rbd_obj_request, kref);
1445
1446         dout("%s: obj %p\n", __func__, obj_request);
1447
1448         rbd_assert(obj_request->img_request == NULL);
1449         rbd_assert(obj_request->which == BAD_WHICH);
1450
1451         if (obj_request->osd_req)
1452                 rbd_osd_req_destroy(obj_request->osd_req);
1453
1454         rbd_assert(obj_request_type_valid(obj_request->type));
1455         switch (obj_request->type) {
1456         case OBJ_REQUEST_NODATA:
1457                 break;          /* Nothing to do */
1458         case OBJ_REQUEST_BIO:
1459                 if (obj_request->bio_list)
1460                         bio_chain_put(obj_request->bio_list);
1461                 break;
1462         case OBJ_REQUEST_PAGES:
1463                 if (obj_request->pages)
1464                         ceph_release_page_vector(obj_request->pages,
1465                                                 obj_request->page_count);
1466                 break;
1467         }
1468
1469         kfree(obj_request);
1470 }
1471
1472 /*
1473  * Caller is responsible for filling in the list of object requests
1474  * that comprises the image request, and the Linux request pointer
1475  * (if there is one).
1476  */
1477 static struct rbd_img_request *rbd_img_request_create(
1478                                         struct rbd_device *rbd_dev,
1479                                         u64 offset, u64 length,
1480                                         bool write_request)
1481 {
1482         struct rbd_img_request *img_request;
1483         struct ceph_snap_context *snapc = NULL;
1484
1485         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1486         if (!img_request)
1487                 return NULL;
1488
1489         if (write_request) {
1490                 down_read(&rbd_dev->header_rwsem);
1491                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1492                 up_read(&rbd_dev->header_rwsem);
1493                 if (WARN_ON(!snapc)) {
1494                         kfree(img_request);
1495                         return NULL;    /* Shouldn't happen */
1496                 }
1497         }
1498
1499         img_request->rq = NULL;
1500         img_request->rbd_dev = rbd_dev;
1501         img_request->offset = offset;
1502         img_request->length = length;
1503         img_request->write_request = write_request;
1504         if (write_request)
1505                 img_request->snapc = snapc;
1506         else
1507                 img_request->snap_id = rbd_dev->spec->snap_id;
1508         spin_lock_init(&img_request->completion_lock);
1509         img_request->next_completion = 0;
1510         img_request->callback = NULL;
1511         img_request->result = 0;
1512         img_request->obj_request_count = 0;
1513         INIT_LIST_HEAD(&img_request->obj_requests);
1514         kref_init(&img_request->kref);
1515
1516         rbd_img_request_get(img_request);       /* Avoid a warning */
1517         rbd_img_request_put(img_request);       /* TEMPORARY */
1518
1519         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1520                 write_request ? "write" : "read", offset, length,
1521                 img_request);
1522
1523         return img_request;
1524 }
1525
1526 static void rbd_img_request_destroy(struct kref *kref)
1527 {
1528         struct rbd_img_request *img_request;
1529         struct rbd_obj_request *obj_request;
1530         struct rbd_obj_request *next_obj_request;
1531
1532         img_request = container_of(kref, struct rbd_img_request, kref);
1533
1534         dout("%s: img %p\n", __func__, img_request);
1535
1536         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1537                 rbd_img_obj_request_del(img_request, obj_request);
1538         rbd_assert(img_request->obj_request_count == 0);
1539
1540         if (img_request->write_request)
1541                 ceph_put_snap_context(img_request->snapc);
1542
1543         kfree(img_request);
1544 }
1545
1546 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1547 {
1548         struct rbd_img_request *img_request;
1549         u32 which = obj_request->which;
1550         bool more = true;
1551
1552         img_request = obj_request->img_request;
1553
1554         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1555         rbd_assert(img_request != NULL);
1556         rbd_assert(img_request->rq != NULL);
1557         rbd_assert(img_request->obj_request_count > 0);
1558         rbd_assert(which != BAD_WHICH);
1559         rbd_assert(which < img_request->obj_request_count);
1560         rbd_assert(which >= img_request->next_completion);
1561
1562         spin_lock_irq(&img_request->completion_lock);
1563         if (which != img_request->next_completion)
1564                 goto out;
1565
1566         for_each_obj_request_from(img_request, obj_request) {
1567                 unsigned int xferred;
1568                 int result;
1569
1570                 rbd_assert(more);
1571                 rbd_assert(which < img_request->obj_request_count);
1572
1573                 if (!obj_request_done_test(obj_request))
1574                         break;
1575
1576                 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1577                 xferred = (unsigned int)obj_request->xferred;
1578                 result = obj_request->result;
1579                 if (result) {
1580                         struct rbd_device *rbd_dev = img_request->rbd_dev;
1581
1582                         rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1583                                 img_request->write_request ? "write" : "read",
1584                                 obj_request->length, obj_request->img_offset,
1585                                 obj_request->offset);
1586                         rbd_warn(rbd_dev, "  result %d xferred %x\n",
1587                                 result, xferred);
1588                         if (!img_request->result)
1589                                 img_request->result = result;
1590                 }
1591
1592                 more = blk_end_request(img_request->rq, result, xferred);
1593                 which++;
1594         }
1595
1596         rbd_assert(more ^ (which == img_request->obj_request_count));
1597         img_request->next_completion = which;
1598 out:
1599         spin_unlock_irq(&img_request->completion_lock);
1600
1601         if (!more)
1602                 rbd_img_request_complete(img_request);
1603 }
1604
1605 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1606                                         struct bio *bio_list)
1607 {
1608         struct rbd_device *rbd_dev = img_request->rbd_dev;
1609         struct rbd_obj_request *obj_request = NULL;
1610         struct rbd_obj_request *next_obj_request;
1611         bool write_request = img_request->write_request;
1612         unsigned int bio_offset;
1613         u64 img_offset;
1614         u64 resid;
1615         u16 opcode;
1616
1617         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1618
1619         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1620         bio_offset = 0;
1621         img_offset = img_request->offset;
1622         rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1623         resid = img_request->length;
1624         rbd_assert(resid > 0);
1625         while (resid) {
1626                 struct ceph_osd_request *osd_req;
1627                 const char *object_name;
1628                 unsigned int clone_size;
1629                 u64 offset;
1630                 u64 length;
1631
1632                 object_name = rbd_segment_name(rbd_dev, img_offset);
1633                 if (!object_name)
1634                         goto out_unwind;
1635                 offset = rbd_segment_offset(rbd_dev, img_offset);
1636                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1637                 obj_request = rbd_obj_request_create(object_name,
1638                                                 offset, length,
1639                                                 OBJ_REQUEST_BIO);
1640                 kfree(object_name);     /* object request has its own copy */
1641                 if (!obj_request)
1642                         goto out_unwind;
1643
1644                 rbd_assert(length <= (u64) UINT_MAX);
1645                 clone_size = (unsigned int) length;
1646                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1647                                                 &bio_offset, clone_size,
1648                                                 GFP_ATOMIC);
1649                 if (!obj_request->bio_list)
1650                         goto out_partial;
1651
1652                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1653                                                 obj_request);
1654                 if (!osd_req)
1655                         goto out_partial;
1656                 obj_request->osd_req = osd_req;
1657                 obj_request->callback = rbd_img_obj_callback;
1658
1659                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1660                                                 0, 0);
1661                 osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
1662                                 obj_request->bio_list, obj_request->length);
1663                 rbd_osd_req_format(obj_request, write_request);
1664
1665                 obj_request->img_offset = img_offset;
1666                 rbd_img_obj_request_add(img_request, obj_request);
1667
1668                 img_offset += length;
1669                 resid -= length;
1670         }
1671
1672         return 0;
1673
1674 out_partial:
1675         rbd_obj_request_put(obj_request);
1676 out_unwind:
1677         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1678                 rbd_obj_request_put(obj_request);
1679
1680         return -ENOMEM;
1681 }
1682
1683 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1684 {
1685         struct rbd_device *rbd_dev = img_request->rbd_dev;
1686         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1687         struct rbd_obj_request *obj_request;
1688         struct rbd_obj_request *next_obj_request;
1689
1690         dout("%s: img %p\n", __func__, img_request);
1691         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1692                 int ret;
1693
1694                 ret = rbd_obj_request_submit(osdc, obj_request);
1695                 if (ret)
1696                         return ret;
1697                 /*
1698                  * The image request has its own reference to each
1699                  * of its object requests, so we can safely drop the
1700                  * initial one here.
1701                  */
1702                 rbd_obj_request_put(obj_request);
1703         }
1704
1705         return 0;
1706 }
1707
1708 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1709                                    u64 ver, u64 notify_id)
1710 {
1711         struct rbd_obj_request *obj_request;
1712         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1713         int ret;
1714
1715         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1716                                                         OBJ_REQUEST_NODATA);
1717         if (!obj_request)
1718                 return -ENOMEM;
1719
1720         ret = -ENOMEM;
1721         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1722         if (!obj_request->osd_req)
1723                 goto out;
1724         obj_request->callback = rbd_obj_request_put;
1725
1726         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
1727                                         notify_id, ver, 0);
1728         rbd_osd_req_format(obj_request, false);
1729
1730         ret = rbd_obj_request_submit(osdc, obj_request);
1731 out:
1732         if (ret)
1733                 rbd_obj_request_put(obj_request);
1734
1735         return ret;
1736 }
1737
1738 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1739 {
1740         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1741         u64 hver;
1742         int rc;
1743
1744         if (!rbd_dev)
1745                 return;
1746
1747         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1748                 rbd_dev->header_name, (unsigned long long) notify_id,
1749                 (unsigned int) opcode);
1750         rc = rbd_dev_refresh(rbd_dev, &hver);
1751         if (rc)
1752                 rbd_warn(rbd_dev, "got notification but failed to "
1753                            " update snaps: %d\n", rc);
1754
1755         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1756 }
1757
1758 /*
1759  * Request sync osd watch/unwatch.  The value of "start" determines
1760  * whether a watch request is being initiated or torn down.
1761  */
1762 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1763 {
1764         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1765         struct rbd_obj_request *obj_request;
1766         int ret;
1767
1768         rbd_assert(start ^ !!rbd_dev->watch_event);
1769         rbd_assert(start ^ !!rbd_dev->watch_request);
1770
1771         if (start) {
1772                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1773                                                 &rbd_dev->watch_event);
1774                 if (ret < 0)
1775                         return ret;
1776                 rbd_assert(rbd_dev->watch_event != NULL);
1777         }
1778
1779         ret = -ENOMEM;
1780         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1781                                                         OBJ_REQUEST_NODATA);
1782         if (!obj_request)
1783                 goto out_cancel;
1784
1785         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1786         if (!obj_request->osd_req)
1787                 goto out_cancel;
1788
1789         if (start)
1790                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1791         else
1792                 ceph_osdc_unregister_linger_request(osdc,
1793                                         rbd_dev->watch_request->osd_req);
1794
1795         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1796                                 rbd_dev->watch_event->cookie,
1797                                 rbd_dev->header.obj_version, start);
1798         rbd_osd_req_format(obj_request, true);
1799
1800         ret = rbd_obj_request_submit(osdc, obj_request);
1801         if (ret)
1802                 goto out_cancel;
1803         ret = rbd_obj_request_wait(obj_request);
1804         if (ret)
1805                 goto out_cancel;
1806         ret = obj_request->result;
1807         if (ret)
1808                 goto out_cancel;
1809
1810         /*
1811          * A watch request is set to linger, so the underlying osd
1812          * request won't go away until we unregister it.  We retain
1813          * a pointer to the object request during that time (in
1814          * rbd_dev->watch_request), so we'll keep a reference to
1815          * it.  We'll drop that reference (below) after we've
1816          * unregistered it.
1817          */
1818         if (start) {
1819                 rbd_dev->watch_request = obj_request;
1820
1821                 return 0;
1822         }
1823
1824         /* We have successfully torn down the watch request */
1825
1826         rbd_obj_request_put(rbd_dev->watch_request);
1827         rbd_dev->watch_request = NULL;
1828 out_cancel:
1829         /* Cancel the event if we're tearing down, or on error */
1830         ceph_osdc_cancel_event(rbd_dev->watch_event);
1831         rbd_dev->watch_event = NULL;
1832         if (obj_request)
1833                 rbd_obj_request_put(obj_request);
1834
1835         return ret;
1836 }
1837
1838 /*
1839  * Synchronous osd object method call
1840  */
1841 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1842                              const char *object_name,
1843                              const char *class_name,
1844                              const char *method_name,
1845                              const char *outbound,
1846                              size_t outbound_size,
1847                              char *inbound,
1848                              size_t inbound_size,
1849                              u64 *version)
1850 {
1851         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1852         struct rbd_obj_request *obj_request;
1853         struct page **pages;
1854         u32 page_count;
1855         int ret;
1856
1857         /*
1858          * Method calls are ultimately read operations.  The result
1859          * should placed into the inbound buffer provided.  They
1860          * also supply outbound data--parameters for the object
1861          * method.  Currently if this is present it will be a
1862          * snapshot id.
1863          */
1864         page_count = (u32) calc_pages_for(0, inbound_size);
1865         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1866         if (IS_ERR(pages))
1867                 return PTR_ERR(pages);
1868
1869         ret = -ENOMEM;
1870         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
1871                                                         OBJ_REQUEST_PAGES);
1872         if (!obj_request)
1873                 goto out;
1874
1875         obj_request->pages = pages;
1876         obj_request->page_count = page_count;
1877
1878         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1879         if (!obj_request->osd_req)
1880                 goto out;
1881
1882         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
1883                                         class_name, method_name);
1884         if (outbound_size) {
1885                 struct ceph_pagelist *pagelist;
1886
1887                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
1888                 if (!pagelist)
1889                         goto out;
1890
1891                 ceph_pagelist_init(pagelist);
1892                 ceph_pagelist_append(pagelist, outbound, outbound_size);
1893                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
1894                                                 pagelist);
1895         }
1896         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
1897                                         obj_request->pages, inbound_size,
1898                                         0, false, false);
1899         rbd_osd_req_format(obj_request, false);
1900
1901         ret = rbd_obj_request_submit(osdc, obj_request);
1902         if (ret)
1903                 goto out;
1904         ret = rbd_obj_request_wait(obj_request);
1905         if (ret)
1906                 goto out;
1907
1908         ret = obj_request->result;
1909         if (ret < 0)
1910                 goto out;
1911         ret = 0;
1912         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1913         if (version)
1914                 *version = obj_request->version;
1915 out:
1916         if (obj_request)
1917                 rbd_obj_request_put(obj_request);
1918         else
1919                 ceph_release_page_vector(pages, page_count);
1920
1921         return ret;
1922 }
1923
1924 static void rbd_request_fn(struct request_queue *q)
1925                 __releases(q->queue_lock) __acquires(q->queue_lock)
1926 {
1927         struct rbd_device *rbd_dev = q->queuedata;
1928         bool read_only = rbd_dev->mapping.read_only;
1929         struct request *rq;
1930         int result;
1931
1932         while ((rq = blk_fetch_request(q))) {
1933                 bool write_request = rq_data_dir(rq) == WRITE;
1934                 struct rbd_img_request *img_request;
1935                 u64 offset;
1936                 u64 length;
1937
1938                 /* Ignore any non-FS requests that filter through. */
1939
1940                 if (rq->cmd_type != REQ_TYPE_FS) {
1941                         dout("%s: non-fs request type %d\n", __func__,
1942                                 (int) rq->cmd_type);
1943                         __blk_end_request_all(rq, 0);
1944                         continue;
1945                 }
1946
1947                 /* Ignore/skip any zero-length requests */
1948
1949                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1950                 length = (u64) blk_rq_bytes(rq);
1951
1952                 if (!length) {
1953                         dout("%s: zero-length request\n", __func__);
1954                         __blk_end_request_all(rq, 0);
1955                         continue;
1956                 }
1957
1958                 spin_unlock_irq(q->queue_lock);
1959
1960                 /* Disallow writes to a read-only device */
1961
1962                 if (write_request) {
1963                         result = -EROFS;
1964                         if (read_only)
1965                                 goto end_request;
1966                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1967                 }
1968
1969                 /*
1970                  * Quit early if the mapped snapshot no longer
1971                  * exists.  It's still possible the snapshot will
1972                  * have disappeared by the time our request arrives
1973                  * at the osd, but there's no sense in sending it if
1974                  * we already know.
1975                  */
1976                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
1977                         dout("request for non-existent snapshot");
1978                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1979                         result = -ENXIO;
1980                         goto end_request;
1981                 }
1982
1983                 result = -EINVAL;
1984                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1985                         goto end_request;       /* Shouldn't happen */
1986
1987                 result = -ENOMEM;
1988                 img_request = rbd_img_request_create(rbd_dev, offset, length,
1989                                                         write_request);
1990                 if (!img_request)
1991                         goto end_request;
1992
1993                 img_request->rq = rq;
1994
1995                 result = rbd_img_request_fill_bio(img_request, rq->bio);
1996                 if (!result)
1997                         result = rbd_img_request_submit(img_request);
1998                 if (result)
1999                         rbd_img_request_put(img_request);
2000 end_request:
2001                 spin_lock_irq(q->queue_lock);
2002                 if (result < 0) {
2003                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2004                                 write_request ? "write" : "read",
2005                                 length, offset, result);
2006
2007                         __blk_end_request_all(rq, result);
2008                 }
2009         }
2010 }
2011
2012 /*
2013  * a queue callback. Makes sure that we don't create a bio that spans across
2014  * multiple osd objects. One exception would be with a single page bios,
2015  * which we handle later at bio_chain_clone_range()
2016  */
2017 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2018                           struct bio_vec *bvec)
2019 {
2020         struct rbd_device *rbd_dev = q->queuedata;
2021         sector_t sector_offset;
2022         sector_t sectors_per_obj;
2023         sector_t obj_sector_offset;
2024         int ret;
2025
2026         /*
2027          * Find how far into its rbd object the partition-relative
2028          * bio start sector is to offset relative to the enclosing
2029          * device.
2030          */
2031         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2032         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2033         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2034
2035         /*
2036          * Compute the number of bytes from that offset to the end
2037          * of the object.  Account for what's already used by the bio.
2038          */
2039         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2040         if (ret > bmd->bi_size)
2041                 ret -= bmd->bi_size;
2042         else
2043                 ret = 0;
2044
2045         /*
2046          * Don't send back more than was asked for.  And if the bio
2047          * was empty, let the whole thing through because:  "Note
2048          * that a block device *must* allow a single page to be
2049          * added to an empty bio."
2050          */
2051         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2052         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2053                 ret = (int) bvec->bv_len;
2054
2055         return ret;
2056 }
2057
2058 static void rbd_free_disk(struct rbd_device *rbd_dev)
2059 {
2060         struct gendisk *disk = rbd_dev->disk;
2061
2062         if (!disk)
2063                 return;
2064
2065         if (disk->flags & GENHD_FL_UP)
2066                 del_gendisk(disk);
2067         if (disk->queue)
2068                 blk_cleanup_queue(disk->queue);
2069         put_disk(disk);
2070 }
2071
2072 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2073                                 const char *object_name,
2074                                 u64 offset, u64 length,
2075                                 char *buf, u64 *version)
2076
2077 {
2078         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2079         struct rbd_obj_request *obj_request;
2080         struct page **pages = NULL;
2081         u32 page_count;
2082         size_t size;
2083         int ret;
2084
2085         page_count = (u32) calc_pages_for(offset, length);
2086         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2087         if (IS_ERR(pages))
2088                 ret = PTR_ERR(pages);
2089
2090         ret = -ENOMEM;
2091         obj_request = rbd_obj_request_create(object_name, offset, length,
2092                                                         OBJ_REQUEST_PAGES);
2093         if (!obj_request)
2094                 goto out;
2095
2096         obj_request->pages = pages;
2097         obj_request->page_count = page_count;
2098
2099         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2100         if (!obj_request->osd_req)
2101                 goto out;
2102
2103         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2104                                         offset, length, 0, 0);
2105         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
2106                                         obj_request->pages,
2107                                         obj_request->length,
2108                                         obj_request->offset & ~PAGE_MASK,
2109                                         false, false);
2110         rbd_osd_req_format(obj_request, false);
2111
2112         ret = rbd_obj_request_submit(osdc, obj_request);
2113         if (ret)
2114                 goto out;
2115         ret = rbd_obj_request_wait(obj_request);
2116         if (ret)
2117                 goto out;
2118
2119         ret = obj_request->result;
2120         if (ret < 0)
2121                 goto out;
2122
2123         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2124         size = (size_t) obj_request->xferred;
2125         ceph_copy_from_page_vector(pages, buf, 0, size);
2126         rbd_assert(size <= (size_t) INT_MAX);
2127         ret = (int) size;
2128         if (version)
2129                 *version = obj_request->version;
2130 out:
2131         if (obj_request)
2132                 rbd_obj_request_put(obj_request);
2133         else
2134                 ceph_release_page_vector(pages, page_count);
2135
2136         return ret;
2137 }
2138
2139 /*
2140  * Read the complete header for the given rbd device.
2141  *
2142  * Returns a pointer to a dynamically-allocated buffer containing
2143  * the complete and validated header.  Caller can pass the address
2144  * of a variable that will be filled in with the version of the
2145  * header object at the time it was read.
2146  *
2147  * Returns a pointer-coded errno if a failure occurs.
2148  */
2149 static struct rbd_image_header_ondisk *
2150 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2151 {
2152         struct rbd_image_header_ondisk *ondisk = NULL;
2153         u32 snap_count = 0;
2154         u64 names_size = 0;
2155         u32 want_count;
2156         int ret;
2157
2158         /*
2159          * The complete header will include an array of its 64-bit
2160          * snapshot ids, followed by the names of those snapshots as
2161          * a contiguous block of NUL-terminated strings.  Note that
2162          * the number of snapshots could change by the time we read
2163          * it in, in which case we re-read it.
2164          */
2165         do {
2166                 size_t size;
2167
2168                 kfree(ondisk);
2169
2170                 size = sizeof (*ondisk);
2171                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2172                 size += names_size;
2173                 ondisk = kmalloc(size, GFP_KERNEL);
2174                 if (!ondisk)
2175                         return ERR_PTR(-ENOMEM);
2176
2177                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2178                                        0, size,
2179                                        (char *) ondisk, version);
2180                 if (ret < 0)
2181                         goto out_err;
2182                 if (WARN_ON((size_t) ret < size)) {
2183                         ret = -ENXIO;
2184                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2185                                 size, ret);
2186                         goto out_err;
2187                 }
2188                 if (!rbd_dev_ondisk_valid(ondisk)) {
2189                         ret = -ENXIO;
2190                         rbd_warn(rbd_dev, "invalid header");
2191                         goto out_err;
2192                 }
2193
2194                 names_size = le64_to_cpu(ondisk->snap_names_len);
2195                 want_count = snap_count;
2196                 snap_count = le32_to_cpu(ondisk->snap_count);
2197         } while (snap_count != want_count);
2198
2199         return ondisk;
2200
2201 out_err:
2202         kfree(ondisk);
2203
2204         return ERR_PTR(ret);
2205 }
2206
2207 /*
2208  * reload the ondisk the header
2209  */
2210 static int rbd_read_header(struct rbd_device *rbd_dev,
2211                            struct rbd_image_header *header)
2212 {
2213         struct rbd_image_header_ondisk *ondisk;
2214         u64 ver = 0;
2215         int ret;
2216
2217         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2218         if (IS_ERR(ondisk))
2219                 return PTR_ERR(ondisk);
2220         ret = rbd_header_from_disk(header, ondisk);
2221         if (ret >= 0)
2222                 header->obj_version = ver;
2223         kfree(ondisk);
2224
2225         return ret;
2226 }
2227
2228 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2229 {
2230         struct rbd_snap *snap;
2231         struct rbd_snap *next;
2232
2233         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2234                 rbd_remove_snap_dev(snap);
2235 }
2236
2237 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2238 {
2239         sector_t size;
2240
2241         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2242                 return;
2243
2244         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2245         dout("setting size to %llu sectors", (unsigned long long) size);
2246         rbd_dev->mapping.size = (u64) size;
2247         set_capacity(rbd_dev->disk, size);
2248 }
2249
2250 /*
2251  * only read the first part of the ondisk header, without the snaps info
2252  */
2253 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2254 {
2255         int ret;
2256         struct rbd_image_header h;
2257
2258         ret = rbd_read_header(rbd_dev, &h);
2259         if (ret < 0)
2260                 return ret;
2261
2262         down_write(&rbd_dev->header_rwsem);
2263
2264         /* Update image size, and check for resize of mapped image */
2265         rbd_dev->header.image_size = h.image_size;
2266         rbd_update_mapping_size(rbd_dev);
2267
2268         /* rbd_dev->header.object_prefix shouldn't change */
2269         kfree(rbd_dev->header.snap_sizes);
2270         kfree(rbd_dev->header.snap_names);
2271         /* osd requests may still refer to snapc */
2272         ceph_put_snap_context(rbd_dev->header.snapc);
2273
2274         if (hver)
2275                 *hver = h.obj_version;
2276         rbd_dev->header.obj_version = h.obj_version;
2277         rbd_dev->header.image_size = h.image_size;
2278         rbd_dev->header.snapc = h.snapc;
2279         rbd_dev->header.snap_names = h.snap_names;
2280         rbd_dev->header.snap_sizes = h.snap_sizes;
2281         /* Free the extra copy of the object prefix */
2282         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2283         kfree(h.object_prefix);
2284
2285         ret = rbd_dev_snaps_update(rbd_dev);
2286         if (!ret)
2287                 ret = rbd_dev_snaps_register(rbd_dev);
2288
2289         up_write(&rbd_dev->header_rwsem);
2290
2291         return ret;
2292 }
2293
2294 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2295 {
2296         int ret;
2297
2298         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2299         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2300         if (rbd_dev->image_format == 1)
2301                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2302         else
2303                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2304         mutex_unlock(&ctl_mutex);
2305
2306         return ret;
2307 }
2308
2309 static int rbd_init_disk(struct rbd_device *rbd_dev)
2310 {
2311         struct gendisk *disk;
2312         struct request_queue *q;
2313         u64 segment_size;
2314
2315         /* create gendisk info */
2316         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2317         if (!disk)
2318                 return -ENOMEM;
2319
2320         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2321                  rbd_dev->dev_id);
2322         disk->major = rbd_dev->major;
2323         disk->first_minor = 0;
2324         disk->fops = &rbd_bd_ops;
2325         disk->private_data = rbd_dev;
2326
2327         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2328         if (!q)
2329                 goto out_disk;
2330
2331         /* We use the default size, but let's be explicit about it. */
2332         blk_queue_physical_block_size(q, SECTOR_SIZE);
2333
2334         /* set io sizes to object size */
2335         segment_size = rbd_obj_bytes(&rbd_dev->header);
2336         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2337         blk_queue_max_segment_size(q, segment_size);
2338         blk_queue_io_min(q, segment_size);
2339         blk_queue_io_opt(q, segment_size);
2340
2341         blk_queue_merge_bvec(q, rbd_merge_bvec);
2342         disk->queue = q;
2343
2344         q->queuedata = rbd_dev;
2345
2346         rbd_dev->disk = disk;
2347
2348         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2349
2350         return 0;
2351 out_disk:
2352         put_disk(disk);
2353
2354         return -ENOMEM;
2355 }
2356
2357 /*
2358   sysfs
2359 */
2360
2361 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2362 {
2363         return container_of(dev, struct rbd_device, dev);
2364 }
2365
2366 static ssize_t rbd_size_show(struct device *dev,
2367                              struct device_attribute *attr, char *buf)
2368 {
2369         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2370         sector_t size;
2371
2372         down_read(&rbd_dev->header_rwsem);
2373         size = get_capacity(rbd_dev->disk);
2374         up_read(&rbd_dev->header_rwsem);
2375
2376         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2377 }
2378
2379 /*
2380  * Note this shows the features for whatever's mapped, which is not
2381  * necessarily the base image.
2382  */
2383 static ssize_t rbd_features_show(struct device *dev,
2384                              struct device_attribute *attr, char *buf)
2385 {
2386         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2387
2388         return sprintf(buf, "0x%016llx\n",
2389                         (unsigned long long) rbd_dev->mapping.features);
2390 }
2391
2392 static ssize_t rbd_major_show(struct device *dev,
2393                               struct device_attribute *attr, char *buf)
2394 {
2395         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2396
2397         return sprintf(buf, "%d\n", rbd_dev->major);
2398 }
2399
2400 static ssize_t rbd_client_id_show(struct device *dev,
2401                                   struct device_attribute *attr, char *buf)
2402 {
2403         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2404
2405         return sprintf(buf, "client%lld\n",
2406                         ceph_client_id(rbd_dev->rbd_client->client));
2407 }
2408
2409 static ssize_t rbd_pool_show(struct device *dev,
2410                              struct device_attribute *attr, char *buf)
2411 {
2412         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2413
2414         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2415 }
2416
2417 static ssize_t rbd_pool_id_show(struct device *dev,
2418                              struct device_attribute *attr, char *buf)
2419 {
2420         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2421
2422         return sprintf(buf, "%llu\n",
2423                 (unsigned long long) rbd_dev->spec->pool_id);
2424 }
2425
2426 static ssize_t rbd_name_show(struct device *dev,
2427                              struct device_attribute *attr, char *buf)
2428 {
2429         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2430
2431         if (rbd_dev->spec->image_name)
2432                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2433
2434         return sprintf(buf, "(unknown)\n");
2435 }
2436
2437 static ssize_t rbd_image_id_show(struct device *dev,
2438                              struct device_attribute *attr, char *buf)
2439 {
2440         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2441
2442         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2443 }
2444
2445 /*
2446  * Shows the name of the currently-mapped snapshot (or
2447  * RBD_SNAP_HEAD_NAME for the base image).
2448  */
2449 static ssize_t rbd_snap_show(struct device *dev,
2450                              struct device_attribute *attr,
2451                              char *buf)
2452 {
2453         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2454
2455         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2456 }
2457
2458 /*
2459  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2460  * for the parent image.  If there is no parent, simply shows
2461  * "(no parent image)".
2462  */
2463 static ssize_t rbd_parent_show(struct device *dev,
2464                              struct device_attribute *attr,
2465                              char *buf)
2466 {
2467         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2468         struct rbd_spec *spec = rbd_dev->parent_spec;
2469         int count;
2470         char *bufp = buf;
2471
2472         if (!spec)
2473                 return sprintf(buf, "(no parent image)\n");
2474
2475         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2476                         (unsigned long long) spec->pool_id, spec->pool_name);
2477         if (count < 0)
2478                 return count;
2479         bufp += count;
2480
2481         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2482                         spec->image_name ? spec->image_name : "(unknown)");
2483         if (count < 0)
2484                 return count;
2485         bufp += count;
2486
2487         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2488                         (unsigned long long) spec->snap_id, spec->snap_name);
2489         if (count < 0)
2490                 return count;
2491         bufp += count;
2492
2493         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2494         if (count < 0)
2495                 return count;
2496         bufp += count;
2497
2498         return (ssize_t) (bufp - buf);
2499 }
2500
2501 static ssize_t rbd_image_refresh(struct device *dev,
2502                                  struct device_attribute *attr,
2503                                  const char *buf,
2504                                  size_t size)
2505 {
2506         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2507         int ret;
2508
2509         ret = rbd_dev_refresh(rbd_dev, NULL);
2510
2511         return ret < 0 ? ret : size;
2512 }
2513
2514 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2515 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2516 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2517 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2518 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2519 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2520 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2521 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2522 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2523 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2524 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2525
2526 static struct attribute *rbd_attrs[] = {
2527         &dev_attr_size.attr,
2528         &dev_attr_features.attr,
2529         &dev_attr_major.attr,
2530         &dev_attr_client_id.attr,
2531         &dev_attr_pool.attr,
2532         &dev_attr_pool_id.attr,
2533         &dev_attr_name.attr,
2534         &dev_attr_image_id.attr,
2535         &dev_attr_current_snap.attr,
2536         &dev_attr_parent.attr,
2537         &dev_attr_refresh.attr,
2538         NULL
2539 };
2540
2541 static struct attribute_group rbd_attr_group = {
2542         .attrs = rbd_attrs,
2543 };
2544
2545 static const struct attribute_group *rbd_attr_groups[] = {
2546         &rbd_attr_group,
2547         NULL
2548 };
2549
/* Release callback for rbd_device_type; intentionally does nothing */
static void rbd_sysfs_dev_release(struct device *dev)
{
        /* nothing to release here */
}
2553
/*
 * Device type assigned to every rbd device in rbd_bus_add_dev(); it
 * supplies the attribute groups and the (empty) type release callback.
 */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
2559
2560
2561 /*
2562   sysfs - snapshots
2563 */
2564
2565 static ssize_t rbd_snap_size_show(struct device *dev,
2566                                   struct device_attribute *attr,
2567                                   char *buf)
2568 {
2569         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2570
2571         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2572 }
2573
2574 static ssize_t rbd_snap_id_show(struct device *dev,
2575                                 struct device_attribute *attr,
2576                                 char *buf)
2577 {
2578         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2579
2580         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2581 }
2582
2583 static ssize_t rbd_snap_features_show(struct device *dev,
2584                                 struct device_attribute *attr,
2585                                 char *buf)
2586 {
2587         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2588
2589         return sprintf(buf, "0x%016llx\n",
2590                         (unsigned long long) snap->features);
2591 }
2592
/* Read-only sysfs attributes exposed by each snapshot device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

/* NULL-terminated list of the snapshot attributes declared above */
static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};

/* The single (unnamed) attribute group holding rbd_snap_attrs */
static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
2607
2608 static void rbd_snap_dev_release(struct device *dev)
2609 {
2610         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2611         kfree(snap->name);
2612         kfree(snap);
2613 }
2614
/* NULL-terminated group list referenced by rbd_snap_device_type */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

/*
 * Device type assigned to snapshot devices in rbd_register_snap_dev().
 * rbd_snap_registered() also compares against its address to tell
 * whether registration has been attempted for a snapshot.
 */
static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2624
2625 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2626 {
2627         kref_get(&spec->kref);
2628
2629         return spec;
2630 }
2631
2632 static void rbd_spec_free(struct kref *kref);
2633 static void rbd_spec_put(struct rbd_spec *spec)
2634 {
2635         if (spec)
2636                 kref_put(&spec->kref, rbd_spec_free);
2637 }
2638
2639 static struct rbd_spec *rbd_spec_alloc(void)
2640 {
2641         struct rbd_spec *spec;
2642
2643         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2644         if (!spec)
2645                 return NULL;
2646         kref_init(&spec->kref);
2647
2648         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2649
2650         return spec;
2651 }
2652
2653 static void rbd_spec_free(struct kref *kref)
2654 {
2655         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2656
2657         kfree(spec->pool_name);
2658         kfree(spec->image_id);
2659         kfree(spec->image_name);
2660         kfree(spec->snap_name);
2661         kfree(spec);
2662 }
2663
2664 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2665                                 struct rbd_spec *spec)
2666 {
2667         struct rbd_device *rbd_dev;
2668
2669         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2670         if (!rbd_dev)
2671                 return NULL;
2672
2673         spin_lock_init(&rbd_dev->lock);
2674         rbd_dev->flags = 0;
2675         INIT_LIST_HEAD(&rbd_dev->node);
2676         INIT_LIST_HEAD(&rbd_dev->snaps);
2677         init_rwsem(&rbd_dev->header_rwsem);
2678
2679         rbd_dev->spec = spec;
2680         rbd_dev->rbd_client = rbdc;
2681
2682         /* Initialize the layout used for all rbd requests */
2683
2684         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2685         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2686         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2687         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2688
2689         return rbd_dev;
2690 }
2691
/*
 * Tear down an rbd_device: drop its references on the parent spec,
 * the client and its own spec, free the header object name, and
 * finally free the structure itself.
 */
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
        rbd_spec_put(rbd_dev->parent_spec);     /* NULL-safe */
        kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev->rbd_client);
        rbd_spec_put(rbd_dev->spec);
        kfree(rbd_dev);
}
2700
2701 static bool rbd_snap_registered(struct rbd_snap *snap)
2702 {
2703         bool ret = snap->dev.type == &rbd_snap_device_type;
2704         bool reg = device_is_registered(&snap->dev);
2705
2706         rbd_assert(!ret ^ reg);
2707
2708         return ret;
2709 }
2710
/*
 * Unlink a snapshot from the list it is on and, if its sysfs device
 * is registered, unregister that device.
 *
 * NOTE(review): when the device is not registered nothing here frees
 * @snap or snap->name -- presumably the caller is expected to; confirm,
 * otherwise unregistered entries are leaked.
 */
static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        if (device_is_registered(&snap->dev))
                device_unregister(&snap->dev);
}
2717
2718 static int rbd_register_snap_dev(struct rbd_snap *snap,
2719                                   struct device *parent)
2720 {
2721         struct device *dev = &snap->dev;
2722         int ret;
2723
2724         dev->type = &rbd_snap_device_type;
2725         dev->parent = parent;
2726         dev->release = rbd_snap_dev_release;
2727         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2728         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2729
2730         ret = device_register(dev);
2731
2732         return ret;
2733 }
2734
2735 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2736                                                 const char *snap_name,
2737                                                 u64 snap_id, u64 snap_size,
2738                                                 u64 snap_features)
2739 {
2740         struct rbd_snap *snap;
2741         int ret;
2742
2743         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2744         if (!snap)
2745                 return ERR_PTR(-ENOMEM);
2746
2747         ret = -ENOMEM;
2748         snap->name = kstrdup(snap_name, GFP_KERNEL);
2749         if (!snap->name)
2750                 goto err;
2751
2752         snap->id = snap_id;
2753         snap->size = snap_size;
2754         snap->features = snap_features;
2755
2756         return snap;
2757
2758 err:
2759         kfree(snap->name);
2760         kfree(snap);
2761
2762         return ERR_PTR(ret);
2763 }
2764
2765 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2766                 u64 *snap_size, u64 *snap_features)
2767 {
2768         char *snap_name;
2769
2770         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2771
2772         *snap_size = rbd_dev->header.snap_sizes[which];
2773         *snap_features = 0;     /* No features for v1 */
2774
2775         /* Skip over names until we find the one we are looking for */
2776
2777         snap_name = rbd_dev->header.snap_names;
2778         while (which--)
2779                 snap_name += strlen(snap_name) + 1;
2780
2781         return snap_name;
2782 }
2783
2784 /*
2785  * Get the size and object order for an image snapshot, or if
2786  * snap_id is CEPH_NOSNAP, gets this information for the base
2787  * image.
2788  */
2789 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2790                                 u8 *order, u64 *snap_size)
2791 {
2792         __le64 snapid = cpu_to_le64(snap_id);
2793         int ret;
2794         struct {
2795                 u8 order;
2796                 __le64 size;
2797         } __attribute__ ((packed)) size_buf = { 0 };
2798
2799         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2800                                 "rbd", "get_size",
2801                                 (char *) &snapid, sizeof (snapid),
2802                                 (char *) &size_buf, sizeof (size_buf), NULL);
2803         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2804         if (ret < 0)
2805                 return ret;
2806
2807         *order = size_buf.order;
2808         *snap_size = le64_to_cpu(size_buf.size);
2809
2810         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2811                 (unsigned long long) snap_id, (unsigned int) *order,
2812                 (unsigned long long) *snap_size);
2813
2814         return 0;
2815 }
2816
2817 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2818 {
2819         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2820                                         &rbd_dev->header.obj_order,
2821                                         &rbd_dev->header.image_size);
2822 }
2823
2824 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2825 {
2826         void *reply_buf;
2827         int ret;
2828         void *p;
2829
2830         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2831         if (!reply_buf)
2832                 return -ENOMEM;
2833
2834         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2835                                 "rbd", "get_object_prefix",
2836                                 NULL, 0,
2837                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2838         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2839         if (ret < 0)
2840                 goto out;
2841
2842         p = reply_buf;
2843         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2844                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2845                                                 NULL, GFP_NOIO);
2846
2847         if (IS_ERR(rbd_dev->header.object_prefix)) {
2848                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2849                 rbd_dev->header.object_prefix = NULL;
2850         } else {
2851                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2852         }
2853
2854 out:
2855         kfree(reply_buf);
2856
2857         return ret;
2858 }
2859
2860 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2861                 u64 *snap_features)
2862 {
2863         __le64 snapid = cpu_to_le64(snap_id);
2864         struct {
2865                 __le64 features;
2866                 __le64 incompat;
2867         } features_buf = { 0 };
2868         u64 incompat;
2869         int ret;
2870
2871         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2872                                 "rbd", "get_features",
2873                                 (char *) &snapid, sizeof (snapid),
2874                                 (char *) &features_buf, sizeof (features_buf),
2875                                 NULL);
2876         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2877         if (ret < 0)
2878                 return ret;
2879
2880         incompat = le64_to_cpu(features_buf.incompat);
2881         if (incompat & ~RBD_FEATURES_SUPPORTED)
2882                 return -ENXIO;
2883
2884         *snap_features = le64_to_cpu(features_buf.features);
2885
2886         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2887                 (unsigned long long) snap_id,
2888                 (unsigned long long) *snap_features,
2889                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2890
2891         return 0;
2892 }
2893
2894 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2895 {
2896         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2897                                                 &rbd_dev->header.features);
2898 }
2899
2900 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2901 {
2902         struct rbd_spec *parent_spec;
2903         size_t size;
2904         void *reply_buf = NULL;
2905         __le64 snapid;
2906         void *p;
2907         void *end;
2908         char *image_id;
2909         u64 overlap;
2910         int ret;
2911
2912         parent_spec = rbd_spec_alloc();
2913         if (!parent_spec)
2914                 return -ENOMEM;
2915
2916         size = sizeof (__le64) +                                /* pool_id */
2917                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2918                 sizeof (__le64) +                               /* snap_id */
2919                 sizeof (__le64);                                /* overlap */
2920         reply_buf = kmalloc(size, GFP_KERNEL);
2921         if (!reply_buf) {
2922                 ret = -ENOMEM;
2923                 goto out_err;
2924         }
2925
2926         snapid = cpu_to_le64(CEPH_NOSNAP);
2927         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2928                                 "rbd", "get_parent",
2929                                 (char *) &snapid, sizeof (snapid),
2930                                 (char *) reply_buf, size, NULL);
2931         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2932         if (ret < 0)
2933                 goto out_err;
2934
2935         ret = -ERANGE;
2936         p = reply_buf;
2937         end = (char *) reply_buf + size;
2938         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2939         if (parent_spec->pool_id == CEPH_NOPOOL)
2940                 goto out;       /* No parent?  No problem. */
2941
2942         /* The ceph file layout needs to fit pool id in 32 bits */
2943
2944         ret = -EIO;
2945         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2946                 goto out;
2947
2948         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2949         if (IS_ERR(image_id)) {
2950                 ret = PTR_ERR(image_id);
2951                 goto out_err;
2952         }
2953         parent_spec->image_id = image_id;
2954         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2955         ceph_decode_64_safe(&p, end, overlap, out_err);
2956
2957         rbd_dev->parent_overlap = overlap;
2958         rbd_dev->parent_spec = parent_spec;
2959         parent_spec = NULL;     /* rbd_dev now owns this */
2960 out:
2961         ret = 0;
2962 out_err:
2963         kfree(reply_buf);
2964         rbd_spec_put(parent_spec);
2965
2966         return ret;
2967 }
2968
2969 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2970 {
2971         size_t image_id_size;
2972         char *image_id;
2973         void *p;
2974         void *end;
2975         size_t size;
2976         void *reply_buf = NULL;
2977         size_t len = 0;
2978         char *image_name = NULL;
2979         int ret;
2980
2981         rbd_assert(!rbd_dev->spec->image_name);
2982
2983         len = strlen(rbd_dev->spec->image_id);
2984         image_id_size = sizeof (__le32) + len;
2985         image_id = kmalloc(image_id_size, GFP_KERNEL);
2986         if (!image_id)
2987                 return NULL;
2988
2989         p = image_id;
2990         end = (char *) image_id + image_id_size;
2991         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2992
2993         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2994         reply_buf = kmalloc(size, GFP_KERNEL);
2995         if (!reply_buf)
2996                 goto out;
2997
2998         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
2999                                 "rbd", "dir_get_name",
3000                                 image_id, image_id_size,
3001                                 (char *) reply_buf, size, NULL);
3002         if (ret < 0)
3003                 goto out;
3004         p = reply_buf;
3005         end = (char *) reply_buf + size;
3006         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3007         if (IS_ERR(image_name))
3008                 image_name = NULL;
3009         else
3010                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3011 out:
3012         kfree(reply_buf);
3013         kfree(image_id);
3014
3015         return image_name;
3016 }
3017
3018 /*
3019  * When a parent image gets probed, we only have the pool, image,
3020  * and snapshot ids but not the names of any of them.  This call
3021  * is made later to fill in those names.  It has to be done after
3022  * rbd_dev_snaps_update() has completed because some of the
3023  * information (in particular, snapshot name) is not available
3024  * until then.
3025  */
3026 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3027 {
3028         struct ceph_osd_client *osdc;
3029         const char *name;
3030         void *reply_buf = NULL;
3031         int ret;
3032
3033         if (rbd_dev->spec->pool_name)
3034                 return 0;       /* Already have the names */
3035
3036         /* Look up the pool name */
3037
3038         osdc = &rbd_dev->rbd_client->client->osdc;
3039         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3040         if (!name) {
3041                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3042                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3043                 return -EIO;
3044         }
3045
3046         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3047         if (!rbd_dev->spec->pool_name)
3048                 return -ENOMEM;
3049
3050         /* Fetch the image name; tolerate failure here */
3051
3052         name = rbd_dev_image_name(rbd_dev);
3053         if (name)
3054                 rbd_dev->spec->image_name = (char *) name;
3055         else
3056                 rbd_warn(rbd_dev, "unable to get image name");
3057
3058         /* Look up the snapshot name. */
3059
3060         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3061         if (!name) {
3062                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3063                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3064                 ret = -EIO;
3065                 goto out_err;
3066         }
3067         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3068         if(!rbd_dev->spec->snap_name)
3069                 goto out_err;
3070
3071         return 0;
3072 out_err:
3073         kfree(reply_buf);
3074         kfree(rbd_dev->spec->pool_name);
3075         rbd_dev->spec->pool_name = NULL;
3076
3077         return ret;
3078 }
3079
3080 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3081 {
3082         size_t size;
3083         int ret;
3084         void *reply_buf;
3085         void *p;
3086         void *end;
3087         u64 seq;
3088         u32 snap_count;
3089         struct ceph_snap_context *snapc;
3090         u32 i;
3091
3092         /*
3093          * We'll need room for the seq value (maximum snapshot id),
3094          * snapshot count, and array of that many snapshot ids.
3095          * For now we have a fixed upper limit on the number we're
3096          * prepared to receive.
3097          */
3098         size = sizeof (__le64) + sizeof (__le32) +
3099                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3100         reply_buf = kzalloc(size, GFP_KERNEL);
3101         if (!reply_buf)
3102                 return -ENOMEM;
3103
3104         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3105                                 "rbd", "get_snapcontext",
3106                                 NULL, 0,
3107                                 reply_buf, size, ver);
3108         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3109         if (ret < 0)
3110                 goto out;
3111
3112         ret = -ERANGE;
3113         p = reply_buf;
3114         end = (char *) reply_buf + size;
3115         ceph_decode_64_safe(&p, end, seq, out);
3116         ceph_decode_32_safe(&p, end, snap_count, out);
3117
3118         /*
3119          * Make sure the reported number of snapshot ids wouldn't go
3120          * beyond the end of our buffer.  But before checking that,
3121          * make sure the computed size of the snapshot context we
3122          * allocate is representable in a size_t.
3123          */
3124         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3125                                  / sizeof (u64)) {
3126                 ret = -EINVAL;
3127                 goto out;
3128         }
3129         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3130                 goto out;
3131
3132         size = sizeof (struct ceph_snap_context) +
3133                                 snap_count * sizeof (snapc->snaps[0]);
3134         snapc = kmalloc(size, GFP_KERNEL);
3135         if (!snapc) {
3136                 ret = -ENOMEM;
3137                 goto out;
3138         }
3139
3140         atomic_set(&snapc->nref, 1);
3141         snapc->seq = seq;
3142         snapc->num_snaps = snap_count;
3143         for (i = 0; i < snap_count; i++)
3144                 snapc->snaps[i] = ceph_decode_64(&p);
3145
3146         rbd_dev->header.snapc = snapc;
3147
3148         dout("  snap context seq = %llu, snap_count = %u\n",
3149                 (unsigned long long) seq, (unsigned int) snap_count);
3150
3151 out:
3152         kfree(reply_buf);
3153
3154         return 0;
3155 }
3156
3157 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3158 {
3159         size_t size;
3160         void *reply_buf;
3161         __le64 snap_id;
3162         int ret;
3163         void *p;
3164         void *end;
3165         char *snap_name;
3166
3167         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3168         reply_buf = kmalloc(size, GFP_KERNEL);
3169         if (!reply_buf)
3170                 return ERR_PTR(-ENOMEM);
3171
3172         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3173         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3174                                 "rbd", "get_snapshot_name",
3175                                 (char *) &snap_id, sizeof (snap_id),
3176                                 reply_buf, size, NULL);
3177         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3178         if (ret < 0)
3179                 goto out;
3180
3181         p = reply_buf;
3182         end = (char *) reply_buf + size;
3183         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3184         if (IS_ERR(snap_name)) {
3185                 ret = PTR_ERR(snap_name);
3186                 goto out;
3187         } else {
3188                 dout("  snap_id 0x%016llx snap_name = %s\n",
3189                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3190         }
3191         kfree(reply_buf);
3192
3193         return snap_name;
3194 out:
3195         kfree(reply_buf);
3196
3197         return ERR_PTR(ret);
3198 }
3199
3200 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3201                 u64 *snap_size, u64 *snap_features)
3202 {
3203         u64 snap_id;
3204         u8 order;
3205         int ret;
3206
3207         snap_id = rbd_dev->header.snapc->snaps[which];
3208         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3209         if (ret)
3210                 return ERR_PTR(ret);
3211         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3212         if (ret)
3213                 return ERR_PTR(ret);
3214
3215         return rbd_dev_v2_snap_name(rbd_dev, which);
3216 }
3217
3218 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3219                 u64 *snap_size, u64 *snap_features)
3220 {
3221         if (rbd_dev->image_format == 1)
3222                 return rbd_dev_v1_snap_info(rbd_dev, which,
3223                                         snap_size, snap_features);
3224         if (rbd_dev->image_format == 2)
3225                 return rbd_dev_v2_snap_info(rbd_dev, which,
3226                                         snap_size, snap_features);
3227         return ERR_PTR(-EINVAL);
3228 }
3229
3230 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3231 {
3232         int ret;
3233         __u8 obj_order;
3234
3235         down_write(&rbd_dev->header_rwsem);
3236
3237         /* Grab old order first, to see if it changes */
3238
3239         obj_order = rbd_dev->header.obj_order,
3240         ret = rbd_dev_v2_image_size(rbd_dev);
3241         if (ret)
3242                 goto out;
3243         if (rbd_dev->header.obj_order != obj_order) {
3244                 ret = -EIO;
3245                 goto out;
3246         }
3247         rbd_update_mapping_size(rbd_dev);
3248
3249         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3250         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3251         if (ret)
3252                 goto out;
3253         ret = rbd_dev_snaps_update(rbd_dev);
3254         dout("rbd_dev_snaps_update returned %d\n", ret);
3255         if (ret)
3256                 goto out;
3257         ret = rbd_dev_snaps_register(rbd_dev);
3258         dout("rbd_dev_snaps_register returned %d\n", ret);
3259 out:
3260         up_write(&rbd_dev->header_rwsem);
3261
3262         return ret;
3263 }
3264
3265 /*
3266  * Scan the rbd device's current snapshot list and compare it to the
3267  * newly-received snapshot context.  Remove any existing snapshots
3268  * not present in the new snapshot context.  Add a new snapshot for
3269  * any snaphots in the snapshot context not in the current list.
3270  * And verify there are no changes to snapshots we already know
3271  * about.
3272  *
3273  * Assumes the snapshots in the snapshot context are sorted by
3274  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3275  * are also maintained in that order.)
3276  */
3277 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3278 {
3279         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3280         const u32 snap_count = snapc->num_snaps;
3281         struct list_head *head = &rbd_dev->snaps;
3282         struct list_head *links = head->next;
3283         u32 index = 0;
3284
3285         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3286         while (index < snap_count || links != head) {
3287                 u64 snap_id;
3288                 struct rbd_snap *snap;
3289                 char *snap_name;
3290                 u64 snap_size = 0;
3291                 u64 snap_features = 0;
3292
3293                 snap_id = index < snap_count ? snapc->snaps[index]
3294                                              : CEPH_NOSNAP;
3295                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3296                                      : NULL;
3297                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3298
3299                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3300                         struct list_head *next = links->next;
3301
3302                         /*
3303                          * A previously-existing snapshot is not in
3304                          * the new snap context.
3305                          *
3306                          * If the now missing snapshot is the one the
3307                          * image is mapped to, clear its exists flag
3308                          * so we can avoid sending any more requests
3309                          * to it.
3310                          */
3311                         if (rbd_dev->spec->snap_id == snap->id)
3312                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3313                         rbd_remove_snap_dev(snap);
3314                         dout("%ssnap id %llu has been removed\n",
3315                                 rbd_dev->spec->snap_id == snap->id ?
3316                                                         "mapped " : "",
3317                                 (unsigned long long) snap->id);
3318
3319                         /* Done with this list entry; advance */
3320
3321                         links = next;
3322                         continue;
3323                 }
3324
3325                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3326                                         &snap_size, &snap_features);
3327                 if (IS_ERR(snap_name))
3328                         return PTR_ERR(snap_name);
3329
3330                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3331                         (unsigned long long) snap_id);
3332                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3333                         struct rbd_snap *new_snap;
3334
3335                         /* We haven't seen this snapshot before */
3336
3337                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3338                                         snap_id, snap_size, snap_features);
3339                         if (IS_ERR(new_snap)) {
3340                                 int err = PTR_ERR(new_snap);
3341
3342                                 dout("  failed to add dev, error %d\n", err);
3343
3344                                 return err;
3345                         }
3346
3347                         /* New goes before existing, or at end of list */
3348
3349                         dout("  added dev%s\n", snap ? "" : " at end\n");
3350                         if (snap)
3351                                 list_add_tail(&new_snap->node, &snap->node);
3352                         else
3353                                 list_add_tail(&new_snap->node, head);
3354                 } else {
3355                         /* Already have this one */
3356
3357                         dout("  already present\n");
3358
3359                         rbd_assert(snap->size == snap_size);
3360                         rbd_assert(!strcmp(snap->name, snap_name));
3361                         rbd_assert(snap->features == snap_features);
3362
3363                         /* Done with this list entry; advance */
3364
3365                         links = links->next;
3366                 }
3367
3368                 /* Advance to the next entry in the snapshot context */
3369
3370                 index++;
3371         }
3372         dout("%s: done\n", __func__);
3373
3374         return 0;
3375 }
3376
3377 /*
3378  * Scan the list of snapshots and register the devices for any that
3379  * have not already been registered.
3380  */
3381 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3382 {
3383         struct rbd_snap *snap;
3384         int ret = 0;
3385
3386         dout("%s:\n", __func__);
3387         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3388                 return -EIO;
3389
3390         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3391                 if (!rbd_snap_registered(snap)) {
3392                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3393                         if (ret < 0)
3394                                 break;
3395                 }
3396         }
3397         dout("%s: returning %d\n", __func__, ret);
3398
3399         return ret;
3400 }
3401
3402 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3403 {
3404         struct device *dev;
3405         int ret;
3406
3407         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3408
3409         dev = &rbd_dev->dev;
3410         dev->bus = &rbd_bus_type;
3411         dev->type = &rbd_device_type;
3412         dev->parent = &rbd_root_dev;
3413         dev->release = rbd_dev_release;
3414         dev_set_name(dev, "%d", rbd_dev->dev_id);
3415         ret = device_register(dev);
3416
3417         mutex_unlock(&ctl_mutex);
3418
3419         return ret;
3420 }
3421
/*
 * Counterpart of rbd_bus_add_dev(): unregister the struct device
 * embedded in @rbd_dev.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
3426
3427 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3428
3429 /*
3430  * Get a unique rbd identifier for the given new rbd_dev, and add
3431  * the rbd_dev to the global list.  The minimum rbd id is 1.
3432  */
3433 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3434 {
3435         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3436
3437         spin_lock(&rbd_dev_list_lock);
3438         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3439         spin_unlock(&rbd_dev_list_lock);
3440         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3441                 (unsigned long long) rbd_dev->dev_id);
3442 }
3443
3444 /*
3445  * Remove an rbd_dev from the global list, and record that its
3446  * identifier is no longer in use.
3447  */
3448 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3449 {
3450         struct list_head *tmp;
3451         int rbd_id = rbd_dev->dev_id;
3452         int max_id;
3453
3454         rbd_assert(rbd_id > 0);
3455
3456         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3457                 (unsigned long long) rbd_dev->dev_id);
3458         spin_lock(&rbd_dev_list_lock);
3459         list_del_init(&rbd_dev->node);
3460
3461         /*
3462          * If the id being "put" is not the current maximum, there
3463          * is nothing special we need to do.
3464          */
3465         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3466                 spin_unlock(&rbd_dev_list_lock);
3467                 return;
3468         }
3469
3470         /*
3471          * We need to update the current maximum id.  Search the
3472          * list to find out what it is.  We're more likely to find
3473          * the maximum at the end, so search the list backward.
3474          */
3475         max_id = 0;
3476         list_for_each_prev(tmp, &rbd_dev_list) {
3477                 struct rbd_device *rbd_dev;
3478
3479                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3480                 if (rbd_dev->dev_id > max_id)
3481                         max_id = rbd_dev->dev_id;
3482         }
3483         spin_unlock(&rbd_dev_list_lock);
3484
3485         /*
3486          * The max id could have been updated by rbd_dev_id_get(), in
3487          * which case it now accurately reflects the new maximum.
3488          * Be careful not to overwrite the maximum value in that
3489          * case.
3490          */
3491         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3492         dout("  max dev id has been reset\n");
3493 }
3494
/*
 * Advances *buf past any leading white space so that it points at
 * the first non-space character (or at the terminating '\0' if there
 * is none), and returns the number of non-space characters that
 * follow, i.e. the length of the token found there.  The string at
 * *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * Exactly the characters for which isspace() is nonzero in
         * the "C" and "POSIX" locales.
         */
        static const char whitespace[] = " \f\n\r\t\v";
        const char *start = *buf;

        start += strspn(start, whitespace);     /* Skip to the token */
        *buf = start;

        return strcspn(start, whitespace);      /* Measure the token */
}
3513
/*
 * Locates the next token in *buf and, provided it fits, copies it
 * into the caller's token buffer followed by a terminating '\0'.
 * Nothing is written to the token buffer when the token (plus its
 * terminator) would not fit in token_size bytes.  The string at *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found, not counting the '\0'.
 * A return of 0 means there was no token; a return >= token_size
 * means the token was too long and was not copied.
 *
 * In every case *buf is left pointing just past the token, whether
 * or not it was copied.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t token_len = next_token(buf);
        const char *start = *buf;

        *buf = start + token_len;
        if (token_len >= token_size)
                return token_len;       /* Too long; nothing copied */

        memcpy(token, start, token_len);
        token[token_len] = '\0';

        return token_len;
}
3543
3544 /*
3545  * Finds the next token in *buf, dynamically allocates a buffer big
3546  * enough to hold a copy of it, and copies the token into the new
3547  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3548  * that a duplicate buffer is created even for a zero-length token.
3549  *
3550  * Returns a pointer to the newly-allocated duplicate, or a null
3551  * pointer if memory for the duplicate was not available.  If
3552  * the lenp argument is a non-null pointer, the length of the token
3553  * (not including the '\0') is returned in *lenp.
3554  *
3555  * If successful, the *buf pointer will be updated to point beyond
3556  * the end of the found token.
3557  *
3558  * Note: uses GFP_KERNEL for allocation.
3559  */
3560 static inline char *dup_token(const char **buf, size_t *lenp)
3561 {
3562         char *dup;
3563         size_t len;
3564
3565         len = next_token(buf);
3566         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3567         if (!dup)
3568                 return NULL;
3569         *(dup + len) = '\0';
3570         *buf += len;
3571
3572         if (lenp)
3573                 *lenp = len;
3574
3575         return dup;
3576 }
3577
3578 /*
3579  * Parse the options provided for an "rbd add" (i.e., rbd image
3580  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3581  * and the data written is passed here via a NUL-terminated buffer.
3582  * Returns 0 if successful or an error code otherwise.
3583  *
3584  * The information extracted from these options is recorded in
3585  * the other parameters which return dynamically-allocated
3586  * structures:
3587  *  ceph_opts
3588  *      The address of a pointer that will refer to a ceph options
3589  *      structure.  Caller must release the returned pointer using
3590  *      ceph_destroy_options() when it is no longer needed.
3591  *  rbd_opts
3592  *      Address of an rbd options pointer.  Fully initialized by
3593  *      this function; caller must release with kfree().
3594  *  spec
3595  *      Address of an rbd image specification pointer.  Fully
3596  *      initialized by this function based on parsed options.
3597  *      Caller must release with rbd_spec_put().
3598  *
3599  * The options passed take this form:
3600  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3601  * where:
3602  *  <mon_addrs>
3603  *      A comma-separated list of one or more monitor addresses.
3604  *      A monitor address is an ip address, optionally followed
3605  *      by a port number (separated by a colon).
3606  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3607  *  <options>
3608  *      A comma-separated list of ceph and/or rbd options.
3609  *  <pool_name>
3610  *      The name of the rados pool containing the rbd image.
3611  *  <image_name>
3612  *      The name of the image in that pool to map.
3613  *  <snap_id>
3614  *      An optional snapshot id.  If provided, the mapping will
3615  *      present data from the image at the time that snapshot was
3616  *      created.  The image head is used if no snapshot id is
3617  *      provided.  Snapshot mappings are always read-only.
3618  */
3619 static int rbd_add_parse_args(const char *buf,
3620                                 struct ceph_options **ceph_opts,
3621                                 struct rbd_options **opts,
3622                                 struct rbd_spec **rbd_spec)
3623 {
3624         size_t len;
3625         char *options;
3626         const char *mon_addrs;
3627         size_t mon_addrs_size;
3628         struct rbd_spec *spec = NULL;
3629         struct rbd_options *rbd_opts = NULL;
3630         struct ceph_options *copts;
3631         int ret;
3632
3633         /* The first four tokens are required */
3634
3635         len = next_token(&buf);
3636         if (!len) {
3637                 rbd_warn(NULL, "no monitor address(es) provided");
3638                 return -EINVAL;
3639         }
3640         mon_addrs = buf;
3641         mon_addrs_size = len + 1;
3642         buf += len;
3643
3644         ret = -EINVAL;
3645         options = dup_token(&buf, NULL);
3646         if (!options)
3647                 return -ENOMEM;
3648         if (!*options) {
3649                 rbd_warn(NULL, "no options provided");
3650                 goto out_err;
3651         }
3652
3653         spec = rbd_spec_alloc();
3654         if (!spec)
3655                 goto out_mem;
3656
3657         spec->pool_name = dup_token(&buf, NULL);
3658         if (!spec->pool_name)
3659                 goto out_mem;
3660         if (!*spec->pool_name) {
3661                 rbd_warn(NULL, "no pool name provided");
3662                 goto out_err;
3663         }
3664
3665         spec->image_name = dup_token(&buf, NULL);
3666         if (!spec->image_name)
3667                 goto out_mem;
3668         if (!*spec->image_name) {
3669                 rbd_warn(NULL, "no image name provided");
3670                 goto out_err;
3671         }
3672
3673         /*
3674          * Snapshot name is optional; default is to use "-"
3675          * (indicating the head/no snapshot).
3676          */
3677         len = next_token(&buf);
3678         if (!len) {
3679                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3680                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3681         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3682                 ret = -ENAMETOOLONG;
3683                 goto out_err;
3684         }
3685         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3686         if (!spec->snap_name)
3687                 goto out_mem;
3688         *(spec->snap_name + len) = '\0';
3689
3690         /* Initialize all rbd options to the defaults */
3691
3692         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3693         if (!rbd_opts)
3694                 goto out_mem;
3695
3696         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3697
3698         copts = ceph_parse_options(options, mon_addrs,
3699                                         mon_addrs + mon_addrs_size - 1,
3700                                         parse_rbd_opts_token, rbd_opts);
3701         if (IS_ERR(copts)) {
3702                 ret = PTR_ERR(copts);
3703                 goto out_err;
3704         }
3705         kfree(options);
3706
3707         *ceph_opts = copts;
3708         *opts = rbd_opts;
3709         *rbd_spec = spec;
3710
3711         return 0;
3712 out_mem:
3713         ret = -ENOMEM;
3714 out_err:
3715         kfree(rbd_opts);
3716         rbd_spec_put(spec);
3717         kfree(options);
3718
3719         return ret;
3720 }
3721
3722 /*
3723  * An rbd format 2 image has a unique identifier, distinct from the
3724  * name given to it by the user.  Internally, that identifier is
3725  * what's used to specify the names of objects related to the image.
3726  *
3727  * A special "rbd id" object is used to map an rbd image name to its
3728  * id.  If that object doesn't exist, then there is no v2 rbd image
3729  * with the supplied name.
3730  *
3731  * This function will record the given rbd_dev's image_id field if
3732  * it can be determined, and in that case will return 0.  If any
3733  * errors occur a negative errno will be returned and the rbd_dev's
3734  * image_id field will be unchanged (and should be NULL).
3735  */
3736 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3737 {
3738         int ret;
3739         size_t size;
3740         char *object_name;
3741         void *response;
3742         void *p;
3743
3744         /*
3745          * When probing a parent image, the image id is already
3746          * known (and the image name likely is not).  There's no
3747          * need to fetch the image id again in this case.
3748          */
3749         if (rbd_dev->spec->image_id)
3750                 return 0;
3751
3752         /*
3753          * First, see if the format 2 image id file exists, and if
3754          * so, get the image's persistent id from it.
3755          */
3756         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3757         object_name = kmalloc(size, GFP_NOIO);
3758         if (!object_name)
3759                 return -ENOMEM;
3760         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3761         dout("rbd id object name is %s\n", object_name);
3762
3763         /* Response will be an encoded string, which includes a length */
3764
3765         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3766         response = kzalloc(size, GFP_NOIO);
3767         if (!response) {
3768                 ret = -ENOMEM;
3769                 goto out;
3770         }
3771
3772         ret = rbd_obj_method_sync(rbd_dev, object_name,
3773                                 "rbd", "get_id",
3774                                 NULL, 0,
3775                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3776         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3777         if (ret < 0)
3778                 goto out;
3779
3780         p = response;
3781         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3782                                                 p + RBD_IMAGE_ID_LEN_MAX,
3783                                                 NULL, GFP_NOIO);
3784         if (IS_ERR(rbd_dev->spec->image_id)) {
3785                 ret = PTR_ERR(rbd_dev->spec->image_id);
3786                 rbd_dev->spec->image_id = NULL;
3787         } else {
3788                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3789         }
3790 out:
3791         kfree(response);
3792         kfree(object_name);
3793
3794         return ret;
3795 }
3796
3797 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3798 {
3799         int ret;
3800         size_t size;
3801
3802         /* Version 1 images have no id; empty string is used */
3803
3804         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3805         if (!rbd_dev->spec->image_id)
3806                 return -ENOMEM;
3807
3808         /* Record the header object name for this rbd image. */
3809
3810         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3811         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3812         if (!rbd_dev->header_name) {
3813                 ret = -ENOMEM;
3814                 goto out_err;
3815         }
3816         sprintf(rbd_dev->header_name, "%s%s",
3817                 rbd_dev->spec->image_name, RBD_SUFFIX);
3818
3819         /* Populate rbd image metadata */
3820
3821         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3822         if (ret < 0)
3823                 goto out_err;
3824
3825         /* Version 1 images have no parent (no layering) */
3826
3827         rbd_dev->parent_spec = NULL;
3828         rbd_dev->parent_overlap = 0;
3829
3830         rbd_dev->image_format = 1;
3831
3832         dout("discovered version 1 image, header name is %s\n",
3833                 rbd_dev->header_name);
3834
3835         return 0;
3836
3837 out_err:
3838         kfree(rbd_dev->header_name);
3839         rbd_dev->header_name = NULL;
3840         kfree(rbd_dev->spec->image_id);
3841         rbd_dev->spec->image_id = NULL;
3842
3843         return ret;
3844 }
3845
3846 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3847 {
3848         size_t size;
3849         int ret;
3850         u64 ver = 0;
3851
3852         /*
3853          * Image id was filled in by the caller.  Record the header
3854          * object name for this rbd image.
3855          */
3856         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3857         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3858         if (!rbd_dev->header_name)
3859                 return -ENOMEM;
3860         sprintf(rbd_dev->header_name, "%s%s",
3861                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3862
3863         /* Get the size and object order for the image */
3864
3865         ret = rbd_dev_v2_image_size(rbd_dev);
3866         if (ret < 0)
3867                 goto out_err;
3868
3869         /* Get the object prefix (a.k.a. block_name) for the image */
3870
3871         ret = rbd_dev_v2_object_prefix(rbd_dev);
3872         if (ret < 0)
3873                 goto out_err;
3874
3875         /* Get the and check features for the image */
3876
3877         ret = rbd_dev_v2_features(rbd_dev);
3878         if (ret < 0)
3879                 goto out_err;
3880
3881         /* If the image supports layering, get the parent info */
3882
3883         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3884                 ret = rbd_dev_v2_parent_info(rbd_dev);
3885                 if (ret < 0)
3886                         goto out_err;
3887         }
3888
3889         /* crypto and compression type aren't (yet) supported for v2 images */
3890
3891         rbd_dev->header.crypt_type = 0;
3892         rbd_dev->header.comp_type = 0;
3893
3894         /* Get the snapshot context, plus the header version */
3895
3896         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3897         if (ret)
3898                 goto out_err;
3899         rbd_dev->header.obj_version = ver;
3900
3901         rbd_dev->image_format = 2;
3902
3903         dout("discovered version 2 image, header name is %s\n",
3904                 rbd_dev->header_name);
3905
3906         return 0;
3907 out_err:
3908         rbd_dev->parent_overlap = 0;
3909         rbd_spec_put(rbd_dev->parent_spec);
3910         rbd_dev->parent_spec = NULL;
3911         kfree(rbd_dev->header_name);
3912         rbd_dev->header_name = NULL;
3913         kfree(rbd_dev->header.object_prefix);
3914         rbd_dev->header.object_prefix = NULL;
3915
3916         return ret;
3917 }
3918
3919 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3920 {
3921         int ret;
3922
3923         /* no need to lock here, as rbd_dev is not registered yet */
3924         ret = rbd_dev_snaps_update(rbd_dev);
3925         if (ret)
3926                 return ret;
3927
3928         ret = rbd_dev_probe_update_spec(rbd_dev);
3929         if (ret)
3930                 goto err_out_snaps;
3931
3932         ret = rbd_dev_set_mapping(rbd_dev);
3933         if (ret)
3934                 goto err_out_snaps;
3935
3936         /* generate unique id: find highest unique id, add one */
3937         rbd_dev_id_get(rbd_dev);
3938
3939         /* Fill in the device name, now that we have its id. */
3940         BUILD_BUG_ON(DEV_NAME_LEN
3941                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3942         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3943
3944         /* Get our block major device number. */
3945
3946         ret = register_blkdev(0, rbd_dev->name);
3947         if (ret < 0)
3948                 goto err_out_id;
3949         rbd_dev->major = ret;
3950
3951         /* Set up the blkdev mapping. */
3952
3953         ret = rbd_init_disk(rbd_dev);
3954         if (ret)
3955                 goto err_out_blkdev;
3956
3957         ret = rbd_bus_add_dev(rbd_dev);
3958         if (ret)
3959                 goto err_out_disk;
3960
3961         /*
3962          * At this point cleanup in the event of an error is the job
3963          * of the sysfs code (initiated by rbd_bus_del_dev()).
3964          */
3965         down_write(&rbd_dev->header_rwsem);
3966         ret = rbd_dev_snaps_register(rbd_dev);
3967         up_write(&rbd_dev->header_rwsem);
3968         if (ret)
3969                 goto err_out_bus;
3970
3971         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
3972         if (ret)
3973                 goto err_out_bus;
3974
3975         /* Everything's ready.  Announce the disk to the world. */
3976
3977         add_disk(rbd_dev->disk);
3978
3979         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3980                 (unsigned long long) rbd_dev->mapping.size);
3981
3982         return ret;
3983 err_out_bus:
3984         /* this will also clean up rest of rbd_dev stuff */
3985
3986         rbd_bus_del_dev(rbd_dev);
3987
3988         return ret;
3989 err_out_disk:
3990         rbd_free_disk(rbd_dev);
3991 err_out_blkdev:
3992         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3993 err_out_id:
3994         rbd_dev_id_put(rbd_dev);
3995 err_out_snaps:
3996         rbd_remove_all_snaps(rbd_dev);
3997
3998         return ret;
3999 }
4000
4001 /*
4002  * Probe for the existence of the header object for the given rbd
4003  * device.  For format 2 images this includes determining the image
4004  * id.
4005  */
4006 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4007 {
4008         int ret;
4009
4010         /*
4011          * Get the id from the image id object.  If it's not a
4012          * format 2 image, we'll get ENOENT back, and we'll assume
4013          * it's a format 1 image.
4014          */
4015         ret = rbd_dev_image_id(rbd_dev);
4016         if (ret)
4017                 ret = rbd_dev_v1_probe(rbd_dev);
4018         else
4019                 ret = rbd_dev_v2_probe(rbd_dev);
4020         if (ret) {
4021                 dout("probe failed, returning %d\n", ret);
4022
4023                 return ret;
4024         }
4025
4026         ret = rbd_dev_probe_finish(rbd_dev);
4027         if (ret)
4028                 rbd_header_free(&rbd_dev->header);
4029
4030         return ret;
4031 }
4032
4033 static ssize_t rbd_add(struct bus_type *bus,
4034                        const char *buf,
4035                        size_t count)
4036 {
4037         struct rbd_device *rbd_dev = NULL;
4038         struct ceph_options *ceph_opts = NULL;
4039         struct rbd_options *rbd_opts = NULL;
4040         struct rbd_spec *spec = NULL;
4041         struct rbd_client *rbdc;
4042         struct ceph_osd_client *osdc;
4043         int rc = -ENOMEM;
4044
4045         if (!try_module_get(THIS_MODULE))
4046                 return -ENODEV;
4047
4048         /* parse add command */
4049         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4050         if (rc < 0)
4051                 goto err_out_module;
4052
4053         rbdc = rbd_get_client(ceph_opts);
4054         if (IS_ERR(rbdc)) {
4055                 rc = PTR_ERR(rbdc);
4056                 goto err_out_args;
4057         }
4058         ceph_opts = NULL;       /* rbd_dev client now owns this */
4059
4060         /* pick the pool */
4061         osdc = &rbdc->client->osdc;
4062         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4063         if (rc < 0)
4064                 goto err_out_client;
4065         spec->pool_id = (u64) rc;
4066
4067         /* The ceph file layout needs to fit pool id in 32 bits */
4068
4069         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4070                 rc = -EIO;
4071                 goto err_out_client;
4072         }
4073
4074         rbd_dev = rbd_dev_create(rbdc, spec);
4075         if (!rbd_dev)
4076                 goto err_out_client;
4077         rbdc = NULL;            /* rbd_dev now owns this */
4078         spec = NULL;            /* rbd_dev now owns this */
4079
4080         rbd_dev->mapping.read_only = rbd_opts->read_only;
4081         kfree(rbd_opts);
4082         rbd_opts = NULL;        /* done with this */
4083
4084         rc = rbd_dev_probe(rbd_dev);
4085         if (rc < 0)
4086                 goto err_out_rbd_dev;
4087
4088         return count;
4089 err_out_rbd_dev:
4090         rbd_dev_destroy(rbd_dev);
4091 err_out_client:
4092         rbd_put_client(rbdc);
4093 err_out_args:
4094         if (ceph_opts)
4095                 ceph_destroy_options(ceph_opts);
4096         kfree(rbd_opts);
4097         rbd_spec_put(spec);
4098 err_out_module:
4099         module_put(THIS_MODULE);
4100
4101         dout("Error adding device %s\n", buf);
4102
4103         return (ssize_t) rc;
4104 }
4105
4106 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4107 {
4108         struct list_head *tmp;
4109         struct rbd_device *rbd_dev;
4110
4111         spin_lock(&rbd_dev_list_lock);
4112         list_for_each(tmp, &rbd_dev_list) {
4113                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4114                 if (rbd_dev->dev_id == dev_id) {
4115                         spin_unlock(&rbd_dev_list_lock);
4116                         return rbd_dev;
4117                 }
4118         }
4119         spin_unlock(&rbd_dev_list_lock);
4120         return NULL;
4121 }
4122
4123 static void rbd_dev_release(struct device *dev)
4124 {
4125         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4126
4127         if (rbd_dev->watch_event)
4128                 rbd_dev_header_watch_sync(rbd_dev, 0);
4129
4130         /* clean up and free blkdev */
4131         rbd_free_disk(rbd_dev);
4132         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4133
4134         /* release allocated disk header fields */
4135         rbd_header_free(&rbd_dev->header);
4136
4137         /* done with the id, and with the rbd_dev */
4138         rbd_dev_id_put(rbd_dev);
4139         rbd_assert(rbd_dev->rbd_client != NULL);
4140         rbd_dev_destroy(rbd_dev);
4141
4142         /* release module ref */
4143         module_put(THIS_MODULE);
4144 }
4145
4146 static ssize_t rbd_remove(struct bus_type *bus,
4147                           const char *buf,
4148                           size_t count)
4149 {
4150         struct rbd_device *rbd_dev = NULL;
4151         int target_id, rc;
4152         unsigned long ul;
4153         int ret = count;
4154
4155         rc = strict_strtoul(buf, 10, &ul);
4156         if (rc)
4157                 return rc;
4158
4159         /* convert to int; abort if we lost anything in the conversion */
4160         target_id = (int) ul;
4161         if (target_id != ul)
4162                 return -EINVAL;
4163
4164         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4165
4166         rbd_dev = __rbd_get_dev(target_id);
4167         if (!rbd_dev) {
4168                 ret = -ENOENT;
4169                 goto done;
4170         }
4171
4172         spin_lock_irq(&rbd_dev->lock);
4173         if (rbd_dev->open_count)
4174                 ret = -EBUSY;
4175         else
4176                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4177         spin_unlock_irq(&rbd_dev->lock);
4178         if (ret < 0)
4179                 goto done;
4180
4181         rbd_remove_all_snaps(rbd_dev);
4182         rbd_bus_del_dev(rbd_dev);
4183
4184 done:
4185         mutex_unlock(&ctl_mutex);
4186
4187         return ret;
4188 }
4189
4190 /*
4191  * create control files in sysfs
4192  * /sys/bus/rbd/...
4193  */
4194 static int rbd_sysfs_init(void)
4195 {
4196         int ret;
4197
4198         ret = device_register(&rbd_root_dev);
4199         if (ret < 0)
4200                 return ret;
4201
4202         ret = bus_register(&rbd_bus_type);
4203         if (ret < 0)
4204                 device_unregister(&rbd_root_dev);
4205
4206         return ret;
4207 }
4208
4209 static void rbd_sysfs_cleanup(void)
4210 {
4211         bus_unregister(&rbd_bus_type);
4212         device_unregister(&rbd_root_dev);
4213 }
4214
4215 static int __init rbd_init(void)
4216 {
4217         int rc;
4218
4219         if (!libceph_compatible(NULL)) {
4220                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4221
4222                 return -EINVAL;
4223         }
4224         rc = rbd_sysfs_init();
4225         if (rc)
4226                 return rc;
4227         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4228         return 0;
4229 }
4230
4231 static void __exit rbd_exit(void)
4232 {
4233         rbd_sysfs_cleanup();
4234 }
4235
/* Entry points run when the module is loaded and unloaded */
module_init(rbd_init);
module_exit(rbd_exit);

/* Module metadata */
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");