rbd: add target object existence flags
drivers/block/rbd.c (linux-2.6-block.git)

/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
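
/*
 * Worked check of that bound: each byte of an int contributes at
 * most about 2.5 decimal digits, so for a 4-byte int the formula
 * gives (5 * 4) / 2 + 1 = 11 characters -- exactly enough to hold
 * "-2147483648", the widest possible value.
 */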

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};
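
/*
 * For illustration (values below are hypothetical): mapping image
 * "foo" from pool "rbd" at its head revision might produce a spec
 * like { pool_id = 2, pool_name = "rbd", image_id = "10026b8b4567",
 * image_name = "foo", snap_id = CEPH_NOSNAP, snap_name = "-" }.
 */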

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	struct rbd_img_request	*img_request;
	u64			img_offset;	/* image relative offset */
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_img_parent_read(struct rbd_obj_request *obj_request);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
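
/*
 * For example (hypothetical input): an options string of "ro" or
 * "read_only" supplied when mapping an image reaches this parser
 * one token at a time and sets rbd_opts->read_only = true, while
 * "rw"/"read_write" resets it to the RBD_READ_ONLY_DEFAULT value.
 */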

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * rbd_client_list_lock is acquired here to remove the client from
 * the list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
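
/*
 * Worked example (assuming the common default obj_order of 22,
 * i.e. 4 MiB objects): image byte offset 19088743 (0x1234567)
 * falls in segment 19088743 >> 22 = 4, so rbd_segment_name()
 * yields "<object_prefix>.000000000004" and rbd_segment_offset()
 * yields 19088743 - 4 * 4194304 = 2311527 within that object.
 */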

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
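
/*
 * Worked example (hypothetical sizes): cloning len == 8192 starting
 * at *offset == 1024 from a chain whose first bio holds 4096 bytes
 * and whose second holds 8192 produces a two-bio result chain of
 * 3072 + 5120 bytes.  On return *bio_src points at the second
 * source bio and *offset is 5120, ready for the next call to
 * continue where this one left off.
 */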

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, so
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  Because EXISTS is only ever set and never
 * cleared, that late-arriving stale response is harmlessly ignored.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

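/*
 * Intended use, sketched from the flag definitions above (the
 * actual callers arrive in later patches in this series): when a
 * STAT on the target object completes with result "ret", record it
 * with obj_request_existence_set(obj_request, ret == 0).  Readers
 * must check obj_request_known_test() before consulting
 * obj_request_exists_test(), since EXISTS is meaningless until
 * KNOWN has been set.
 */
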
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
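
/*
 * For example (as implied by rbd_img_request_create() below): a
 * write to an image with a parent carries IMG_REQ_WRITE and
 * IMG_REQ_LAYERED, while the read issued to the parent on a
 * child's behalf is additionally marked IMG_REQ_CHILD.
 */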

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
	if (obj_request->result == -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
		obj_request->xferred = obj_request->length;
	} else if (obj_request->xferred < obj_request->length &&
			!obj_request->result) {
		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
		obj_request->xferred = obj_request->length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
	} else {
		img_request = NULL;
		layered = false;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	WARN_ON(osd_req->r_num_ops != 1);	/* For now */

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
					bool write_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc = NULL;
	u64 snap_id = CEPH_NOSNAP;
	struct timespec *mtime = NULL;
	struct timespec now;

	rbd_assert(osd_req != NULL);

	if (write_request) {
		now = CURRENT_TIME;
		mtime = &now;
		if (img_request)
			snapc = img_request->snapc;
	} else if (img_request) {
		snap_id = img_request->snap_id;
	}
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, snap_id, mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request,
					bool child_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (child_request)
		img_request_child_set(img_request);
	if (rbd_dev->parent_spec)
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	(void) obj_request_existence_set;
	(void) obj_request_known_test;
	(void) obj_request_exists_test;

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	if (img_request_child_test(img_request))
		rbd_obj_request_put(img_request->obj_request);

	kfree(img_request);
}

static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}
1732
2169238d
AE
1733static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1734{
1735 struct rbd_img_request *img_request;
1736 u32 which = obj_request->which;
1737 bool more = true;
1738
1739 rbd_assert(obj_request_img_data_test(obj_request));
1740 img_request = obj_request->img_request;
1741
1742 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1743 rbd_assert(img_request != NULL);
1744 rbd_assert(img_request->obj_request_count > 0);
1745 rbd_assert(which != BAD_WHICH);
1746 rbd_assert(which < img_request->obj_request_count);
1747 rbd_assert(which >= img_request->next_completion);
1748
1749 spin_lock_irq(&img_request->completion_lock);
1750 if (which != img_request->next_completion)
1751 goto out;
1752
1753 for_each_obj_request_from(img_request, obj_request) {
1754 rbd_assert(more);
1755 rbd_assert(which < img_request->obj_request_count);
1756
1757 if (!obj_request_done_test(obj_request))
1758 break;
1759 more = rbd_img_obj_end_request(obj_request);
1760 which++;
1761 }
1762
1763 rbd_assert(more ^ (which == img_request->obj_request_count));
1764 img_request->next_completion = which;
1765out:
1766 spin_unlock_irq(&img_request->completion_lock);
1767
1768 if (!more)
1769 rbd_img_request_complete(img_request);
1770}
1771
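/*
 * Build the object requests for an image request: walk the bio
 * chain, clone the byte range that falls within each rbd object
 * (segment), and queue one osd read or write per object. On
 * failure, any object requests already created are unwound and
 * -ENOMEM is returned.
 */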
1772static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1773 struct bio *bio_list)
1774{
1775 struct rbd_device *rbd_dev = img_request->rbd_dev;
1776 struct rbd_obj_request *obj_request = NULL;
1777 struct rbd_obj_request *next_obj_request;
1778 bool write_request = img_request_write_test(img_request);
1779 unsigned int bio_offset;
1780 u64 img_offset;
1781 u64 resid;
1782 u16 opcode;
1783
1784 dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1785
1786 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1787 bio_offset = 0;
1788 img_offset = img_request->offset;
1789 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1790 resid = img_request->length;
1791 rbd_assert(resid > 0);
1792 while (resid) {
1793 struct ceph_osd_request *osd_req;
1794 const char *object_name;
1795 unsigned int clone_size;
1796 u64 offset;
1797 u64 length;
1798
1799 object_name = rbd_segment_name(rbd_dev, img_offset);
1800 if (!object_name)
1801 goto out_unwind;
1802 offset = rbd_segment_offset(rbd_dev, img_offset);
1803 length = rbd_segment_length(rbd_dev, img_offset, resid);
1804 obj_request = rbd_obj_request_create(object_name,
1805 offset, length,
1806 OBJ_REQUEST_BIO);
1807 kfree(object_name); /* object request has its own copy */
1808 if (!obj_request)
1809 goto out_unwind;
1810
1811 rbd_assert(length <= (u64) UINT_MAX);
1812 clone_size = (unsigned int) length;
1813 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1814 &bio_offset, clone_size,
1815 GFP_ATOMIC);
1816 if (!obj_request->bio_list)
1817 goto out_partial;
1818
1819 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1820 obj_request);
1821 if (!osd_req)
1822 goto out_partial;
1823 obj_request->osd_req = osd_req;
1824 obj_request->callback = rbd_img_obj_callback;
1825
1826 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1827 0, 0);
1828 osd_req_op_extent_osd_data_bio(osd_req, 0,
1829 obj_request->bio_list, obj_request->length);
1830 rbd_osd_req_format(obj_request, write_request);
1831
1832 obj_request->img_offset = img_offset;
1833 rbd_img_obj_request_add(img_request, obj_request);
1834
1835 img_offset += length;
1836 resid -= length;
1837 }
1838
1839 return 0;
1840
1841out_partial:
1842 rbd_obj_request_put(obj_request);
1843out_unwind:
1844 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1845 rbd_obj_request_put(obj_request);
1846
1847 return -ENOMEM;
1848}
1849
1850static int rbd_img_request_submit(struct rbd_img_request *img_request)
1851{
1852 struct rbd_device *rbd_dev = img_request->rbd_dev;
1853 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1854 struct rbd_obj_request *obj_request;
1855 struct rbd_obj_request *next_obj_request;
1856
1857 dout("%s: img %p\n", __func__, img_request);
1858 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1859 int ret;
1860
1861 ret = rbd_obj_request_submit(osdc, obj_request);
1862 if (ret)
1863 return ret;
1864 }
1865
1866 return 0;
1867}
1868
1869static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
1870{
1871 struct rbd_obj_request *obj_request;
1872
1873 rbd_assert(img_request_child_test(img_request));
1874
1875 obj_request = img_request->obj_request;
1876 rbd_assert(obj_request != NULL);
1877 obj_request->result = img_request->result;
1878 obj_request->xferred = img_request->xferred;
1879
1880 rbd_img_obj_request_read_callback(obj_request);
1881 rbd_obj_request_complete(obj_request);
1882}
1883
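/*
 * A read of a clone's object came back -ENOENT, meaning the object
 * has not been written since the clone was made. Reissue the read
 * as an image request against the parent image over the same byte
 * range, and complete the original object request with the
 * parent's result.
 */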
1884static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
1885{
1886 struct rbd_device *rbd_dev;
1887 struct rbd_img_request *img_request;
1888 int result;
1889
1890 rbd_assert(obj_request_img_data_test(obj_request));
1891 rbd_assert(obj_request->img_request != NULL);
1892 rbd_assert(obj_request->result == (s32) -ENOENT);
1893 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1894
1895 rbd_dev = obj_request->img_request->rbd_dev;
1896 rbd_assert(rbd_dev->parent != NULL);
1897 /* rbd_read_finish(obj_request, obj_request->length); */
1898 img_request = rbd_img_request_create(rbd_dev->parent,
1899 obj_request->img_offset,
1900 obj_request->length,
1901 false, true);
1902 result = -ENOMEM;
1903 if (!img_request)
1904 goto out_err;
1905
1906 rbd_obj_request_get(obj_request);
1907 img_request->obj_request = obj_request;
1908
1909 result = rbd_img_request_fill_bio(img_request, obj_request->bio_list);
1910 if (result)
1911 goto out_err;
1912
1913 img_request->callback = rbd_img_parent_read_callback;
1914 result = rbd_img_request_submit(img_request);
1915 if (result)
1916 goto out_err;
1917
1918 return;
1919out_err:
1920 if (img_request)
1921 rbd_img_request_put(img_request);
1922 obj_request->result = result;
1923 obj_request->xferred = 0;
1924 obj_request_done_set(obj_request);
1925}
1926
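/*
 * Acknowledge a notification on the header object: a NODATA object
 * request carrying a single NOTIFY_ACK osd op is submitted, and it
 * releases itself via its rbd_obj_request_put() callback when done.
 */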
1927static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1928 u64 ver, u64 notify_id)
1929{
1930 struct rbd_obj_request *obj_request;
1931 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1932 int ret;
1933
1934 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1935 OBJ_REQUEST_NODATA);
1936 if (!obj_request)
1937 return -ENOMEM;
1938
1939 ret = -ENOMEM;
1940 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1941 if (!obj_request->osd_req)
1942 goto out;
1943 obj_request->callback = rbd_obj_request_put;
1944
1945 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
1946 notify_id, ver, 0);
1947 rbd_osd_req_format(obj_request, false);
1948
1949 ret = rbd_obj_request_submit(osdc, obj_request);
1950out:
1951 if (ret)
1952 rbd_obj_request_put(obj_request);
1953
1954 return ret;
1955}
1956
1957static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1958{
1959 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1960 u64 hver;
1961 int rc;
1962
1963 if (!rbd_dev)
1964 return;
1965
1966 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1967 rbd_dev->header_name, (unsigned long long) notify_id,
1968 (unsigned int) opcode);
1969 rc = rbd_dev_refresh(rbd_dev, &hver);
1970 if (rc)
1971 rbd_warn(rbd_dev, "got notification but failed to "
1972 "update snaps: %d\n", rc);
1973
1974 rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1975}
1976
1977/*
1978 * Request sync osd watch/unwatch. The value of "start" determines
1979 * whether a watch request is being initiated or torn down.
1980 */
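/*
 * A minimal usage sketch; callers pair the two calls:
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 1);	set up the watch
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 0);	tear it down
 */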
1981static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1982{
1983 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1984 struct rbd_obj_request *obj_request;
9969ebc5
AE
1985 int ret;
1986
1987 rbd_assert(start ^ !!rbd_dev->watch_event);
1988 rbd_assert(start ^ !!rbd_dev->watch_request);
1989
1990 if (start) {
1991 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1992 &rbd_dev->watch_event);
1993 if (ret < 0)
1994 return ret;
1995 rbd_assert(rbd_dev->watch_event != NULL);
1996 }
1997
1998 ret = -ENOMEM;
1999 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2000 OBJ_REQUEST_NODATA);
2001 if (!obj_request)
2002 goto out_cancel;
2003
2004 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2005 if (!obj_request->osd_req)
2006 goto out_cancel;
2007
2008 if (start)
2009 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2010 else
2011 ceph_osdc_unregister_linger_request(osdc,
2012 rbd_dev->watch_request->osd_req);
2013
2014 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2015 rbd_dev->watch_event->cookie,
2016 rbd_dev->header.obj_version, start);
2017 rbd_osd_req_format(obj_request, true);
2018
2019 ret = rbd_obj_request_submit(osdc, obj_request);
2020 if (ret)
2021 goto out_cancel;
2022 ret = rbd_obj_request_wait(obj_request);
2023 if (ret)
2024 goto out_cancel;
2025 ret = obj_request->result;
2026 if (ret)
2027 goto out_cancel;
2028
2029 /*
2030 * A watch request is set to linger, so the underlying osd
2031 * request won't go away until we unregister it. We retain
2032 * a pointer to the object request during that time (in
2033 * rbd_dev->watch_request), so we'll keep a reference to
2034 * it. We'll drop that reference (below) after we've
2035 * unregistered it.
2036 */
2037 if (start) {
2038 rbd_dev->watch_request = obj_request;
2039
2040 return 0;
2041 }
2042
2043 /* We have successfully torn down the watch request */
2044
2045 rbd_obj_request_put(rbd_dev->watch_request);
2046 rbd_dev->watch_request = NULL;
2047out_cancel:
2048 /* Cancel the event if we're tearing down, or on error */
2049 ceph_osdc_cancel_event(rbd_dev->watch_event);
2050 rbd_dev->watch_event = NULL;
2051 if (obj_request)
2052 rbd_obj_request_put(obj_request);
2053
2054 return ret;
2055}
2056
2057/*
2058 * Synchronous osd object method call
2059 */
2060static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2061 const char *object_name,
2062 const char *class_name,
2063 const char *method_name,
2064 const char *outbound,
2065 size_t outbound_size,
2066 char *inbound,
2067 size_t inbound_size,
2068 u64 *version)
2069{
2070 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2071 struct rbd_obj_request *obj_request;
2072 struct page **pages;
2073 u32 page_count;
2074 int ret;
2075
2076 /*
2077 * Method calls are ultimately read operations. The result
2078 * should be placed into the inbound buffer provided. They
2079 * may also supply outbound data--parameters for the object
2080 * method. Currently if this is present it will be a
2081 * snapshot id.
2082 */
2083 page_count = (u32) calc_pages_for(0, inbound_size);
2084 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2085 if (IS_ERR(pages))
2086 return PTR_ERR(pages);
2087
2088 ret = -ENOMEM;
2089 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2090 OBJ_REQUEST_PAGES);
2091 if (!obj_request)
2092 goto out;
2093
2094 obj_request->pages = pages;
2095 obj_request->page_count = page_count;
2096
2097 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2098 if (!obj_request->osd_req)
2099 goto out;
2100
2101 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2102 class_name, method_name);
2103 if (outbound_size) {
2104 struct ceph_pagelist *pagelist;
2105
2106 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2107 if (!pagelist)
2108 goto out;
2109
2110 ceph_pagelist_init(pagelist);
2111 ceph_pagelist_append(pagelist, outbound, outbound_size);
2112 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2113 pagelist);
2114 }
2115 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2116 obj_request->pages, inbound_size,
2117 0, false, false);
2118 rbd_osd_req_format(obj_request, false);
2119
2120 ret = rbd_obj_request_submit(osdc, obj_request);
2121 if (ret)
2122 goto out;
2123 ret = rbd_obj_request_wait(obj_request);
2124 if (ret)
2125 goto out;
2126
2127 ret = obj_request->result;
2128 if (ret < 0)
2129 goto out;
2130 ret = 0;
2131 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2132 if (version)
2133 *version = obj_request->version;
2134out:
2135 if (obj_request)
2136 rbd_obj_request_put(obj_request);
2137 else
2138 ceph_release_page_vector(pages, page_count);
2139
2140 return ret;
2141}
2142
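/*
 * A representative call, as made by rbd_dev_v2_object_prefix()
 * below: no outbound data, and the reply lands in a caller buffer:
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *			"rbd", "get_object_prefix", NULL, 0,
 *			reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
 */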
2143static void rbd_request_fn(struct request_queue *q)
2144 __releases(q->queue_lock) __acquires(q->queue_lock)
2145{
2146 struct rbd_device *rbd_dev = q->queuedata;
2147 bool read_only = rbd_dev->mapping.read_only;
2148 struct request *rq;
2149 int result;
2150
2151 while ((rq = blk_fetch_request(q))) {
2152 bool write_request = rq_data_dir(rq) == WRITE;
2153 struct rbd_img_request *img_request;
2154 u64 offset;
2155 u64 length;
2156
2157 /* Ignore any non-FS requests that filter through. */
2158
2159 if (rq->cmd_type != REQ_TYPE_FS) {
2160 dout("%s: non-fs request type %d\n", __func__,
2161 (int) rq->cmd_type);
2162 __blk_end_request_all(rq, 0);
2163 continue;
2164 }
2165
2166 /* Ignore/skip any zero-length requests */
2167
2168 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2169 length = (u64) blk_rq_bytes(rq);
2170
2171 if (!length) {
2172 dout("%s: zero-length request\n", __func__);
2173 __blk_end_request_all(rq, 0);
2174 continue;
2175 }
2176
2177 spin_unlock_irq(q->queue_lock);
2178
2179 /* Disallow writes to a read-only device */
2180
2181 if (write_request) {
2182 result = -EROFS;
2183 if (read_only)
2184 goto end_request;
2185 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2186 }
2187
2188 /*
2189 * Quit early if the mapped snapshot no longer
2190 * exists. It's still possible the snapshot will
2191 * have disappeared by the time our request arrives
2192 * at the osd, but there's no sense in sending it if
2193 * we already know.
2194 */
2195 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2196 dout("request for non-existent snapshot");
2197 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2198 result = -ENXIO;
2199 goto end_request;
2200 }
2201
2202 result = -EINVAL;
2203 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2204 goto end_request; /* Shouldn't happen */
2205
2206 result = -ENOMEM;
2207 img_request = rbd_img_request_create(rbd_dev, offset, length,
2208 write_request, false);
2209 if (!img_request)
2210 goto end_request;
2211
2212 img_request->rq = rq;
2213
2214 result = rbd_img_request_fill_bio(img_request, rq->bio);
2215 if (!result)
2216 result = rbd_img_request_submit(img_request);
2217 if (result)
2218 rbd_img_request_put(img_request);
2219end_request:
2220 spin_lock_irq(q->queue_lock);
2221 if (result < 0) {
2222 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2223 write_request ? "write" : "read",
2224 length, offset, result);
2225
2226 __blk_end_request_all(rq, result);
2227 }
2228 }
2229}
2230
2231/*
2232 * a queue callback. Makes sure that we don't create a bio that spans across
2233 * multiple osd objects. One exception would be a single-page bio,
2234 * which we handle later at bio_chain_clone_range()
2235 */
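/*
 * Worked example (illustrative numbers): with 4 MB objects, a bio
 * starting 3 MB into an object has 1 MB to the object boundary; if
 * the bio already holds 512 KB, at most 512 KB more is allowed, and
 * the result is further clamped to bvec->bv_len since the block
 * layer offers one bio_vec at a time.
 */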
2236static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2237 struct bio_vec *bvec)
2238{
2239 struct rbd_device *rbd_dev = q->queuedata;
2240 sector_t sector_offset;
2241 sector_t sectors_per_obj;
2242 sector_t obj_sector_offset;
2243 int ret;
2244
2245 /*
2246 * Find how far into its rbd object the partition-relative
2247 * bio start sector is to offset relative to the enclosing
2248 * device.
2249 */
2250 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2251 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2252 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2253
2254 /*
2255 * Compute the number of bytes from that offset to the end
2256 * of the object. Account for what's already used by the bio.
2257 */
2258 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2259 if (ret > bmd->bi_size)
2260 ret -= bmd->bi_size;
2261 else
2262 ret = 0;
2263
2264 /*
2265 * Don't send back more than was asked for. And if the bio
2266 * was empty, let the whole thing through because: "Note
2267 * that a block device *must* allow a single page to be
2268 * added to an empty bio."
2269 */
2270 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2271 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2272 ret = (int) bvec->bv_len;
2273
2274 return ret;
2275}
2276
2277static void rbd_free_disk(struct rbd_device *rbd_dev)
2278{
2279 struct gendisk *disk = rbd_dev->disk;
2280
2281 if (!disk)
2282 return;
2283
2284 if (disk->flags & GENHD_FL_UP)
2285 del_gendisk(disk);
2286 if (disk->queue)
2287 blk_cleanup_queue(disk->queue);
2288 put_disk(disk);
2289}
2290
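/*
 * Synchronously read a byte range from an rbd object into a
 * caller-supplied buffer via a page vector. Returns the number of
 * bytes read, or a negative errno.
 */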
2291static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2292 const char *object_name,
2293 u64 offset, u64 length,
2294 char *buf, u64 *version)
2295
2296{
2297 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2298 struct rbd_obj_request *obj_request;
2299 struct page **pages = NULL;
2300 u32 page_count;
2301 size_t size;
2302 int ret;
2303
2304 page_count = (u32) calc_pages_for(offset, length);
2305 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2306 if (IS_ERR(pages))
2307 return PTR_ERR(pages);
2308
2309 ret = -ENOMEM;
2310 obj_request = rbd_obj_request_create(object_name, offset, length,
2311 OBJ_REQUEST_PAGES);
2312 if (!obj_request)
2313 goto out;
2314
2315 obj_request->pages = pages;
2316 obj_request->page_count = page_count;
2317
2318 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2319 if (!obj_request->osd_req)
2320 goto out;
2321
2322 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2323 offset, length, 0, 0);
2324 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2325 obj_request->pages,
2326 obj_request->length,
2327 obj_request->offset & ~PAGE_MASK,
2328 false, false);
2329 rbd_osd_req_format(obj_request, false);
2330
2331 ret = rbd_obj_request_submit(osdc, obj_request);
2332 if (ret)
2333 goto out;
2334 ret = rbd_obj_request_wait(obj_request);
2335 if (ret)
2336 goto out;
2337
2338 ret = obj_request->result;
2339 if (ret < 0)
2340 goto out;
2341
2342 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2343 size = (size_t) obj_request->xferred;
2344 ceph_copy_from_page_vector(pages, buf, 0, size);
2345 rbd_assert(size <= (size_t) INT_MAX);
2346 ret = (int) size;
2347 if (version)
2348 *version = obj_request->version;
2349out:
2350 if (obj_request)
2351 rbd_obj_request_put(obj_request);
2352 else
2353 ceph_release_page_vector(pages, page_count);
2354
2355 return ret;
2356}
2357
2358/*
2359 * Read the complete header for the given rbd device.
2360 *
2361 * Returns a pointer to a dynamically-allocated buffer containing
2362 * the complete and validated header. Caller can pass the address
2363 * of a variable that will be filled in with the version of the
2364 * header object at the time it was read.
2365 *
2366 * Returns a pointer-coded errno if a failure occurs.
2367 */
2368static struct rbd_image_header_ondisk *
2369rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2370{
2371 struct rbd_image_header_ondisk *ondisk = NULL;
2372 u32 snap_count = 0;
2373 u64 names_size = 0;
2374 u32 want_count;
2375 int ret;
2376
2377 /*
2378 * The complete header will include an array of its 64-bit
2379 * snapshot ids, followed by the names of those snapshots as
2380 * a contiguous block of NUL-terminated strings. Note that
2381 * the number of snapshots could change by the time we read
2382 * it in, in which case we re-read it.
2383 */
2384 do {
2385 size_t size;
2386
2387 kfree(ondisk);
2388
2389 size = sizeof (*ondisk);
2390 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2391 size += names_size;
2392 ondisk = kmalloc(size, GFP_KERNEL);
2393 if (!ondisk)
2394 return ERR_PTR(-ENOMEM);
2395
2396 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2397 0, size,
2398 (char *) ondisk, version);
2399 if (ret < 0)
2400 goto out_err;
2401 if (WARN_ON((size_t) ret < size)) {
2402 ret = -ENXIO;
2403 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2404 size, ret);
2405 goto out_err;
2406 }
2407 if (!rbd_dev_ondisk_valid(ondisk)) {
2408 ret = -ENXIO;
2409 rbd_warn(rbd_dev, "invalid header");
2410 goto out_err;
2411 }
2412
2413 names_size = le64_to_cpu(ondisk->snap_names_len);
2414 want_count = snap_count;
2415 snap_count = le32_to_cpu(ondisk->snap_count);
2416 } while (snap_count != want_count);
2417
2418 return ondisk;
2419
2420out_err:
2421 kfree(ondisk);
2422
2423 return ERR_PTR(ret);
2424}
2425
2426/*
2427 * reload the on-disk header
2428 */
2429static int rbd_read_header(struct rbd_device *rbd_dev,
2430 struct rbd_image_header *header)
2431{
2432 struct rbd_image_header_ondisk *ondisk;
2433 u64 ver = 0;
2434 int ret;
2435
2436 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2437 if (IS_ERR(ondisk))
2438 return PTR_ERR(ondisk);
2439 ret = rbd_header_from_disk(header, ondisk);
2440 if (ret >= 0)
2441 header->obj_version = ver;
2442 kfree(ondisk);
2443
2444 return ret;
2445}
2446
2447static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2448{
2449 struct rbd_snap *snap;
2450 struct rbd_snap *next;
2451
2452 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2453 rbd_remove_snap_dev(snap);
2454}
2455
2456static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2457{
2458 sector_t size;
2459
2460 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2461 return;
2462
2463 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2464 dout("setting size to %llu sectors", (unsigned long long) size);
2465 rbd_dev->mapping.size = (u64) size;
2466 set_capacity(rbd_dev->disk, size);
2467}
2468
2469/*
2470 * only read the first part of the ondisk header, without the snaps info
2471 */
2472static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2473{
2474 int ret;
2475 struct rbd_image_header h;
2476
2477 ret = rbd_read_header(rbd_dev, &h);
2478 if (ret < 0)
2479 return ret;
2480
2481 down_write(&rbd_dev->header_rwsem);
2482
2483 /* Update image size, and check for resize of mapped image */
2484 rbd_dev->header.image_size = h.image_size;
2485 rbd_update_mapping_size(rbd_dev);
2486
2487 /* rbd_dev->header.object_prefix shouldn't change */
2488 kfree(rbd_dev->header.snap_sizes);
2489 kfree(rbd_dev->header.snap_names);
2490 /* osd requests may still refer to snapc */
2491 ceph_put_snap_context(rbd_dev->header.snapc);
2492
2493 if (hver)
2494 *hver = h.obj_version;
2495 rbd_dev->header.obj_version = h.obj_version;
2496 rbd_dev->header.image_size = h.image_size;
2497 rbd_dev->header.snapc = h.snapc;
2498 rbd_dev->header.snap_names = h.snap_names;
2499 rbd_dev->header.snap_sizes = h.snap_sizes;
2500 /* Free the extra copy of the object prefix */
2501 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2502 kfree(h.object_prefix);
2503
2504 ret = rbd_dev_snaps_update(rbd_dev);
2505 if (!ret)
2506 ret = rbd_dev_snaps_register(rbd_dev);
2507
2508 up_write(&rbd_dev->header_rwsem);
2509
2510 return ret;
2511}
2512
2513static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2514{
2515 int ret;
2516
2517 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2518 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2519 if (rbd_dev->image_format == 1)
2520 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2521 else
2522 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2523 mutex_unlock(&ctl_mutex);
2524
2525 return ret;
2526}
2527
2528static int rbd_init_disk(struct rbd_device *rbd_dev)
2529{
2530 struct gendisk *disk;
2531 struct request_queue *q;
2532 u64 segment_size;
2533
2534 /* create gendisk info */
2535 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2536 if (!disk)
2537 return -ENOMEM;
2538
2539 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2540 rbd_dev->dev_id);
2541 disk->major = rbd_dev->major;
2542 disk->first_minor = 0;
2543 disk->fops = &rbd_bd_ops;
2544 disk->private_data = rbd_dev;
2545
2546 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2547 if (!q)
2548 goto out_disk;
2549
2550 /* We use the default size, but let's be explicit about it. */
2551 blk_queue_physical_block_size(q, SECTOR_SIZE);
2552
2553 /* set io sizes to object size */
2554 segment_size = rbd_obj_bytes(&rbd_dev->header);
2555 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2556 blk_queue_max_segment_size(q, segment_size);
2557 blk_queue_io_min(q, segment_size);
2558 blk_queue_io_opt(q, segment_size);
2559
2560 blk_queue_merge_bvec(q, rbd_merge_bvec);
2561 disk->queue = q;
2562
2563 q->queuedata = rbd_dev;
2564
2565 rbd_dev->disk = disk;
2566
2567 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2568
2569 return 0;
2570out_disk:
2571 put_disk(disk);
2572
2573 return -ENOMEM;
2574}
2575
2576/*
2577 sysfs
2578*/
2579
2580static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2581{
2582 return container_of(dev, struct rbd_device, dev);
2583}
2584
2585static ssize_t rbd_size_show(struct device *dev,
2586 struct device_attribute *attr, char *buf)
2587{
2588 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2589 sector_t size;
2590
2591 down_read(&rbd_dev->header_rwsem);
2592 size = get_capacity(rbd_dev->disk);
2593 up_read(&rbd_dev->header_rwsem);
2594
2595 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2596}
2597
2598/*
2599 * Note this shows the features for whatever's mapped, which is not
2600 * necessarily the base image.
2601 */
2602static ssize_t rbd_features_show(struct device *dev,
2603 struct device_attribute *attr, char *buf)
2604{
2605 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2606
2607 return sprintf(buf, "0x%016llx\n",
2608 (unsigned long long) rbd_dev->mapping.features);
2609}
2610
2611static ssize_t rbd_major_show(struct device *dev,
2612 struct device_attribute *attr, char *buf)
2613{
2614 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2615
2616 return sprintf(buf, "%d\n", rbd_dev->major);
2617}
2618
2619static ssize_t rbd_client_id_show(struct device *dev,
2620 struct device_attribute *attr, char *buf)
602adf40 2621{
2622 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2623
2624 return sprintf(buf, "client%lld\n",
2625 ceph_client_id(rbd_dev->rbd_client->client));
2626}
2627
2628static ssize_t rbd_pool_show(struct device *dev,
2629 struct device_attribute *attr, char *buf)
2630{
2631 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2632
2633 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2634}
2635
2636static ssize_t rbd_pool_id_show(struct device *dev,
2637 struct device_attribute *attr, char *buf)
2638{
2639 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2640
2641 return sprintf(buf, "%llu\n",
2642 (unsigned long long) rbd_dev->spec->pool_id);
2643}
2644
2645static ssize_t rbd_name_show(struct device *dev,
2646 struct device_attribute *attr, char *buf)
2647{
2648 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2649
2650 if (rbd_dev->spec->image_name)
2651 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2652
2653 return sprintf(buf, "(unknown)\n");
2654}
2655
2656static ssize_t rbd_image_id_show(struct device *dev,
2657 struct device_attribute *attr, char *buf)
2658{
2659 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2660
2661 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2662}
2663
2664/*
2665 * Shows the name of the currently-mapped snapshot (or
2666 * RBD_SNAP_HEAD_NAME for the base image).
2667 */
2668static ssize_t rbd_snap_show(struct device *dev,
2669 struct device_attribute *attr,
2670 char *buf)
2671{
2672 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2673
2674 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2675}
2676
2677/*
2678 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2679 * for the parent image. If there is no parent, simply shows
2680 * "(no parent image)".
2681 */
2682static ssize_t rbd_parent_show(struct device *dev,
2683 struct device_attribute *attr,
2684 char *buf)
2685{
2686 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2687 struct rbd_spec *spec = rbd_dev->parent_spec;
2688 int count;
2689 char *bufp = buf;
2690
2691 if (!spec)
2692 return sprintf(buf, "(no parent image)\n");
2693
2694 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2695 (unsigned long long) spec->pool_id, spec->pool_name);
2696 if (count < 0)
2697 return count;
2698 bufp += count;
2699
2700 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2701 spec->image_name ? spec->image_name : "(unknown)");
2702 if (count < 0)
2703 return count;
2704 bufp += count;
2705
2706 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2707 (unsigned long long) spec->snap_id, spec->snap_name);
2708 if (count < 0)
2709 return count;
2710 bufp += count;
2711
2712 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2713 if (count < 0)
2714 return count;
2715 bufp += count;
2716
2717 return (ssize_t) (bufp - buf);
2718}
2719
dfc5606d
YS
2720static ssize_t rbd_image_refresh(struct device *dev,
2721 struct device_attribute *attr,
2722 const char *buf,
2723 size_t size)
2724{
2725 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2726 int ret;
2727
2728 ret = rbd_dev_refresh(rbd_dev, NULL);
2729
2730 return ret < 0 ? ret : size;
2731}
2732
2733static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2734static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2735static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2736static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2737static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2738static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2739static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2740static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2741static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2742static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2743static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2744
2745static struct attribute *rbd_attrs[] = {
2746 &dev_attr_size.attr,
2747 &dev_attr_features.attr,
2748 &dev_attr_major.attr,
2749 &dev_attr_client_id.attr,
2750 &dev_attr_pool.attr,
2751 &dev_attr_pool_id.attr,
2752 &dev_attr_name.attr,
2753 &dev_attr_image_id.attr,
2754 &dev_attr_current_snap.attr,
2755 &dev_attr_parent.attr,
2756 &dev_attr_refresh.attr,
2757 NULL
2758};
2759
2760static struct attribute_group rbd_attr_group = {
2761 .attrs = rbd_attrs,
2762};
2763
2764static const struct attribute_group *rbd_attr_groups[] = {
2765 &rbd_attr_group,
2766 NULL
2767};
2768
2769static void rbd_sysfs_dev_release(struct device *dev)
2770{
2771}
2772
2773static struct device_type rbd_device_type = {
2774 .name = "rbd",
2775 .groups = rbd_attr_groups,
2776 .release = rbd_sysfs_dev_release,
2777};
2778
2779
2780/*
2781 sysfs - snapshots
2782*/
2783
2784static ssize_t rbd_snap_size_show(struct device *dev,
2785 struct device_attribute *attr,
2786 char *buf)
2787{
2788 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2789
2790 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2791}
2792
2793static ssize_t rbd_snap_id_show(struct device *dev,
2794 struct device_attribute *attr,
2795 char *buf)
2796{
2797 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2798
2799 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2800}
2801
2802static ssize_t rbd_snap_features_show(struct device *dev,
2803 struct device_attribute *attr,
2804 char *buf)
2805{
2806 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2807
2808 return sprintf(buf, "0x%016llx\n",
2809 (unsigned long long) snap->features);
2810}
2811
2812static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2813static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2814static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2815
2816static struct attribute *rbd_snap_attrs[] = {
2817 &dev_attr_snap_size.attr,
2818 &dev_attr_snap_id.attr,
2819 &dev_attr_snap_features.attr,
2820 NULL,
2821};
2822
2823static struct attribute_group rbd_snap_attr_group = {
2824 .attrs = rbd_snap_attrs,
2825};
2826
2827static void rbd_snap_dev_release(struct device *dev)
2828{
2829 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2830 kfree(snap->name);
2831 kfree(snap);
2832}
2833
2834static const struct attribute_group *rbd_snap_attr_groups[] = {
2835 &rbd_snap_attr_group,
2836 NULL
2837};
2838
2839static struct device_type rbd_snap_device_type = {
2840 .groups = rbd_snap_attr_groups,
2841 .release = rbd_snap_dev_release,
2842};
2843
2844static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2845{
2846 kref_get(&spec->kref);
2847
2848 return spec;
2849}
2850
2851static void rbd_spec_free(struct kref *kref);
2852static void rbd_spec_put(struct rbd_spec *spec)
2853{
2854 if (spec)
2855 kref_put(&spec->kref, rbd_spec_free);
2856}
2857
2858static struct rbd_spec *rbd_spec_alloc(void)
2859{
2860 struct rbd_spec *spec;
2861
2862 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2863 if (!spec)
2864 return NULL;
2865 kref_init(&spec->kref);
2866
2867 return spec;
2868}
2869
2870static void rbd_spec_free(struct kref *kref)
2871{
2872 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2873
2874 kfree(spec->pool_name);
2875 kfree(spec->image_id);
2876 kfree(spec->image_name);
2877 kfree(spec->snap_name);
2878 kfree(spec);
2879}
2880
2881static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2882 struct rbd_spec *spec)
2883{
2884 struct rbd_device *rbd_dev;
2885
2886 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2887 if (!rbd_dev)
2888 return NULL;
2889
2890 spin_lock_init(&rbd_dev->lock);
2891 rbd_dev->flags = 0;
2892 INIT_LIST_HEAD(&rbd_dev->node);
2893 INIT_LIST_HEAD(&rbd_dev->snaps);
2894 init_rwsem(&rbd_dev->header_rwsem);
2895
2896 rbd_dev->spec = spec;
2897 rbd_dev->rbd_client = rbdc;
2898
2899 /* Initialize the layout used for all rbd requests */
2900
2901 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2902 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2903 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2904 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2905
2906 return rbd_dev;
2907}
2908
2909static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2910{
2911 rbd_spec_put(rbd_dev->parent_spec);
2912 kfree(rbd_dev->header_name);
2913 rbd_put_client(rbd_dev->rbd_client);
2914 rbd_spec_put(rbd_dev->spec);
2915 kfree(rbd_dev);
2916}
2917
2918static bool rbd_snap_registered(struct rbd_snap *snap)
2919{
2920 bool ret = snap->dev.type == &rbd_snap_device_type;
2921 bool reg = device_is_registered(&snap->dev);
2922
2923 rbd_assert(!ret ^ reg);
2924
2925 return ret;
2926}
2927
2928static void rbd_remove_snap_dev(struct rbd_snap *snap)
2929{
2930 list_del(&snap->node);
2931 if (device_is_registered(&snap->dev))
2932 device_unregister(&snap->dev);
2933}
2934
2935static int rbd_register_snap_dev(struct rbd_snap *snap,
2936 struct device *parent)
2937{
2938 struct device *dev = &snap->dev;
2939 int ret;
2940
2941 dev->type = &rbd_snap_device_type;
2942 dev->parent = parent;
2943 dev->release = rbd_snap_dev_release;
2944 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2945 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2946
2947 ret = device_register(dev);
2948
2949 return ret;
2950}
2951
2952static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2953 const char *snap_name,
2954 u64 snap_id, u64 snap_size,
2955 u64 snap_features)
2956{
2957 struct rbd_snap *snap;
2958 int ret;
2959
2960 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2961 if (!snap)
2962 return ERR_PTR(-ENOMEM);
2963
2964 ret = -ENOMEM;
2965 snap->name = kstrdup(snap_name, GFP_KERNEL);
2966 if (!snap->name)
2967 goto err;
2968
2969 snap->id = snap_id;
2970 snap->size = snap_size;
2971 snap->features = snap_features;
2972
2973 return snap;
2974
2975err:
2976 kfree(snap->name);
2977 kfree(snap);
2978
2979 return ERR_PTR(ret);
2980}
2981
2982static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2983 u64 *snap_size, u64 *snap_features)
2984{
2985 char *snap_name;
2986
2987 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2988
2989 *snap_size = rbd_dev->header.snap_sizes[which];
2990 *snap_features = 0; /* No features for v1 */
2991
2992 /* Skip over names until we find the one we are looking for */
2993
2994 snap_name = rbd_dev->header.snap_names;
2995 while (which--)
2996 snap_name += strlen(snap_name) + 1;
2997
2998 return snap_name;
2999}
3000
3001/*
3002 * Get the size and object order for an image snapshot, or if
3003 * snap_id is CEPH_NOSNAP, gets this information for the base
3004 * image.
3005 */
3006static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3007 u8 *order, u64 *snap_size)
3008{
3009 __le64 snapid = cpu_to_le64(snap_id);
3010 int ret;
3011 struct {
3012 u8 order;
3013 __le64 size;
3014 } __attribute__ ((packed)) size_buf = { 0 };
3015
3016 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3017 "rbd", "get_size",
3018 (char *) &snapid, sizeof (snapid),
3019 (char *) &size_buf, sizeof (size_buf), NULL);
3020 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3021 if (ret < 0)
3022 return ret;
3023
3024 *order = size_buf.order;
3025 *snap_size = le64_to_cpu(size_buf.size);
3026
3027 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3028 (unsigned long long) snap_id, (unsigned int) *order,
3029 (unsigned long long) *snap_size);
3030
3031 return 0;
3032}
3033
3034static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3035{
3036 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3037 &rbd_dev->header.obj_order,
3038 &rbd_dev->header.image_size);
3039}
3040
3041static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3042{
3043 void *reply_buf;
3044 int ret;
3045 void *p;
3046
3047 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3048 if (!reply_buf)
3049 return -ENOMEM;
3050
3051 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3052 "rbd", "get_object_prefix",
3053 NULL, 0,
3054 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3055 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3056 if (ret < 0)
3057 goto out;
3058
3059 p = reply_buf;
3060 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3061 p + RBD_OBJ_PREFIX_LEN_MAX,
3062 NULL, GFP_NOIO);
3063
3064 if (IS_ERR(rbd_dev->header.object_prefix)) {
3065 ret = PTR_ERR(rbd_dev->header.object_prefix);
3066 rbd_dev->header.object_prefix = NULL;
3067 } else {
3068 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3069 }
3070
3071out:
3072 kfree(reply_buf);
3073
3074 return ret;
3075}
3076
3077static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3078 u64 *snap_features)
3079{
3080 __le64 snapid = cpu_to_le64(snap_id);
3081 struct {
3082 __le64 features;
3083 __le64 incompat;
3084 } features_buf = { 0 };
3085 u64 incompat;
3086 int ret;
3087
3088 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3089 "rbd", "get_features",
3090 (char *) &snapid, sizeof (snapid),
3091 (char *) &features_buf, sizeof (features_buf),
3092 NULL);
3093 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3094 if (ret < 0)
3095 return ret;
3096
3097 incompat = le64_to_cpu(features_buf.incompat);
3098 if (incompat & ~RBD_FEATURES_SUPPORTED)
3099 return -ENXIO;
3100
3101 *snap_features = le64_to_cpu(features_buf.features);
3102
3103 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3104 (unsigned long long) snap_id,
3105 (unsigned long long) *snap_features,
3106 (unsigned long long) le64_to_cpu(features_buf.incompat));
3107
3108 return 0;
3109}
3110
3111static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3112{
3113 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3114 &rbd_dev->header.features);
3115}
3116
3117static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3118{
3119 struct rbd_spec *parent_spec;
3120 size_t size;
3121 void *reply_buf = NULL;
3122 __le64 snapid;
3123 void *p;
3124 void *end;
3125 char *image_id;
3126 u64 overlap;
3127 int ret;
3128
3129 parent_spec = rbd_spec_alloc();
3130 if (!parent_spec)
3131 return -ENOMEM;
3132
3133 size = sizeof (__le64) + /* pool_id */
3134 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3135 sizeof (__le64) + /* snap_id */
3136 sizeof (__le64); /* overlap */
3137 reply_buf = kmalloc(size, GFP_KERNEL);
3138 if (!reply_buf) {
3139 ret = -ENOMEM;
3140 goto out_err;
3141 }
3142
3143 snapid = cpu_to_le64(CEPH_NOSNAP);
3144 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3145 "rbd", "get_parent",
3146 (char *) &snapid, sizeof (snapid),
3147 (char *) reply_buf, size, NULL);
3148 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3149 if (ret < 0)
3150 goto out_err;
3151
3152 ret = -ERANGE;
3153 p = reply_buf;
3154 end = (char *) reply_buf + size;
3155 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3156 if (parent_spec->pool_id == CEPH_NOPOOL)
3157 goto out; /* No parent? No problem. */
3158
3159 /* The ceph file layout needs to fit pool id in 32 bits */
3160
3161 ret = -EIO;
3162 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
3163 goto out;
3164
3165 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3166 if (IS_ERR(image_id)) {
3167 ret = PTR_ERR(image_id);
3168 goto out_err;
3169 }
3170 parent_spec->image_id = image_id;
3171 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3172 ceph_decode_64_safe(&p, end, overlap, out_err);
3173
3174 rbd_dev->parent_overlap = overlap;
3175 rbd_dev->parent_spec = parent_spec;
3176 parent_spec = NULL; /* rbd_dev now owns this */
3177out:
3178 ret = 0;
3179out_err:
3180 kfree(reply_buf);
3181 rbd_spec_put(parent_spec);
3182
3183 return ret;
3184}
3185
3186static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3187{
3188 size_t image_id_size;
3189 char *image_id;
3190 void *p;
3191 void *end;
3192 size_t size;
3193 void *reply_buf = NULL;
3194 size_t len = 0;
3195 char *image_name = NULL;
3196 int ret;
3197
3198 rbd_assert(!rbd_dev->spec->image_name);
3199
3200 len = strlen(rbd_dev->spec->image_id);
3201 image_id_size = sizeof (__le32) + len;
3202 image_id = kmalloc(image_id_size, GFP_KERNEL);
3203 if (!image_id)
3204 return NULL;
3205
3206 p = image_id;
3207 end = (char *) image_id + image_id_size;
3208 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3209
3210 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3211 reply_buf = kmalloc(size, GFP_KERNEL);
3212 if (!reply_buf)
3213 goto out;
3214
3215 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3216 "rbd", "dir_get_name",
3217 image_id, image_id_size,
3218 (char *) reply_buf, size, NULL);
3219 if (ret < 0)
3220 goto out;
3221 p = reply_buf;
3222 end = (char *) reply_buf + size;
3223 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3224 if (IS_ERR(image_name))
3225 image_name = NULL;
3226 else
3227 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3228out:
3229 kfree(reply_buf);
3230 kfree(image_id);
3231
3232 return image_name;
3233}
3234
3235/*
3236 * When a parent image gets probed, we only have the pool, image,
3237 * and snapshot ids but not the names of any of them. This call
3238 * is made later to fill in those names. It has to be done after
3239 * rbd_dev_snaps_update() has completed because some of the
3240 * information (in particular, snapshot name) is not available
3241 * until then.
3242 */
3243static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3244{
3245 struct ceph_osd_client *osdc;
3246 const char *name;
3247 void *reply_buf = NULL;
3248 int ret;
3249
3250 if (rbd_dev->spec->pool_name)
3251 return 0; /* Already have the names */
3252
3253 /* Look up the pool name */
3254
3255 osdc = &rbd_dev->rbd_client->client->osdc;
3256 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3257 if (!name) {
3258 rbd_warn(rbd_dev, "there is no pool with id %llu",
3259 rbd_dev->spec->pool_id); /* Really a BUG() */
3260 return -EIO;
3261 }
3262
3263 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3264 if (!rbd_dev->spec->pool_name)
3265 return -ENOMEM;
3266
3267 /* Fetch the image name; tolerate failure here */
3268
3269 name = rbd_dev_image_name(rbd_dev);
3270 if (name)
3271 rbd_dev->spec->image_name = (char *) name;
3272 else
3273 rbd_warn(rbd_dev, "unable to get image name");
3274
3275 /* Look up the snapshot name. */
3276
3277 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3278 if (!name) {
3279 rbd_warn(rbd_dev, "no snapshot with id %llu",
3280 rbd_dev->spec->snap_id); /* Really a BUG() */
3281 ret = -EIO;
3282 goto out_err;
3283 }
3284 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3285 ret = -ENOMEM;
3286 if (!rbd_dev->spec->snap_name)
3287 goto out_err;
3287
3288 return 0;
3289out_err:
3290 kfree(reply_buf);
3291 kfree(rbd_dev->spec->pool_name);
3292 rbd_dev->spec->pool_name = NULL;
3293
3294 return ret;
3295}
3296
3297static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3298{
3299 size_t size;
3300 int ret;
3301 void *reply_buf;
3302 void *p;
3303 void *end;
3304 u64 seq;
3305 u32 snap_count;
3306 struct ceph_snap_context *snapc;
3307 u32 i;
3308
3309 /*
3310 * We'll need room for the seq value (maximum snapshot id),
3311 * snapshot count, and array of that many snapshot ids.
3312 * For now we have a fixed upper limit on the number we're
3313 * prepared to receive.
3314 */
3315 size = sizeof (__le64) + sizeof (__le32) +
3316 RBD_MAX_SNAP_COUNT * sizeof (__le64);
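 /*
 * With RBD_MAX_SNAP_COUNT of 510 this comes to
 * 8 + 4 + 510 * 8 = 4092 bytes, so the reply buffer
 * fits within a single 4 KB page.
 */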
3317 reply_buf = kzalloc(size, GFP_KERNEL);
3318 if (!reply_buf)
3319 return -ENOMEM;
3320
3321 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3322 "rbd", "get_snapcontext",
3323 NULL, 0,
3324 reply_buf, size, ver);
3325 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3326 if (ret < 0)
3327 goto out;
3328
3329 ret = -ERANGE;
3330 p = reply_buf;
3331 end = (char *) reply_buf + size;
3332 ceph_decode_64_safe(&p, end, seq, out);
3333 ceph_decode_32_safe(&p, end, snap_count, out);
3334
3335 /*
3336 * Make sure the reported number of snapshot ids wouldn't go
3337 * beyond the end of our buffer. But before checking that,
3338 * make sure the computed size of the snapshot context we
3339 * allocate is representable in a size_t.
3340 */
3341 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3342 / sizeof (u64)) {
3343 ret = -EINVAL;
3344 goto out;
3345 }
3346 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3347 goto out;
3348
3349 size = sizeof (struct ceph_snap_context) +
3350 snap_count * sizeof (snapc->snaps[0]);
3351 snapc = kmalloc(size, GFP_KERNEL);
3352 if (!snapc) {
3353 ret = -ENOMEM;
3354 goto out;
3355 }
3356
3357 atomic_set(&snapc->nref, 1);
3358 snapc->seq = seq;
3359 snapc->num_snaps = snap_count;
3360 for (i = 0; i < snap_count; i++)
3361 snapc->snaps[i] = ceph_decode_64(&p);
3362
3363 rbd_dev->header.snapc = snapc;
3364 ret = 0;
3365 dout(" snap context seq = %llu, snap_count = %u\n",
3366 (unsigned long long) seq, (unsigned int) snap_count);
3367
3368out:
3369 kfree(reply_buf);
3370
3371 return ret;
3372}
3373
3374static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3375{
3376 size_t size;
3377 void *reply_buf;
3378 __le64 snap_id;
3379 int ret;
3380 void *p;
3381 void *end;
3382 char *snap_name;
3383
3384 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3385 reply_buf = kmalloc(size, GFP_KERNEL);
3386 if (!reply_buf)
3387 return ERR_PTR(-ENOMEM);
3388
3389 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3390 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3391 "rbd", "get_snapshot_name",
3392 (char *) &snap_id, sizeof (snap_id),
3393 reply_buf, size, NULL);
3394 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3395 if (ret < 0)
3396 goto out;
3397
3398 p = reply_buf;
3399 end = (char *) reply_buf + size;
3400 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3401 if (IS_ERR(snap_name)) {
3402 ret = PTR_ERR(snap_name);
3403 goto out;
3404 } else {
3405 dout(" snap_id 0x%016llx snap_name = %s\n",
3406 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3407 }
3408 kfree(reply_buf);
3409
3410 return snap_name;
3411out:
3412 kfree(reply_buf);
3413
3414 return ERR_PTR(ret);
3415}
3416
3417static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3418 u64 *snap_size, u64 *snap_features)
3419{
3420 u64 snap_id;
3421 u8 order;
3422 int ret;
3423
3424 snap_id = rbd_dev->header.snapc->snaps[which];
3425 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3426 if (ret)
3427 return ERR_PTR(ret);
3428 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3429 if (ret)
3430 return ERR_PTR(ret);
3431
3432 return rbd_dev_v2_snap_name(rbd_dev, which);
3433}
3434
3435static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3436 u64 *snap_size, u64 *snap_features)
3437{
3438 if (rbd_dev->image_format == 1)
3439 return rbd_dev_v1_snap_info(rbd_dev, which,
3440 snap_size, snap_features);
3441 if (rbd_dev->image_format == 2)
3442 return rbd_dev_v2_snap_info(rbd_dev, which,
3443 snap_size, snap_features);
3444 return ERR_PTR(-EINVAL);
3445}
3446
3447static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3448{
3449 int ret;
3450 __u8 obj_order;
3451
3452 down_write(&rbd_dev->header_rwsem);
3453
3454 /* Grab old order first, to see if it changes */
3455
3456 obj_order = rbd_dev->header.obj_order;
3457 ret = rbd_dev_v2_image_size(rbd_dev);
3458 if (ret)
3459 goto out;
3460 if (rbd_dev->header.obj_order != obj_order) {
3461 ret = -EIO;
3462 goto out;
3463 }
3464 rbd_update_mapping_size(rbd_dev);
3465
3466 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3467 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3468 if (ret)
3469 goto out;
3470 ret = rbd_dev_snaps_update(rbd_dev);
3471 dout("rbd_dev_snaps_update returned %d\n", ret);
3472 if (ret)
3473 goto out;
3474 ret = rbd_dev_snaps_register(rbd_dev);
3475 dout("rbd_dev_snaps_register returned %d\n", ret);
3476out:
3477 up_write(&rbd_dev->header_rwsem);
3478
3479 return ret;
3480}
3481
3482/*
3483 * Scan the rbd device's current snapshot list and compare it to the
3484 * newly-received snapshot context. Remove any existing snapshots
3485 * not present in the new snapshot context. Add a new snapshot for
3486 * any snapshots in the snapshot context not in the current list.
3487 * And verify there are no changes to snapshots we already know
3488 * about.
3489 *
3490 * Assumes the snapshots in the snapshot context are sorted by
3491 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3492 * are also maintained in that order.)
3493 */
3494static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3495{
3496 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3497 const u32 snap_count = snapc->num_snaps;
3498 struct list_head *head = &rbd_dev->snaps;
3499 struct list_head *links = head->next;
3500 u32 index = 0;
3501
3502 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3503 while (index < snap_count || links != head) {
3504 u64 snap_id;
3505 struct rbd_snap *snap;
3506 char *snap_name;
3507 u64 snap_size = 0;
3508 u64 snap_features = 0;
3509
3510 snap_id = index < snap_count ? snapc->snaps[index]
3511 : CEPH_NOSNAP;
3512 snap = links != head ? list_entry(links, struct rbd_snap, node)
3513 : NULL;
3514 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3515
3516 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3517 struct list_head *next = links->next;
3518
3519 /*
3520 * A previously-existing snapshot is not in
3521 * the new snap context.
3522 *
3523 * If the now missing snapshot is the one the
3524 * image is mapped to, clear its exists flag
3525 * so we can avoid sending any more requests
3526 * to it.
3527 */
3528 if (rbd_dev->spec->snap_id == snap->id)
3529 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3530 rbd_remove_snap_dev(snap);
3531 dout("%ssnap id %llu has been removed\n",
3532 rbd_dev->spec->snap_id == snap->id ?
3533 "mapped " : "",
3534 (unsigned long long) snap->id);
3535
3536 /* Done with this list entry; advance */
3537
3538 links = next;
dfc5606d
YS
3539 continue;
3540 }
35938150 3541
b8b1e2db
AE
3542 snap_name = rbd_dev_snap_info(rbd_dev, index,
3543 &snap_size, &snap_features);
cd892126
AE
3544 if (IS_ERR(snap_name))
3545 return PTR_ERR(snap_name);
3546
9fcbb800
AE
3547 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3548 (unsigned long long) snap_id);
35938150
AE
3549 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3550 struct rbd_snap *new_snap;
3551
3552 /* We haven't seen this snapshot before */
3553
c8d18425 3554 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
cd892126 3555 snap_id, snap_size, snap_features);
9fcbb800
AE
3556 if (IS_ERR(new_snap)) {
3557 int err = PTR_ERR(new_snap);
3558
3559 dout(" failed to add dev, error %d\n", err);
3560
3561 return err;
3562 }
35938150
AE
3563
3564 /* New goes before existing, or at end of list */
3565
9fcbb800 3566 dout(" added dev%s\n", snap ? "" : " at end\n");
35938150
AE
3567 if (snap)
3568 list_add_tail(&new_snap->node, &snap->node);
3569 else
523f3258 3570 list_add_tail(&new_snap->node, head);
35938150
AE
3571 } else {
3572 /* Already have this one */
3573
9fcbb800
AE
3574 dout(" already present\n");
3575
cd892126 3576 rbd_assert(snap->size == snap_size);
aafb230e 3577 rbd_assert(!strcmp(snap->name, snap_name));
cd892126 3578 rbd_assert(snap->features == snap_features);
35938150
AE
3579
3580 /* Done with this list entry; advance */
3581
3582 links = links->next;
dfc5606d 3583 }
35938150
AE
3584
3585 /* Advance to the next entry in the snapshot context */
3586
3587 index++;
dfc5606d 3588 }
9fcbb800 3589 dout("%s: done\n", __func__);
dfc5606d
YS
3590
3591 return 0;
3592}
3593
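/*
 * A worked example of the merge walk above, with hypothetical
 * snapshot ids: given a new snapshot context of [10, 7, 3] (highest
 * id first) and an existing list of [10, 5, 3], the walk keeps 10
 * (already present), inserts a new snapshot 7 ahead of 5, removes 5
 * on the next pass (it is absent from the new context), and finally
 * keeps 3.
 */
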
/*
 * Scan the list of snapshots and register the devices for any that
 * have not already been registered.
 */
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	int ret = 0;

	dout("%s:\n", __func__);
	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
		return -EIO;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!rbd_snap_registered(snap)) {
			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
			if (ret < 0)
				break;
		}
	}
	dout("%s: returning %d\n", __func__, ret);

	return ret;
}

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	struct device *dev;
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	mutex_unlock(&ctl_mutex);

	return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}

static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout(" max dev id has been reset\n");
}

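/*
 * An illustration (hypothetical ids) of why cmpxchg is used above:
 * suppose id 3 is being put while it is the current maximum, and the
 * backward list scan finds the next-highest id to be 2.  If a
 * concurrent rbd_dev_id_get() has meanwhile bumped rbd_dev_id_max
 * to 4, the compare-and-exchange fails (3 != 4) and the newer,
 * correct maximum of 4 is preserved rather than being clobbered
 * with 2.
 */
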
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}

/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}

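/*
 * An example (hypothetical input) of how the token helpers consume
 * a buffer: given buf = "rbd myimage", dup_token(&buf, &len) returns
 * a kmalloc'd, NUL-terminated copy of "rbd" with len == 3 and leaves
 * buf pointing at " myimage"; a second call skips the leading space
 * and returns "myimage".
 */
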
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!spec->snap_name)
		goto out_mem;
	*(spec->snap_name + len) = '\0';

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}

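/*
 * For example (all names hypothetical), a write like this maps an
 * image:
 *
 *   # echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *
 * Here "1.2.3.4:6789" is the monitor address, "name=admin" is the
 * option list, "rbd" is the pool name, and "myimage" is the image
 * name; no snapshot name is given, so the image head is mapped.
 */
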
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * If we already have it we don't need to look it up.
	 * In particular, when probing a parent image the image id
	 * is already known (and the image name likely is not), so
	 * there's no need to fetch the image id again in that case.
	 */
	if (rbd_dev->spec->image_id)
		return 0;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	ret = rbd_obj_method_sync(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = response;
	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						NULL, GFP_NOIO);
	if (IS_ERR(rbd_dev->spec->image_id)) {
		ret = PTR_ERR(rbd_dev->spec->image_id);
		rbd_dev->spec->image_id = NULL;
	} else {
		dout("image_id is %s\n", rbd_dev->spec->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}

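/*
 * For example (image name hypothetical), probing an image "myimage"
 * reads the object named RBD_ID_PREFIX "myimage" and invokes its
 * "get_id" class method; the decoded string that comes back becomes
 * the image id used to name every other object for that image.
 */
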
static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;

	/* Version 1 images have no id; empty string is used */

	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
	if (!rbd_dev->spec->image_id)
		return -ENOMEM;

	/* Record the header object name for this rbd image. */

	size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name) {
		ret = -ENOMEM;
		goto out_err;
	}
	sprintf(rbd_dev->header_name, "%s%s",
		rbd_dev->spec->image_name, RBD_SUFFIX);

	/* Populate rbd image metadata */

	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (ret < 0)
		goto out_err;

	/* Version 1 images have no parent (no layering) */

	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;

	rbd_dev->image_format = 1;

	dout("discovered version 1 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	return ret;
}

static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
		RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}

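/*
 * To illustrate the two header-naming schemes (image name and id
 * hypothetical): a format 1 image named "myimage" has the header
 * object "myimage" RBD_SUFFIX, keyed by the user-visible name,
 * while a format 2 image whose id is "1234" has the header object
 * RBD_HEADER_PREFIX "1234", keyed by the persistent image id.
 */
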
static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
	struct rbd_device *parent = NULL;
	struct rbd_spec *parent_spec = NULL;
	struct rbd_client *rbdc = NULL;
	int ret;

	/* no need to lock here, as rbd_dev is not registered yet */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_probe_update_spec(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_set_mapping(rbd_dev);
	if (ret)
		goto err_out_snaps;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */

	/* Probe the parent if there is one */

	if (rbd_dev->parent_spec) {
		/*
		 * We need to pass a reference to the client and the
		 * parent spec when creating the parent rbd_dev.
		 * Images related by parent/child relationships
		 * always share both.
		 */
		parent_spec = rbd_spec_get(rbd_dev->parent_spec);
		rbdc = __rbd_get_client(rbd_dev->rbd_client);

		parent = rbd_dev_create(rbdc, parent_spec);
		if (!parent) {
			ret = -ENOMEM;
			goto err_out_spec;
		}
		rbdc = NULL;		/* parent now owns reference */
		parent_spec = NULL;	/* parent now owns reference */
		ret = rbd_dev_probe(parent);
		if (ret < 0)
			goto err_out_parent;
		rbd_dev->parent = parent;
	}

	down_write(&rbd_dev->header_rwsem);
	ret = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
	if (ret)
		goto err_out_bus;

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto err_out_bus;

	/* Everything's ready.  Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_parent:
	rbd_dev_destroy(parent);
err_out_spec:
	rbd_spec_put(parent_spec);
	rbd_put_client(rbdc);
err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);

	return ret;
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);

	return ret;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret) {
		dout("probe failed, returning %d\n", ret);

		return ret;
	}

	ret = rbd_dev_probe_finish(rbd_dev);
	if (ret)
		rbd_header_free(&rbd_dev->header);

	return ret;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64) rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	return count;
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t) rc;
}

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}

static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_event)
		rbd_dev_header_watch_sync(rbd_dev, 0);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	rbd_dev_id_put(rbd_dev);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_dev_destroy(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}

static void __rbd_remove(struct rbd_device *rbd_dev)
{
	rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;

	while (rbd_dev->parent_spec) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		__rbd_remove(second);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
		first->parent_overlap = 0;
		first->parent = NULL;
	}
	__rbd_remove(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}

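/*
 * A sketch (hypothetical chain) of the removal loop above: for a
 * mapped clone C with parent P and grandparent G, the inner walk
 * finds G (the ancestor with no parent of its own) and removes it
 * first, clearing P's parent pointers; the next pass removes P and
 * clears C's; finally C itself goes via the trailing __rbd_remove()
 * call once C no longer has a parent_spec.
 */
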
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");

		return -EINVAL;
	}
	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");