/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
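
/*
 * Illustrative arithmetic (not part of the driver): every two bytes
 * of an int contribute at most five decimal digits, since
 * 2^16 - 1 = 65535 has five.  With a 4-byte int the formula gives
 * (5 * 4) / 2 + 1 = 11 characters, exactly enough for the widest
 * value, "-2147483648" (ten digits plus a sign).
 */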

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These five fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};
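
/*
 * Example (assumed names, for illustration only): mapping the image
 * "myimage" in pool "mypool" at snapshot "mysnap" starts out with a
 * spec holding just the names:
 *
 *	{ .pool_name = "mypool", .image_name = "myimage",
 *	  .snap_name = "mysnap" }
 *
 * The matching pool_id, image_id and snap_id are then filled in
 * during discovery.  A child (clone) image built from that snapshot
 * would have its rbd_dev->parent_spec point at this same structure.
 */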

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	struct rbd_img_request	*img_request;
	u64			img_offset;	/* image relative offset */
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
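
/*
 * For example, if the option string handed in during device setup
 * contains the token "ro" (however it got there -- the rbd CLI or a
 * direct write to /sys/bus/rbd/add), match_token() maps it to
 * Opt_read_only above and the mapping gets rbd_opts->read_only set
 * to true; "rw" works the same way through Opt_read_write.  (This is
 * an illustrative description of the flow; the surrounding option
 * string is parsed elsewhere.)
 */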

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock to remove the client from the list,
 * so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot header
	 * must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
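
/*
 * Worked example of the order checks above (illustrative values):
 * the common order of 22 gives 4 MiB objects and passes both tests.
 * An order of 8 would mean 256-byte objects, smaller than a 512-byte
 * sector (SECTOR_SHIFT is 9), so it is rejected; so is anything
 * above 31, since object sizes are still computed with int-width
 * shifts in a few places.
 */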

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX) {
			/* don't leak object_prefix on a bad header */
			kfree(header->object_prefix);
			header->object_prefix = NULL;
			return -EIO;
		}
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}
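
/*
 * Sketch of the v1 on-disk layout being decoded above (inferred from
 * the accesses in this function, not a formal specification): a
 * fixed header (magic text, object_prefix, options, snap_seq,
 * snap_count, snap_names_len) is followed by snap_count {id, size}
 * entries in ondisk->snaps[], and the snapshot names are packed
 * after that array as consecutive NUL-terminated strings totalling
 * snap_names_len bytes -- which is why the name copy above starts
 * at &ondisk->snaps[snap_count].
 */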

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
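
/*
 * Worked example of the segment math (assumed values): with the
 * default object order of 22 (4 MiB segments) and object prefix
 * "rb.0.1234", image offset 0x00c01000 lands in segment 3 at offset
 * 0x1000, so I/O there targets the object "rb.0.1234.000000000003".
 * A 4 MiB request starting at that offset would be trimmed by
 * rbd_segment_length() to 4 MiB - 0x1000 bytes; the remainder is
 * issued separately against segment 4.
 */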

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return (u64) 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
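
/*
 * Illustrative use of the above (a simplified version of what
 * rbd_img_request_fill_bio() does later in this file):
 *
 *	struct bio *bio = rq_bio;
 *	unsigned int off = 0;
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&bio, &off, segment_length,
 *					GFP_ATOMIC);
 *
 * After each call, (bio, off) has advanced past the bytes already
 * cloned, so repeated calls carve consecutive, segment-sized pieces
 * off the front of the original chain.
 */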

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_img_request *img_request = obj_request->img_request;
		struct rbd_device *rbd_dev;

		rbd_dev = img_request ? img_request->rbd_dev : NULL;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_img_request *img_request = obj_request->img_request;
		struct rbd_device *rbd_dev;

		rbd_dev = img_request ? img_request->rbd_dev : NULL;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}
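
/*
 * Sketch of the synchronous usage pattern these helpers support
 * (illustrative only; error handling trimmed).  With no callback
 * set, rbd_obj_request_complete() fires the completion that
 * rbd_obj_request_wait() sleeps on:
 *
 *	obj_request = rbd_obj_request_create(name, 0, 0,
 *						OBJ_REQUEST_NODATA);
 *	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
 *						obj_request);
 *	ret = rbd_obj_request_submit(osdc, obj_request);
 *	if (!ret)
 *		ret = rbd_obj_request_wait(obj_request);
 *	rbd_obj_request_put(obj_request);
 */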

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
	if (obj_request->result == -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
		obj_request->xferred = obj_request->length;
	} else if (obj_request->xferred < obj_request->length &&
			!obj_request->result) {
		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
		obj_request->xferred = obj_request->length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
		obj_request->result, obj_request->xferred, obj_request->length);
	if (obj_request->img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Our
	 * xferred value is the number of bytes transferred back.
	 * Set it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	rbd_assert(obj_request_img_data_test(obj_request) ^
				!obj_request->img_request);
	rbd_assert(obj_request_img_data_test(obj_request) ^
				(obj_request->which == BAD_WHICH));

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	WARN_ON(osd_req->r_num_ops != 1);	/* For now */

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64) UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
					bool write_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc = NULL;
	u64 snap_id = CEPH_NOSNAP;
	struct timespec *mtime = NULL;
	struct timespec now;

	rbd_assert(osd_req != NULL);

	if (write_request) {
		now = CURRENT_TIME;
		mtime = &now;
		if (img_request)
			snapc = img_request->snapc;
	} else if (img_request) {
		snap_id = img_request->snap_id;
	}
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, snap_id, mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request,
					bool child_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (child_request)
		img_request_child_set(img_request);
	if (rbd_dev->parent_spec)
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	(void) img_request_layered_test(img_request);	/* Avoid a warning */
	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}

static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(!img_request_child_test(img_request));
	rbd_assert(img_request->rq != NULL);

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	return blk_end_request(img_request->rq, result, xferred);
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
1710
static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	unsigned int bio_offset;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p bio %p\n", __func__, img_request, bio_list);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	bio_offset = 0;
	img_offset = img_request->offset;
	rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	rbd_assert(resid > 0);
	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		unsigned int clone_size;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
				obj_request->bio_list, obj_request->length);
		rbd_osd_req_format(obj_request, write_request);

		obj_request->img_offset = img_offset;
		rbd_img_obj_request_add(img_request, obj_request);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}

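/*
 * A standalone userspace sketch (not driver code) of the arithmetic
 * the loop above relies on: each iteration clips the remaining I/O
 * to the end of the current object, whose size is a power of two
 * (1 << obj_order).  Mirrors what rbd_segment_offset()/_length() do.
 */
#include <inttypes.h>
#include <stdio.h>

int main(void)
{
	uint64_t obj_size = 1ULL << 22;			/* 4 MB objects */
	uint64_t img_offset = (obj_size * 3) - 1024;	/* straddles a boundary */
	uint64_t resid = 8192;

	while (resid) {
		uint64_t seg = img_offset >> 22;		/* object number */
		uint64_t offset = img_offset & (obj_size - 1);	/* offset within it */
		uint64_t length = obj_size - offset;		/* room left in object */

		if (length > resid)
			length = resid;
		printf("object %" PRIu64 ": offset %" PRIu64 " length %" PRIu64 "\n",
			seg, offset, length);
		img_offset += length;
		resid -= length;
	}
	return 0;
}
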
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		ret = rbd_obj_request_submit(osdc, obj_request);
		if (ret)
			return ret;
		/*
		 * The image request has its own reference to each
		 * of its object requests, so we can safely drop the
		 * initial one here.
		 */
		rbd_obj_request_put(obj_request);
	}

	return 0;
}

static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;
	obj_request->callback = rbd_obj_request_put;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, ver, 0);
	rbd_osd_req_format(obj_request, false);

	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		rbd_warn(rbd_dev, "got notification but failed to "
			   "update snaps: %d\n", rc);

	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie,
				rbd_dev->header.obj_version, start);
	rbd_osd_req_format(obj_request, true);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}

/*
 * Synchronous osd object method call
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     u64 *version)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32) calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format(obj_request, false);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;
	ret = 0;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (WARN_ON(offset && length > U64_MAX - offset + 1))
			goto end_request;	/* Shouldn't happen */

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request, false);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill_bio(img_request, rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}

/*
 * A queue callback.  Makes sure we don't create a bio that spans
 * multiple osd objects.  The one exception is single-page bios,
 * which we handle later in bio_chain_clone_range().
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is, offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}

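/*
 * A minimal userspace sketch (not driver code) of the merge_bvec
 * computation above: given a device-relative start sector and the
 * bytes already in the bio, how many more bytes fit before the
 * enclosing object boundary?
 */
#include <stdio.h>

#define SECTOR_SHIFT_DEMO 9

static int bytes_to_obj_end(unsigned long long sector_offset,
			    unsigned char obj_order,
			    unsigned int bio_bytes_so_far)
{
	unsigned long long sectors_per_obj = 1ULL << (obj_order - SECTOR_SHIFT_DEMO);
	unsigned long long obj_sector_offset = sector_offset & (sectors_per_obj - 1);
	long long ret = (long long)(sectors_per_obj - obj_sector_offset)
						<< SECTOR_SHIFT_DEMO;

	/* Account for what is already used by the bio. */
	if (ret > bio_bytes_so_far)
		ret -= bio_bytes_so_far;
	else
		ret = 0;
	return (int)ret;
}

int main(void)
{
	/* 4 MB objects (order 22); bio starts 512 bytes before the boundary. */
	printf("%d\n", bytes_to_obj_end((1ULL << 13) - 1, 22, 0));	/* 512 */
	return 0;
}
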
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}

static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length,
				char *buf, u64 *version)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
					offset, length, 0, 0);
	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
					obj_request->pages,
					obj_request->length,
					obj_request->offset & ~PAGE_MASK,
					false, false);
	rbd_osd_req_format(obj_request, false);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t) INT_MAX);
	ret = (int) size;
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

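/*
 * A standalone sketch (not driver code) of the page-count math that
 * calc_pages_for() performs for the buffers above: the number of
 * pages an extent touches, given its offset within the first page.
 */
#include <stdio.h>

static unsigned int pages_for(unsigned long long off, unsigned long long len,
			      unsigned int page_shift)
{
	unsigned long long page_size = 1ULL << page_shift;

	return (unsigned int)(((off + len + page_size - 1) >> page_shift) -
			      (off >> page_shift));
}

int main(void)
{
	/* 4 KB pages: 6000 bytes starting 1000 bytes into a page spans 2 pages. */
	printf("%u\n", pages_for(1000, 6000, 12));
	return 0;
}
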
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
					0, size,
					(char *) ondisk, version);
		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			ret = -ENXIO;
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}

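/*
 * A userspace sketch (not driver code) of the sizing pattern above:
 * read with a guess for the snapshot count, and retry whenever the
 * count reported by the returned header disagrees with the count the
 * buffer was sized for.  The reader function here is hypothetical.
 */
#include <stdint.h>
#include <stdio.h>

struct demo_header { uint32_t snap_count; uint64_t names_size; };

/* Stand-in for the on-wire read; pretends the image has 3 snapshots. */
static struct demo_header read_header(size_t bufsize)
{
	struct demo_header h = { 3, 48 };

	(void)bufsize;
	return h;
}

int main(void)
{
	uint32_t snap_count = 0, want_count;
	uint64_t names_size = 0;
	size_t size;

	do {
		struct demo_header h;

		size = sizeof(struct demo_header) +
			snap_count * sizeof(uint64_t) + names_size;
		h = read_header(size);

		want_count = snap_count;
		snap_count = h.snap_count;
		names_size = h.names_size;
		printf("tried %zu bytes, header says %u snapshots\n",
			size, snap_count);
	} while (snap_count != want_count);	/* stable: buffer was big enough */
	return 0;
}
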
/*
 * Reload the on-disk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	kfree(ondisk);

	return ret;
}

static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		rbd_remove_snap_dev(snap);
}

static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		return;

	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
	dout("setting size to %llu sectors", (unsigned long long) size);
	rbd_dev->mapping.size = (u64) size;
	set_capacity(rbd_dev->disk, size);
}

/*
 * Only read the first part of the on-disk header, without the snapshot info.
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* Update image size, and check for resize of mapped image */
	rbd_dev->header.image_size = h.image_size;
	rbd_update_mapping_size(rbd_dev);

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = rbd_dev_snaps_update(rbd_dev);
	if (!ret)
		ret = rbd_dev_snaps_register(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_refresh(rbd_dev, hver);
	else
		ret = rbd_dev_v2_refresh(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}

/*
  sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	return (ssize_t) (bufp - buf);
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};

/*
  sysfs - snapshots
*/

static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static ssize_t rbd_snap_features_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) snap->features);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};

static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;
	kref_init(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}

static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
				struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	rbd_dev->flags = 0;
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_spec_put(rbd_dev->parent_spec);
	kfree(rbd_dev->header_name);
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}

static bool rbd_snap_registered(struct rbd_snap *snap)
{
	bool ret = snap->dev.type == &rbd_snap_device_type;
	bool reg = device_is_registered(&snap->dev);

	rbd_assert(!ret ^ reg);

	return ret;
}

static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	if (device_is_registered(&snap->dev))
		device_unregister(&snap->dev);
}

static int rbd_register_snap_dev(struct rbd_snap *snap,
				  struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
	dout("%s: registering device for snapshot %s\n", __func__, snap->name);

	ret = device_register(dev);

	return ret;
}

static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
						const char *snap_name,
						u64 snap_id, u64 snap_size,
						u64 snap_features)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(snap_name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->id = snap_id;
	snap->size = snap_size;
	snap->features = snap_features;

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}

static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
					u64 *snap_size, u64 *snap_features)
{
	char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	*snap_size = rbd_dev->header.snap_sizes[which];
	*snap_features = 0;	/* No features for v1 */

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return snap_name;
}

/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf), NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}

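/*
 * A userspace sketch (not driver code) of decoding the packed
 * get_size reply above: a one-byte object order followed by a
 * little-endian 64-bit size, with no padding between them.
 */
#include <stdint.h>
#include <stdio.h>

static uint64_t get_le64(const unsigned char *p)
{
	uint64_t v = 0;
	int i;

	for (i = 7; i >= 0; i--)
		v = (v << 8) | p[i];
	return v;
}

int main(void)
{
	/* order 22, size 0x100000, laid out as it would arrive on the wire */
	unsigned char reply[9] = { 22, 0x00, 0x00, 0x10, 0, 0, 0, 0, 0 };
	unsigned char order = reply[0];
	uint64_t size = get_le64(reply + 1);

	printf("order %u size %llu\n", order, (unsigned long long) size);
	return 0;
}
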
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}

out:
	kfree(reply_buf);

	return ret;
}

static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_SUPPORTED)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}

static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	char *image_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				(char *) &snapid, sizeof (snapid),
				(char *) reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	ret = -ERANGE;
	p = reply_buf;
	end = (char *) reply_buf + size;
	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
	if (parent_spec->pool_id == CEPH_NOPOOL)
		goto out;	/* No parent?  No problem. */

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
		goto out;

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	rbd_dev->parent_overlap = overlap;
	rbd_dev->parent_spec = parent_spec;
	parent_spec = NULL;	/* rbd_dev now owns this */
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}

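/*
 * A userspace sketch (not driver code) of the bounds-checked decode
 * pattern used above: every fixed-width read first verifies there is
 * room left in the reply, the way ceph_decode_64_safe() bails out to
 * a label when the buffer would overrun.
 */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

static int decode_64(const unsigned char **p, const unsigned char *end,
		     uint64_t *v)
{
	uint64_t tmp;

	if (end - *p < 8)
		return -1;		/* -ERANGE in the driver */
	memcpy(&tmp, *p, 8);		/* assumes a little-endian host for brevity */
	*p += 8;
	*v = tmp;
	return 0;
}

int main(void)
{
	unsigned char reply[16] = { 1, 0, 0, 0, 0, 0, 0, 0 };	/* pool_id = 1 */
	const unsigned char *p = reply, *end = reply + 12;	/* short reply */
	uint64_t pool_id, snap_id;

	if (decode_64(&p, end, &pool_id))
		return 1;
	printf("pool_id %llu\n", (unsigned long long) pool_id);
	if (decode_64(&p, end, &snap_id))	/* only 4 bytes left: caught */
		printf("reply truncated, bailing out\n");
	return 0;
}
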
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = (char *) image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				(char *) reply_buf, size, NULL);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = (char *) reply_buf + size;
	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}

/*
 * When a parent image gets probed, we only have the pool, image,
 * and snapshot ids but not the names of any of them.  This call
 * is made later to fill in those names.  It has to be done after
 * rbd_dev_snaps_update() has completed because some of the
 * information (in particular, snapshot name) is not available
 * until then.
 */
static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc;
	const char *name;
	void *reply_buf = NULL;
	int ret;

	if (rbd_dev->spec->pool_name)
		return 0;	/* Already have the names */

	/* Look up the pool name */

	osdc = &rbd_dev->rbd_client->client->osdc;
	name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
	if (!name) {
		rbd_warn(rbd_dev, "there is no pool with id %llu",
			rbd_dev->spec->pool_id);	/* Really a BUG() */
		return -EIO;
	}

	rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
	if (!rbd_dev->spec->pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	name = rbd_dev_image_name(rbd_dev);
	if (name)
		rbd_dev->spec->image_name = (char *) name;
	else
		rbd_warn(rbd_dev, "unable to get image name");

	/* Look up the snapshot name. */

	name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
	if (!name) {
		rbd_warn(rbd_dev, "no snapshot with id %llu",
			rbd_dev->spec->snap_id);	/* Really a BUG() */
		ret = -EIO;
		goto out_err;
	}
	rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
	if (!rbd_dev->spec->snap_name) {
		ret = -ENOMEM;
		goto out_err;
	}

	return 0;
out_err:
	kfree(reply_buf);
	kfree(rbd_dev->spec->pool_name);
	rbd_dev->spec->pool_name = NULL;

	return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext",
				NULL, 0,
				reply_buf, size, ver);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	ret = -ERANGE;
	p = reply_buf;
	end = (char *) reply_buf + size;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;

	size = sizeof (struct ceph_snap_context) +
				snap_count * sizeof (snapc->snaps[0]);
	snapc = kmalloc(size, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}

	atomic_set(&snapc->nref, 1);
	snapc->seq = seq;
	snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	rbd_dev->header.snapc = snapc;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long) seq, (unsigned int) snap_count);
	ret = 0;
out:
	kfree(reply_buf);

	return ret;
}

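/*
 * A userspace sketch (not driver code) of the overflow guard above:
 * before computing header_size + count * entry_size, verify that the
 * multiplication cannot wrap a size_t.
 */
#include <stdint.h>
#include <stdio.h>

static int snapc_size(size_t header, size_t count, size_t entry, size_t *out)
{
	if (count > (SIZE_MAX - header) / entry)
		return -1;			/* would overflow: -EINVAL */
	*out = header + count * entry;
	return 0;
}

int main(void)
{
	size_t size;

	if (snapc_size(64, 510, sizeof(uint64_t), &size) == 0)
		printf("allocating %zu bytes\n", size);
	if (snapc_size(64, SIZE_MAX / 2, sizeof(uint64_t), &size) != 0)
		printf("rejected: count * entry would overflow\n");
	return 0;
}
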
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				(char *) &snap_id, sizeof (snap_id),
				reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = (char *) reply_buf + size;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out;
	} else {
		dout("  snap_id 0x%016llx snap_name = %s\n",
			(unsigned long long) le64_to_cpu(snap_id), snap_name);
	}
	kfree(reply_buf);

	return snap_name;
out:
	kfree(reply_buf);

	return ERR_PTR(ret);
}

static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	u64 snap_id;
	u8 order;
	int ret;

	snap_id = rbd_dev->header.snapc->snaps[which];
	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
	if (ret)
		return ERR_PTR(ret);
	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
	if (ret)
		return ERR_PTR(ret);

	return rbd_dev_v2_snap_name(rbd_dev, which);
}

static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_info(rbd_dev, which,
					snap_size, snap_features);
	if (rbd_dev->image_format == 2)
		return rbd_dev_v2_snap_info(rbd_dev, which,
					snap_size, snap_features);
	return ERR_PTR(-EINVAL);
}

static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	__u8 obj_order;

	down_write(&rbd_dev->header_rwsem);

	/* Grab old order first, to see if it changes */

	obj_order = rbd_dev->header.obj_order;
	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		goto out;
	if (rbd_dev->header.obj_order != obj_order) {
		ret = -EIO;
		goto out;
	}
	rbd_update_mapping_size(rbd_dev);

	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
	dout("rbd_dev_v2_snap_context returned %d\n", ret);
	if (ret)
		goto out;
	ret = rbd_dev_snaps_update(rbd_dev);
	dout("rbd_dev_snaps_update returned %d\n", ret);
	if (ret)
		goto out;
	ret = rbd_dev_snaps_register(rbd_dev);
	dout("rbd_dev_snaps_register returned %d\n", ret);
out:
	up_write(&rbd_dev->header_rwsem);

	return ret;
}

/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	u32 index = 0;

	dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
	while (index < snap_count || links != head) {
		u64 snap_id;
		struct rbd_snap *snap;
		char *snap_name;
		u64 snap_size = 0;
		u64 snap_features = 0;

		snap_id = index < snap_count ? snapc->snaps[index]
					     : CEPH_NOSNAP;
		snap = links != head ? list_entry(links, struct rbd_snap, node)
					: NULL;
		rbd_assert(!snap || snap->id != CEPH_NOSNAP);

		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;

			/*
			 * A previously-existing snapshot is not in
			 * the new snap context.
			 *
			 * If the now-missing snapshot is the one the
			 * image is mapped to, clear its exists flag
			 * so we can avoid sending any more requests
			 * to it.
			 */
			if (rbd_dev->spec->snap_id == snap->id)
				clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
			rbd_remove_snap_dev(snap);
			dout("%ssnap id %llu has been removed\n",
				rbd_dev->spec->snap_id == snap->id ?
							"mapped " : "",
				(unsigned long long) snap->id);

			/* Done with this list entry; advance */

			links = next;
			continue;
		}

		snap_name = rbd_dev_snap_info(rbd_dev, index,
					&snap_size, &snap_features);
		if (IS_ERR(snap_name))
			return PTR_ERR(snap_name);

		dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
			(unsigned long long) snap_id);
		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;

			/* We haven't seen this snapshot before */

			new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
					snap_id, snap_size, snap_features);
			if (IS_ERR(new_snap)) {
				int err = PTR_ERR(new_snap);

				dout("  failed to add dev, error %d\n", err);

				return err;
			}

			/* New goes before existing, or at end of list */

			dout("  added dev%s\n", snap ? "" : " at end");
			if (snap)
				list_add_tail(&new_snap->node, &snap->node);
			else
				list_add_tail(&new_snap->node, head);
		} else {
			/* Already have this one */

			dout("  already present\n");

			rbd_assert(snap->size == snap_size);
			rbd_assert(!strcmp(snap->name, snap_name));
			rbd_assert(snap->features == snap_features);

			/* Done with this list entry; advance */

			links = links->next;
		}

		/* Advance to the next entry in the snapshot context */

		index++;
	}
	dout("%s: done\n", __func__);

	return 0;
}

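/*
 * A userspace sketch (not driver code) of the merge above: both the
 * existing list and the new snapshot context are sorted by id,
 * highest first.  One pass removes ids that vanished, inserts ids
 * that appeared, and keeps the rest.
 */
#include <stdio.h>

int main(void)
{
	unsigned long long old_ids[] = { 9, 7, 4 };	/* current list */
	unsigned long long new_ids[] = { 9, 5, 4 };	/* new snap context */
	unsigned int o = 0, n = 0, nold = 3, nnew = 3;

	while (o < nold || n < nnew) {
		if (n < nnew && (o == nold || old_ids[o] < new_ids[n])) {
			printf("add snapshot %llu\n", new_ids[n]);	/* appeared */
			n++;
		} else if (o < nold && (n == nnew || old_ids[o] > new_ids[n])) {
			printf("remove snapshot %llu\n", old_ids[o]);	/* vanished */
			o++;
		} else {
			printf("keep snapshot %llu\n", old_ids[o]);	/* in both */
			o++;
			n++;
		}
	}
	return 0;
}
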
/*
 * Scan the list of snapshots and register the devices for any that
 * have not already been registered.
 */
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	int ret = 0;

	dout("%s:\n", __func__);
	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
		return -EIO;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!rbd_snap_registered(snap)) {
			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
			if (ret < 0)
				break;
		}
	}
	dout("%s: returning %d\n", __func__, ret);

	return ret;
}

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	struct device *dev;
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	mutex_unlock(&ctl_mutex);

	return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}

static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout("  max dev id has been reset\n");
}

e28fff26
AE
3599/*
3600 * Skips over white space at *buf, and updates *buf to point to the
3601 * first found non-space character (if any). Returns the length of
593a9e7b
AE
3602 * the token (string of non-white space characters) found. Note
3603 * that *buf must be terminated with '\0'.
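 *
 * Example: given *buf pointing at "  pool img", this advances
 * *buf to point at "pool img" and returns 4 (the length of
 * "pool").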
e28fff26
AE
3604 */
3605static inline size_t next_token(const char **buf)
3606{
3607 /*
3608 * These are the characters that produce nonzero for
3609 * isspace() in the "C" and "POSIX" locales.
3610 */
3611 const char *spaces = " \f\n\r\t\v";
3612
3613 *buf += strspn(*buf, spaces); /* Find start of token */
3614
3615 return strcspn(*buf, spaces); /* Return token length */
3616}
3617
3618/*
3619 * Finds the next token in *buf, and if the provided token buffer is
3620 * big enough, copies the found token into it. The result, if
593a9e7b
AE
3621 * copied, is guaranteed to be terminated with '\0'. Note that *buf
3622 * must be terminated with '\0' on entry.
e28fff26
AE
3623 *
3624 * Returns the length of the token found (not including the '\0').
3625 * Return value will be 0 if no token is found, and it will be >=
3626 * token_size if the token would not fit.
3627 *
593a9e7b 3628 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
3629 * found token. Note that this occurs even if the token buffer is
3630 * too small to hold it.
3631 */
3632static inline size_t copy_token(const char **buf,
3633 char *token,
3634 size_t token_size)
3635{
3636 size_t len;
3637
3638 len = next_token(buf);
3639 if (len < token_size) {
3640 memcpy(token, *buf, len);
3641 *(token + len) = '\0';
3642 }
3643 *buf += len;
3644
3645 return len;
3646}
3647
ea3352f4
AE
3648/*
3649 * Finds the next token in *buf, dynamically allocates a buffer big
3650 * enough to hold a copy of it, and copies the token into the new
3651 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3652 * that a duplicate buffer is created even for a zero-length token.
3653 *
3654 * Returns a pointer to the newly-allocated duplicate, or a null
3655 * pointer if memory for the duplicate was not available. If
3656 * the lenp argument is a non-null pointer, the length of the token
3657 * (not including the '\0') is returned in *lenp.
3658 *
3659 * If successful, the *buf pointer will be updated to point beyond
3660 * the end of the found token.
3661 *
3662 * Note: uses GFP_KERNEL for allocation.
3663 */
3664static inline char *dup_token(const char **buf, size_t *lenp)
3665{
3666 char *dup;
3667 size_t len;
3668
3669 len = next_token(buf);
4caf35f9 3670 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
3671 if (!dup)
3672 return NULL;
ea3352f4
AE
3673 *(dup + len) = '\0';
3674 *buf += len;
3675
3676 if (lenp)
3677 *lenp = len;
3678
3679 return dup;
3680}
3681
a725f65e 3682/*
859c31df
AE
3683 * Parse the options provided for an "rbd add" (i.e., rbd image
3684 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3685 * and the data written is passed here via a NUL-terminated buffer.
3686 * Returns 0 if successful or an error code otherwise.
d22f76e7 3687 *
859c31df
AE
3688 * The information extracted from these options is recorded in
3689 * the other parameters which return dynamically-allocated
3690 * structures:
3691 * ceph_opts
3692 * The address of a pointer that will refer to a ceph options
3693 * structure. Caller must release the returned pointer using
3694 * ceph_destroy_options() when it is no longer needed.
3695 * rbd_opts
3696 * Address of an rbd options pointer. Fully initialized by
3697 * this function; caller must release with kfree().
3698 * spec
3699 * Address of an rbd image specification pointer. Fully
3700 * initialized by this function based on parsed options.
3701 * Caller must release with rbd_spec_put().
3702 *
3703 * The options passed take this form:
 3704 *	<mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
3705 * where:
3706 * <mon_addrs>
3707 * A comma-separated list of one or more monitor addresses.
3708 * A monitor address is an ip address, optionally followed
3709 * by a port number (separated by a colon).
3710 * I.e.: ip1[:port1][,ip2[:port2]...]
3711 * <options>
3712 * A comma-separated list of ceph and/or rbd options.
3713 * <pool_name>
3714 * The name of the rados pool containing the rbd image.
3715 * <image_name>
3716 * The name of the image in that pool to map.
 3717 *  <snap_name>
 3718 *	An optional snapshot name.  If provided, the mapping will
 3719 *	present data from the image at the time that snapshot was
 3720 *	created.  The image head is used if no snapshot name is
 3721 *	provided.  Snapshot mappings are always read-only.
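 *
 * For example, a mapping request written to /sys/bus/rbd/add
 * might look like this (illustrative values only):
 *
 *	1.2.3.4:6789 name=admin,secret=<key> mypool myimage mysnap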
a725f65e 3722 */
859c31df 3723static int rbd_add_parse_args(const char *buf,
dc79b113 3724 struct ceph_options **ceph_opts,
859c31df
AE
3725 struct rbd_options **opts,
3726 struct rbd_spec **rbd_spec)
e28fff26 3727{
d22f76e7 3728 size_t len;
859c31df 3729 char *options;
0ddebc0c
AE
3730 const char *mon_addrs;
3731 size_t mon_addrs_size;
859c31df 3732 struct rbd_spec *spec = NULL;
4e9afeba 3733 struct rbd_options *rbd_opts = NULL;
859c31df 3734 struct ceph_options *copts;
dc79b113 3735 int ret;
e28fff26
AE
3736
3737 /* The first four tokens are required */
3738
7ef3214a 3739 len = next_token(&buf);
4fb5d671
AE
3740 if (!len) {
3741 rbd_warn(NULL, "no monitor address(es) provided");
3742 return -EINVAL;
3743 }
0ddebc0c 3744 mon_addrs = buf;
f28e565a 3745 mon_addrs_size = len + 1;
7ef3214a 3746 buf += len;
a725f65e 3747
dc79b113 3748 ret = -EINVAL;
f28e565a
AE
3749 options = dup_token(&buf, NULL);
3750 if (!options)
dc79b113 3751 return -ENOMEM;
4fb5d671
AE
3752 if (!*options) {
3753 rbd_warn(NULL, "no options provided");
3754 goto out_err;
3755 }
e28fff26 3756
859c31df
AE
3757 spec = rbd_spec_alloc();
3758 if (!spec)
f28e565a 3759 goto out_mem;
859c31df
AE
3760
3761 spec->pool_name = dup_token(&buf, NULL);
3762 if (!spec->pool_name)
3763 goto out_mem;
4fb5d671
AE
3764 if (!*spec->pool_name) {
3765 rbd_warn(NULL, "no pool name provided");
3766 goto out_err;
3767 }
e28fff26 3768
69e7a02f 3769 spec->image_name = dup_token(&buf, NULL);
859c31df 3770 if (!spec->image_name)
f28e565a 3771 goto out_mem;
4fb5d671
AE
3772 if (!*spec->image_name) {
3773 rbd_warn(NULL, "no image name provided");
3774 goto out_err;
3775 }
d4b125e9 3776
f28e565a
AE
3777 /*
3778 * Snapshot name is optional; default is to use "-"
3779 * (indicating the head/no snapshot).
3780 */
3feeb894 3781 len = next_token(&buf);
820a5f3e 3782 if (!len) {
3feeb894
AE
3783 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3784 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 3785 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 3786 ret = -ENAMETOOLONG;
f28e565a 3787 goto out_err;
849b4260 3788 }
4caf35f9 3789 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
859c31df 3790 if (!spec->snap_name)
f28e565a 3791 goto out_mem;
859c31df 3792 *(spec->snap_name + len) = '\0';
e5c35534 3793
0ddebc0c 3794 /* Initialize all rbd options to the defaults */
e28fff26 3795
4e9afeba
AE
3796 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3797 if (!rbd_opts)
3798 goto out_mem;
3799
3800 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 3801
859c31df 3802 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 3803 mon_addrs + mon_addrs_size - 1,
4e9afeba 3804 parse_rbd_opts_token, rbd_opts);
859c31df
AE
3805 if (IS_ERR(copts)) {
3806 ret = PTR_ERR(copts);
dc79b113
AE
3807 goto out_err;
3808 }
859c31df
AE
3809 kfree(options);
3810
3811 *ceph_opts = copts;
4e9afeba 3812 *opts = rbd_opts;
859c31df 3813 *rbd_spec = spec;
0ddebc0c 3814
dc79b113 3815 return 0;
f28e565a 3816out_mem:
dc79b113 3817 ret = -ENOMEM;
d22f76e7 3818out_err:
859c31df
AE
3819 kfree(rbd_opts);
3820 rbd_spec_put(spec);
f28e565a 3821 kfree(options);
d22f76e7 3822
dc79b113 3823 return ret;
a725f65e
AE
3824}
3825
589d30e0
AE
3826/*
3827 * An rbd format 2 image has a unique identifier, distinct from the
3828 * name given to it by the user. Internally, that identifier is
3829 * what's used to specify the names of objects related to the image.
3830 *
3831 * A special "rbd id" object is used to map an rbd image name to its
3832 * id. If that object doesn't exist, then there is no v2 rbd image
3833 * with the supplied name.
3834 *
3835 * This function will record the given rbd_dev's image_id field if
3836 * it can be determined, and in that case will return 0. If any
3837 * errors occur a negative errno will be returned and the rbd_dev's
3838 * image_id field will be unchanged (and should be NULL).
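 *
 * For example, assuming the "rbd_id." object name prefix defined
 * in rbd_types.h, the id object for an image named "foo" would be
 * named "rbd_id.foo".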
3839 */
3840static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3841{
3842 int ret;
3843 size_t size;
3844 char *object_name;
3845 void *response;
3846 void *p;
3847
2f82ee54
AE
3848 	/*
3849 	 * If we already have it we don't need to look it up.
3850 	 * (When probing a parent image, the image id is already
3851 	 * known and the image name likely is not; there's no
3852 	 * need to fetch the image id again in that case.)
3853 	 */
3854 	if (rbd_dev->spec->image_id)
3855 		return 0;
3860
589d30e0
AE
3861 /*
 3862	 * First, see if the format 2 image id object exists, and if
 3863	 * so, get the image's persistent id from it.
3864 */
69e7a02f 3865 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
3866 object_name = kmalloc(size, GFP_NOIO);
3867 if (!object_name)
3868 return -ENOMEM;
0d7dbfce 3869 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
3870 dout("rbd id object name is %s\n", object_name);
3871
3872 /* Response will be an encoded string, which includes a length */
3873
3874 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3875 response = kzalloc(size, GFP_NOIO);
3876 if (!response) {
3877 ret = -ENOMEM;
3878 goto out;
3879 }
3880
36be9a76 3881 ret = rbd_obj_method_sync(rbd_dev, object_name,
589d30e0
AE
3882 "rbd", "get_id",
3883 NULL, 0,
07b2391f 3884 response, RBD_IMAGE_ID_LEN_MAX, NULL);
36be9a76 3885 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
589d30e0
AE
3886 if (ret < 0)
3887 goto out;
3888
3889 p = response;
0d7dbfce 3890 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
589d30e0 3891 p + RBD_IMAGE_ID_LEN_MAX,
979ed480 3892 NULL, GFP_NOIO);
0d7dbfce
AE
3893 if (IS_ERR(rbd_dev->spec->image_id)) {
3894 ret = PTR_ERR(rbd_dev->spec->image_id);
3895 rbd_dev->spec->image_id = NULL;
589d30e0 3896 } else {
0d7dbfce 3897 dout("image_id is %s\n", rbd_dev->spec->image_id);
589d30e0
AE
3898 }
3899out:
3900 kfree(response);
3901 kfree(object_name);
3902
3903 return ret;
3904}
3905
a30b71b9
AE
3906static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3907{
3908 int ret;
3909 size_t size;
3910
3911 /* Version 1 images have no id; empty string is used */
3912
0d7dbfce
AE
3913 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3914 if (!rbd_dev->spec->image_id)
a30b71b9 3915 return -ENOMEM;
a30b71b9
AE
3916
3917 /* Record the header object name for this rbd image. */
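	/* (e.g. "foo.rbd" for image "foo", assuming RBD_SUFFIX is ".rbd") */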
3918
69e7a02f 3919 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
a30b71b9
AE
3920 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3921 if (!rbd_dev->header_name) {
3922 ret = -ENOMEM;
3923 goto out_err;
3924 }
0d7dbfce
AE
3925 sprintf(rbd_dev->header_name, "%s%s",
3926 rbd_dev->spec->image_name, RBD_SUFFIX);
a30b71b9
AE
3927
3928 /* Populate rbd image metadata */
3929
3930 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3931 if (ret < 0)
3932 goto out_err;
86b00e0d
AE
3933
3934 /* Version 1 images have no parent (no layering) */
3935
3936 rbd_dev->parent_spec = NULL;
3937 rbd_dev->parent_overlap = 0;
3938
a30b71b9
AE
3939 rbd_dev->image_format = 1;
3940
3941 dout("discovered version 1 image, header name is %s\n",
3942 rbd_dev->header_name);
3943
3944 return 0;
3945
3946out_err:
3947 kfree(rbd_dev->header_name);
3948 rbd_dev->header_name = NULL;
0d7dbfce
AE
3949 kfree(rbd_dev->spec->image_id);
3950 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
3951
3952 return ret;
3953}
3954
3955static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3956{
3957 size_t size;
9d475de5 3958 int ret;
6e14b1a6 3959 u64 ver = 0;
a30b71b9
AE
3960
3961 /*
3962 * Image id was filled in by the caller. Record the header
3963 * object name for this rbd image.
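	 * (The resulting name is e.g. "rbd_header.<image_id>", assuming
	 * the "rbd_header." prefix defined in rbd_types.h.)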
3964 */
979ed480 3965 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
a30b71b9
AE
3966 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3967 if (!rbd_dev->header_name)
3968 return -ENOMEM;
3969 sprintf(rbd_dev->header_name, "%s%s",
0d7dbfce 3970 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
9d475de5
AE
3971
3972 /* Get the size and object order for the image */
3973
3974 ret = rbd_dev_v2_image_size(rbd_dev);
1e130199
AE
3975 if (ret < 0)
3976 goto out_err;
3977
3978 /* Get the object prefix (a.k.a. block_name) for the image */
3979
3980 ret = rbd_dev_v2_object_prefix(rbd_dev);
b1b5402a
AE
3981 if (ret < 0)
3982 goto out_err;
3983
d889140c 3984 	/* Get and check the features for the image */
b1b5402a
AE
3985
3986 ret = rbd_dev_v2_features(rbd_dev);
9d475de5
AE
3987 if (ret < 0)
3988 goto out_err;
35d489f9 3989
86b00e0d
AE
3990 /* If the image supports layering, get the parent info */
3991
3992 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3993 ret = rbd_dev_v2_parent_info(rbd_dev);
3994 if (ret < 0)
3995 goto out_err;
3996 }
3997
6e14b1a6
AE
3998 /* crypto and compression type aren't (yet) supported for v2 images */
3999
4000 rbd_dev->header.crypt_type = 0;
4001 rbd_dev->header.comp_type = 0;
35d489f9 4002
6e14b1a6
AE
4003 /* Get the snapshot context, plus the header version */
4004
4005 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
35d489f9
AE
4006 if (ret)
4007 goto out_err;
6e14b1a6
AE
4008 rbd_dev->header.obj_version = ver;
4009
a30b71b9
AE
4010 rbd_dev->image_format = 2;
4011
4012 dout("discovered version 2 image, header name is %s\n",
4013 rbd_dev->header_name);
4014
35152979 4015 return 0;
9d475de5 4016out_err:
86b00e0d
AE
4017 rbd_dev->parent_overlap = 0;
4018 rbd_spec_put(rbd_dev->parent_spec);
4019 rbd_dev->parent_spec = NULL;
9d475de5
AE
4020 kfree(rbd_dev->header_name);
4021 rbd_dev->header_name = NULL;
1e130199
AE
4022 kfree(rbd_dev->header.object_prefix);
4023 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4024
4025 return ret;
a30b71b9
AE
4026}
4027
83a06263
AE
4028static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4029{
2f82ee54
AE
4030 struct rbd_device *parent = NULL;
4031 struct rbd_spec *parent_spec = NULL;
4032 struct rbd_client *rbdc = NULL;
83a06263
AE
4033 int ret;
4034
4035 /* no need to lock here, as rbd_dev is not registered yet */
4036 ret = rbd_dev_snaps_update(rbd_dev);
4037 if (ret)
4038 return ret;
4039
9e15b77d
AE
4040 ret = rbd_dev_probe_update_spec(rbd_dev);
4041 if (ret)
4042 goto err_out_snaps;
4043
83a06263
AE
4044 ret = rbd_dev_set_mapping(rbd_dev);
4045 if (ret)
4046 goto err_out_snaps;
4047
4048 /* generate unique id: find highest unique id, add one */
4049 rbd_dev_id_get(rbd_dev);
4050
4051 /* Fill in the device name, now that we have its id. */
4052 BUILD_BUG_ON(DEV_NAME_LEN
4053 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4054 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4055
4056 /* Get our block major device number. */
4057
4058 ret = register_blkdev(0, rbd_dev->name);
4059 if (ret < 0)
4060 goto err_out_id;
4061 rbd_dev->major = ret;
4062
4063 /* Set up the blkdev mapping. */
4064
4065 ret = rbd_init_disk(rbd_dev);
4066 if (ret)
4067 goto err_out_blkdev;
4068
4069 ret = rbd_bus_add_dev(rbd_dev);
4070 if (ret)
4071 goto err_out_disk;
4072
4073 /*
4074 * At this point cleanup in the event of an error is the job
4075 * of the sysfs code (initiated by rbd_bus_del_dev()).
4076 */
2f82ee54
AE
4077 /* Probe the parent if there is one */
4078
4079 if (rbd_dev->parent_spec) {
4080 /*
4081 * We need to pass a reference to the client and the
4082 * parent spec when creating the parent rbd_dev.
4083 * Images related by parent/child relationships
4084 * always share both.
4085 */
4086 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4087 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4088
4089 parent = rbd_dev_create(rbdc, parent_spec);
4090 if (!parent) {
4091 ret = -ENOMEM;
4092 goto err_out_spec;
4093 }
4094 rbdc = NULL; /* parent now owns reference */
4095 parent_spec = NULL; /* parent now owns reference */
4096 ret = rbd_dev_probe(parent);
4097 if (ret < 0)
4098 goto err_out_parent;
4099 rbd_dev->parent = parent;
4100 }
4101
83a06263
AE
4102 down_write(&rbd_dev->header_rwsem);
4103 ret = rbd_dev_snaps_register(rbd_dev);
4104 up_write(&rbd_dev->header_rwsem);
4105 if (ret)
4106 goto err_out_bus;
4107
9969ebc5 4108 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
83a06263
AE
4109 if (ret)
4110 goto err_out_bus;
4111
4112 /* Everything's ready. Announce the disk to the world. */
4113
4114 add_disk(rbd_dev->disk);
4115
4116 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4117 (unsigned long long) rbd_dev->mapping.size);
4118
4119 return ret;
2f82ee54
AE
4120
4121err_out_parent:
4122 rbd_dev_destroy(parent);
4123err_out_spec:
4124 rbd_spec_put(parent_spec);
4125 rbd_put_client(rbdc);
83a06263
AE
4126err_out_bus:
4127 /* this will also clean up rest of rbd_dev stuff */
4128
4129 rbd_bus_del_dev(rbd_dev);
4130
4131 return ret;
4132err_out_disk:
4133 rbd_free_disk(rbd_dev);
4134err_out_blkdev:
4135 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4136err_out_id:
4137 rbd_dev_id_put(rbd_dev);
4138err_out_snaps:
4139 rbd_remove_all_snaps(rbd_dev);
4140
4141 return ret;
4142}
4143
a30b71b9
AE
4144/*
4145 * Probe for the existence of the header object for the given rbd
4146 * device. For format 2 images this includes determining the image
4147 * id.
4148 */
4149static int rbd_dev_probe(struct rbd_device *rbd_dev)
4150{
4151 int ret;
4152
4153 /*
4154 * Get the id from the image id object. If it's not a
4155 * format 2 image, we'll get ENOENT back, and we'll assume
4156 * it's a format 1 image.
4157 */
4158 ret = rbd_dev_image_id(rbd_dev);
4159 if (ret)
4160 ret = rbd_dev_v1_probe(rbd_dev);
4161 else
4162 ret = rbd_dev_v2_probe(rbd_dev);
83a06263 4163 if (ret) {
a30b71b9
AE
4164 dout("probe failed, returning %d\n", ret);
4165
83a06263
AE
4166 return ret;
4167 }
4168
4169 ret = rbd_dev_probe_finish(rbd_dev);
4170 if (ret)
4171 rbd_header_free(&rbd_dev->header);
4172
a30b71b9
AE
4173 return ret;
4174}
4175
59c2be1e
YS
4176static ssize_t rbd_add(struct bus_type *bus,
4177 const char *buf,
4178 size_t count)
602adf40 4179{
cb8627c7 4180 struct rbd_device *rbd_dev = NULL;
dc79b113 4181 struct ceph_options *ceph_opts = NULL;
4e9afeba 4182 struct rbd_options *rbd_opts = NULL;
859c31df 4183 struct rbd_spec *spec = NULL;
9d3997fd 4184 struct rbd_client *rbdc;
27cc2594
AE
4185 struct ceph_osd_client *osdc;
4186 int rc = -ENOMEM;
602adf40
YS
4187
4188 if (!try_module_get(THIS_MODULE))
4189 return -ENODEV;
4190
602adf40 4191 /* parse add command */
859c31df 4192 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 4193 if (rc < 0)
bd4ba655 4194 goto err_out_module;
78cea76e 4195
9d3997fd
AE
4196 rbdc = rbd_get_client(ceph_opts);
4197 if (IS_ERR(rbdc)) {
4198 rc = PTR_ERR(rbdc);
0ddebc0c 4199 goto err_out_args;
9d3997fd 4200 }
c53d5893 4201 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4202
602adf40 4203 /* pick the pool */
9d3997fd 4204 osdc = &rbdc->client->osdc;
859c31df 4205 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
4206 if (rc < 0)
4207 goto err_out_client;
859c31df
AE
4208 spec->pool_id = (u64) rc;
4209
0903e875
AE
4210 /* The ceph file layout needs to fit pool id in 32 bits */
4211
4212 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4213 rc = -EIO;
4214 goto err_out_client;
4215 }
4216
c53d5893 4217 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
4218 if (!rbd_dev)
4219 goto err_out_client;
c53d5893
AE
4220 rbdc = NULL; /* rbd_dev now owns this */
4221 spec = NULL; /* rbd_dev now owns this */
602adf40 4222
bd4ba655 4223 rbd_dev->mapping.read_only = rbd_opts->read_only;
c53d5893
AE
4224 kfree(rbd_opts);
4225 rbd_opts = NULL; /* done with this */
bd4ba655 4226
a30b71b9
AE
4227 rc = rbd_dev_probe(rbd_dev);
4228 if (rc < 0)
c53d5893 4229 goto err_out_rbd_dev;
05fd6f6f 4230
602adf40 4231 return count;
c53d5893
AE
4232err_out_rbd_dev:
4233 rbd_dev_destroy(rbd_dev);
bd4ba655 4234err_out_client:
9d3997fd 4235 rbd_put_client(rbdc);
0ddebc0c 4236err_out_args:
78cea76e
AE
4237 if (ceph_opts)
4238 ceph_destroy_options(ceph_opts);
4e9afeba 4239 kfree(rbd_opts);
859c31df 4240 rbd_spec_put(spec);
bd4ba655
AE
4241err_out_module:
4242 module_put(THIS_MODULE);
27cc2594 4243
602adf40 4244 dout("Error adding device %s\n", buf);
27cc2594
AE
4245
4246 return (ssize_t) rc;
602adf40
YS
4247}
4248
de71a297 4249static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4250{
4251 struct list_head *tmp;
4252 struct rbd_device *rbd_dev;
4253
e124a82f 4254 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4255 list_for_each(tmp, &rbd_dev_list) {
4256 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4257 if (rbd_dev->dev_id == dev_id) {
e124a82f 4258 spin_unlock(&rbd_dev_list_lock);
602adf40 4259 return rbd_dev;
e124a82f 4260 }
602adf40 4261 }
e124a82f 4262 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4263 return NULL;
4264}
4265
dfc5606d 4266static void rbd_dev_release(struct device *dev)
602adf40 4267{
593a9e7b 4268 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4269
59c2be1e 4270 if (rbd_dev->watch_event)
9969ebc5 4271 rbd_dev_header_watch_sync(rbd_dev, 0);
602adf40
YS
4272
4273 /* clean up and free blkdev */
4274 rbd_free_disk(rbd_dev);
4275 unregister_blkdev(rbd_dev->major, rbd_dev->name);
32eec68d 4276
2ac4e75d
AE
4277 /* release allocated disk header fields */
4278 rbd_header_free(&rbd_dev->header);
4279
32eec68d 4280 /* done with the id, and with the rbd_dev */
e2839308 4281 rbd_dev_id_put(rbd_dev);
c53d5893
AE
4282 rbd_assert(rbd_dev->rbd_client != NULL);
4283 rbd_dev_destroy(rbd_dev);
602adf40
YS
4284
4285 /* release module ref */
4286 module_put(THIS_MODULE);
602adf40
YS
4287}
4288
2f82ee54
AE
4289static void __rbd_remove(struct rbd_device *rbd_dev)
4290{
4291 rbd_remove_all_snaps(rbd_dev);
4292 rbd_bus_del_dev(rbd_dev);
4293}
4294
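/*
 * Handle a write to /sys/bus/rbd/remove.  The buffer holds the id
 * of the device to be unmapped, as shown in
 * /sys/bus/rbd/devices/<id>.  For example (illustrative):
 *
 *	echo 2 > /sys/bus/rbd/remove
 */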
dfc5606d
YS
4295static ssize_t rbd_remove(struct bus_type *bus,
4296 const char *buf,
4297 size_t count)
602adf40
YS
4298{
4299 struct rbd_device *rbd_dev = NULL;
4300 int target_id, rc;
4301 unsigned long ul;
4302 int ret = count;
4303
4304 rc = strict_strtoul(buf, 10, &ul);
4305 if (rc)
4306 return rc;
4307
4308 /* convert to int; abort if we lost anything in the conversion */
4309 target_id = (int) ul;
4310 if (target_id != ul)
4311 return -EINVAL;
4312
4313 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4314
4315 rbd_dev = __rbd_get_dev(target_id);
4316 if (!rbd_dev) {
4317 ret = -ENOENT;
4318 goto done;
42382b70
AE
4319 }
4320
a14ea269 4321 spin_lock_irq(&rbd_dev->lock);
b82d167b 4322 if (rbd_dev->open_count)
42382b70 4323 ret = -EBUSY;
b82d167b
AE
4324 else
4325 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
a14ea269 4326 spin_unlock_irq(&rbd_dev->lock);
b82d167b 4327 if (ret < 0)
42382b70 4328 goto done;
602adf40 4329
2f82ee54
AE
4330 while (rbd_dev->parent_spec) {
4331 struct rbd_device *first = rbd_dev;
4332 struct rbd_device *second = first->parent;
4333 struct rbd_device *third;
4334
4335 /*
4336 * Follow to the parent with no grandparent and
4337 * remove it.
4338 */
4339 while (second && (third = second->parent)) {
4340 first = second;
4341 second = third;
4342 }
4343 __rbd_remove(second);
4344 rbd_spec_put(first->parent_spec);
4345 first->parent_spec = NULL;
4346 first->parent_overlap = 0;
4347 first->parent = NULL;
4348 }
4349 __rbd_remove(rbd_dev);
602adf40
YS
4350
4351done:
4352 mutex_unlock(&ctl_mutex);
aafb230e 4353
602adf40
YS
4354 return ret;
4355}
4356
602adf40
YS
4357/*
4358 * create control files in sysfs
dfc5606d 4359 * /sys/bus/rbd/...
602adf40
YS
4360 */
4361static int rbd_sysfs_init(void)
4362{
dfc5606d 4363 int ret;
602adf40 4364
fed4c143 4365 ret = device_register(&rbd_root_dev);
21079786 4366 if (ret < 0)
dfc5606d 4367 return ret;
602adf40 4368
fed4c143
AE
4369 ret = bus_register(&rbd_bus_type);
4370 if (ret < 0)
4371 device_unregister(&rbd_root_dev);
602adf40 4372
602adf40
YS
4373 return ret;
4374}
4375
4376static void rbd_sysfs_cleanup(void)
4377{
dfc5606d 4378 bus_unregister(&rbd_bus_type);
fed4c143 4379 device_unregister(&rbd_root_dev);
602adf40
YS
4380}
4381
cc344fa1 4382static int __init rbd_init(void)
602adf40
YS
4383{
4384 int rc;
4385
1e32d34c
AE
4386 if (!libceph_compatible(NULL)) {
4387 rbd_warn(NULL, "libceph incompatibility (quitting)");
4388
4389 return -EINVAL;
4390 }
602adf40
YS
4391 rc = rbd_sysfs_init();
4392 if (rc)
4393 return rc;
f0f8cef5 4394 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
4395 return 0;
4396}
4397
cc344fa1 4398static void __exit rbd_exit(void)
602adf40
YS
4399{
4400 rbd_sysfs_cleanup();
4401}
4402
4403module_init(rbd_init);
4404module_exit(rbd_exit);
4405
4406MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4407MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4408MODULE_DESCRIPTION("rados block device");
4409
4410/* following authorship retained from original osdblk.c */
4411MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4412
4413MODULE_LICENSE("GPL");