rbd: record image-relative offset in object requests
drivers/block/rbd.c
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

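/*
 * Rough arithmetic behind that limit: a snapshot context is a small
 * fixed-size header followed by one 8-byte id per snapshot, so 510
 * snapshots need 510 * 8 = 4080 bytes of ids, which together with
 * the header still fits within a single 4096-byte page.
 */
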
#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

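/*
 * Worked example: a decimal digit encodes log2(10) ~= 3.32 bits, so
 * each byte (8 bits) needs at most 8 / 3.32 ~= 2.41 digits, and 5/2
 * over-approximates that safely.  For a 4-byte int this gives
 * (5 * 4) / 2 + 1 = 11 characters, exactly enough for "-2147483648".
 */
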
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;
	u64			img_offset;	/* image relative offset */
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;
	atomic_t		done;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

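/*
 * Note the two offsets above: "offset" is relative to the start of
 * the named object, while "img_offset" is relative to the start of
 * the image as a whole.  For illustration, with the default 4 MiB
 * (order 22) objects, a request touching image byte 0x00400100
 * lands in object 1 with offset = 0x100 but img_offset = 0x00400100.
 */
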
struct rbd_img_request {
	struct request		*rq;
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	bool			write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64		snap_id;		/* for reads */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

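/*
 * For example, an option string of "read_only" (or its short form
 * "ro") matches the Opt_read_only token above, so a call such as
 * parse_rbd_opts_token("ro", &rbd_opts) simply leaves rbd_opts with
 * read_only set to true.  No int- or string-valued options are
 * currently defined, which is why those branches only log.
 */
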
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * The rbd_client_list_lock is taken here, so the caller must not
 * hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

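/*
 * Concretely, the order checks above accept values from SECTOR_SHIFT
 * (9, i.e. 512-byte objects, the smallest I/O the bio layer can do)
 * up to 31 when int is 4 bytes (8 * 4 - 1).  The common default,
 * order 22, gives 4 MiB objects and passes both bounds easily.
 */
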
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

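/*
 * The v1 on-disk layout implied by the code above: a fixed-size
 * header carrying object_prefix, options and snap_count, followed
 * by an array of snap_count (id, image_size) entries, followed by
 * snap_names_len bytes of packed snapshot name strings.
 * rbd_header_from_disk() copies each of those pieces into its own
 * separately-allocated in-memory field.
 */
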
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

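/*
 * Worked example of the segment math: with obj_order 22 (4 MiB
 * objects) and an object prefix of, say, "rb.0.1234", image offset
 * 0x00c00100 falls in segment 0x00c00100 >> 22 = 3, so the data
 * lives in the object named "rb.0.1234.000000000003" at offset
 * 0x00c00100 & 0x3fffff = 0x100.  A request spanning a segment
 * boundary gets its length clipped by rbd_segment_length().
 */
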
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

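/*
 * Example of the in-out behavior: cloning 6 MiB from a chain of
 * three 4 MiB bios starting at *offset == 2 MiB consumes the rest
 * of bio 1 and all of bio 2; on return *bio_src points at bio 3
 * and *offset is 0, ready for the next clone to continue where
 * this one left off.
 */
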
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

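/*
 * Note the ordering contract encoded in the assertions above:
 * object requests are appended, so "which" is simply the request's
 * index, and they must be deleted newest-first so that "which"
 * always equals obj_request_count after the decrement.  That is why
 * for_each_obj_request_safe() walks the list in reverse.
 */
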
static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; it is not clear offhand which approach is
	 * better.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

static void obj_request_done_init(struct rbd_obj_request *obj_request)
{
	atomic_set(&obj_request->done, 0);
	smp_wmb();
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	int done;

	done = atomic_inc_return(&obj_request->done);
	if (done > 1) {
		struct rbd_img_request *img_request = obj_request->img_request;
		struct rbd_device *rbd_dev;

		rbd_dev = img_request ? img_request->rbd_dev : NULL;
		rbd_warn(rbd_dev, "obj_request %p was already done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return atomic_read(&obj_request->done) != 0;
}

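/*
 * The "done" field thus acts as a one-shot latch: the first
 * obj_request_done_set() call moves it from 0 to 1, and any further
 * call is reported as a bug.  The smp_wmb() after initialization
 * pairs with the smp_mb() in obj_request_done_test() so that a
 * reader who sees done != 0 also sees the request's final state.
 */
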
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
	if (obj_request->result == -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
		obj_request->xferred = obj_request->length;
	} else if (obj_request->xferred < obj_request->length &&
			!obj_request->result) {
		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
		obj_request->xferred = obj_request->length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
		obj_request->result, obj_request->xferred, obj_request->length);
	if (obj_request->img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.
	 * Our xferred value is the number of bytes transferred
	 * back.  Set it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	rbd_assert(!!obj_request->img_request ^
				(obj_request->which == BAD_WHICH));

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	WARN_ON(osd_req->r_num_ops != 1);	/* For now */

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64) UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
					bool write_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc = NULL;
	u64 snap_id = CEPH_NOSNAP;
	struct timespec *mtime = NULL;
	struct timespec now;

	rbd_assert(osd_req != NULL);

	if (write_request) {
		now = CURRENT_TIME;
		mtime = &now;
		if (img_request)
			snapc = img_request->snapc;
	} else if (img_request) {
		snap_id = img_request->snap_id;
	}
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, snap_id, mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (img_request) {
		rbd_assert(img_request->write_request == write_request);
		if (img_request->write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	obj_request_done_init(obj_request);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->write_request = write_request;
	if (write_request)
		img_request->snapc = snapc;
	else
		img_request->snap_id = rbd_dev->spec->snap_id;
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request->write_request)
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->rq != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		unsigned int xferred;
		int result;

		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;

		rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
		xferred = (unsigned int)obj_request->xferred;
		result = obj_request->result;
		if (result) {
			struct rbd_device *rbd_dev = img_request->rbd_dev;

			rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
				img_request->write_request ? "write" : "read",
				obj_request->length, obj_request->img_offset,
				obj_request->offset);
			rbd_warn(rbd_dev, " result %d xferred %x\n",
				result, xferred);
			if (!img_request->result)
				img_request->result = result;
		}

		more = blk_end_request(img_request->rq, result, xferred);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}

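/*
 * In other words, object requests may complete in any order, but
 * blk_end_request() must see their byte ranges in order.  Each
 * completion only advances next_completion past the longest prefix
 * of finished requests; a completion that arrives out of order just
 * records its state and returns, leaving the laggard to report the
 * whole batch when it finishes.
 */
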
static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request->write_request;
	unsigned int bio_offset;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p bio %p\n", __func__, img_request, bio_list);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	bio_offset = 0;
	img_offset = img_request->offset;
	rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	rbd_assert(resid > 0);
	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		unsigned int clone_size;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
				obj_request->bio_list, obj_request->length);
		rbd_osd_req_format(obj_request, write_request);

		obj_request->img_offset = img_offset;
		rbd_img_obj_request_add(img_request, obj_request);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}

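/*
 * To illustrate the loop above with the default 4 MiB objects: a
 * 6 MiB write starting at image offset 2 MiB becomes two object
 * requests, one for the last 2 MiB of object 0 (offset 2 MiB,
 * img_offset 2 MiB) and one for all of object 1 (offset 0,
 * img_offset 4 MiB).  Recording img_offset on each object request
 * is what lets later code relate an object request back to its
 * position in the image.
 */
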
bf0d5f50
AE
1683static int rbd_img_request_submit(struct rbd_img_request *img_request)
1684{
1685 struct rbd_device *rbd_dev = img_request->rbd_dev;
1686 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1687 struct rbd_obj_request *obj_request;
46faeed4 1688 struct rbd_obj_request *next_obj_request;
bf0d5f50 1689
37206ee5 1690 dout("%s: img %p\n", __func__, img_request);
46faeed4 1691 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
bf0d5f50
AE
1692 int ret;
1693
bf0d5f50
AE
1694 ret = rbd_obj_request_submit(osdc, obj_request);
1695 if (ret)
1696 return ret;
1697 /*
1698 * The image request has its own reference to each
1699 * of its object requests, so we can safely drop the
1700 * initial one here.
1701 */
1702 rbd_obj_request_put(obj_request);
1703 }
1704
1705 return 0;
1706}
1707
cf81b60e 1708static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
b8d70035
AE
1709 u64 ver, u64 notify_id)
1710{
1711 struct rbd_obj_request *obj_request;
2169238d 1712 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
b8d70035
AE
1713 int ret;
1714
1715 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1716 OBJ_REQUEST_NODATA);
1717 if (!obj_request)
1718 return -ENOMEM;
1719
1720 ret = -ENOMEM;
430c28c3 1721 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
b8d70035
AE
1722 if (!obj_request->osd_req)
1723 goto out;
2169238d 1724 obj_request->callback = rbd_obj_request_put;
b8d70035 1725
c99d2d4a
AE
1726 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
1727 notify_id, ver, 0);
2fa12320 1728 rbd_osd_req_format(obj_request, false);
430c28c3 1729
b8d70035 1730 ret = rbd_obj_request_submit(osdc, obj_request);
b8d70035 1731out:
cf81b60e
AE
1732 if (ret)
1733 rbd_obj_request_put(obj_request);
b8d70035
AE
1734
1735 return ret;
1736}
1737
1738static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1739{
1740 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1741 u64 hver;
1742 int rc;
1743
1744 if (!rbd_dev)
1745 return;
1746
37206ee5 1747 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
b8d70035
AE
1748 rbd_dev->header_name, (unsigned long long) notify_id,
1749 (unsigned int) opcode);
1750 rc = rbd_dev_refresh(rbd_dev, &hver);
1751 if (rc)
1752 rbd_warn(rbd_dev, "got notification but failed to "
1753 "update snaps: %d\n", rc);
1754
1755 rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1756 }
1757
1758 /*
1759 * Request sync osd watch/unwatch. The value of "start" determines
1760 * whether a watch request is being initiated or torn down.
1761 */
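/*
 * A sketch of the intended call pattern (illustrative only; the real
 * callers live elsewhere in this file):
 *
 *	rbd_dev_header_watch_sync(rbd_dev, 1);	(set up the watch)
 *	... notifications then arrive via rbd_watch_cb() ...
 *	rbd_dev_header_watch_sync(rbd_dev, 0);	(tear it down)
 */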
1762 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1763 {
1764 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1765 struct rbd_obj_request *obj_request;
1766 int ret;
1767
1768 rbd_assert(start ^ !!rbd_dev->watch_event);
1769 rbd_assert(start ^ !!rbd_dev->watch_request);
1770
1771 if (start) {
1772 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1773 &rbd_dev->watch_event);
1774 if (ret < 0)
1775 return ret;
1776 rbd_assert(rbd_dev->watch_event != NULL);
1777 }
1778
1779 ret = -ENOMEM;
1780 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1781 OBJ_REQUEST_NODATA);
1782 if (!obj_request)
1783 goto out_cancel;
1784
1785 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1786 if (!obj_request->osd_req)
1787 goto out_cancel;
1788
1789 if (start)
1790 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1791 else
1792 ceph_osdc_unregister_linger_request(osdc,
1793 rbd_dev->watch_request->osd_req);
1794
1795 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1796 rbd_dev->watch_event->cookie,
1797 rbd_dev->header.obj_version, start);
1798 rbd_osd_req_format(obj_request, true);
1799
1800 ret = rbd_obj_request_submit(osdc, obj_request);
1801 if (ret)
1802 goto out_cancel;
1803 ret = rbd_obj_request_wait(obj_request);
1804 if (ret)
1805 goto out_cancel;
1806 ret = obj_request->result;
1807 if (ret)
1808 goto out_cancel;
1809
1810 /*
1811 * A watch request is set to linger, so the underlying osd
1812 * request won't go away until we unregister it. We retain
1813 * a pointer to the object request during that time (in
1814 * rbd_dev->watch_request), so we'll keep a reference to
1815 * it. We'll drop that reference (below) after we've
1816 * unregistered it.
1817 */
1818 if (start) {
1819 rbd_dev->watch_request = obj_request;
1820
1821 return 0;
1822 }
1823
1824 /* We have successfully torn down the watch request */
1825
1826 rbd_obj_request_put(rbd_dev->watch_request);
1827 rbd_dev->watch_request = NULL;
1828 out_cancel:
1829 /* Cancel the event if we're tearing down, or on error */
1830 ceph_osdc_cancel_event(rbd_dev->watch_event);
1831 rbd_dev->watch_event = NULL;
1832 if (obj_request)
1833 rbd_obj_request_put(obj_request);
1834
1835 return ret;
1836 }
1837
1838 /*
1839 * Synchronous osd object method call
1840 */
1841 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1842 const char *object_name,
1843 const char *class_name,
1844 const char *method_name,
1845 const char *outbound,
1846 size_t outbound_size,
1847 char *inbound,
1848 size_t inbound_size,
1849 u64 *version)
1850 {
1851 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1852 struct rbd_obj_request *obj_request;
1853 struct page **pages;
1854 u32 page_count;
1855 int ret;
1856
1857 /*
1858 * Method calls are ultimately read operations. The result
1859 * should be placed into the inbound buffer provided. They
1860 * also supply outbound data--parameters for the object
1861 * method. Currently if this is present it will be a
1862 * snapshot id.
1863 */
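/*
 * For example, the "get_size" class method call later in this file
 * sends a snapshot id and reads back an order/size pair:
 *
 *	rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *			"rbd", "get_size",
 *			(char *) &snapid, sizeof (snapid),
 *			(char *) &size_buf, sizeof (size_buf), NULL);
 */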
1864 page_count = (u32) calc_pages_for(0, inbound_size);
1865 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1866 if (IS_ERR(pages))
1867 return PTR_ERR(pages);
1868
1869 ret = -ENOMEM;
1870 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
1871 OBJ_REQUEST_PAGES);
1872 if (!obj_request)
1873 goto out;
1874
1875 obj_request->pages = pages;
1876 obj_request->page_count = page_count;
1877
1878 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1879 if (!obj_request->osd_req)
1880 goto out;
1881
1882 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
1883 class_name, method_name);
1884 if (outbound_size) {
1885 struct ceph_pagelist *pagelist;
1886
1887 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
1888 if (!pagelist)
1889 goto out;
1890
1891 ceph_pagelist_init(pagelist);
1892 ceph_pagelist_append(pagelist, outbound, outbound_size);
1893 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
1894 pagelist);
1895 }
1896 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
1897 obj_request->pages, inbound_size,
1898 0, false, false);
1899 rbd_osd_req_format(obj_request, false);
1900
1901 ret = rbd_obj_request_submit(osdc, obj_request);
1902 if (ret)
1903 goto out;
1904 ret = rbd_obj_request_wait(obj_request);
1905 if (ret)
1906 goto out;
1907
1908 ret = obj_request->result;
1909 if (ret < 0)
1910 goto out;
1911 ret = 0;
1912 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1913 if (version)
1914 *version = obj_request->version;
1915 out:
1916 if (obj_request)
1917 rbd_obj_request_put(obj_request);
1918 else
1919 ceph_release_page_vector(pages, page_count);
1920
1921 return ret;
1922 }
1923
1924 static void rbd_request_fn(struct request_queue *q)
1925 __releases(q->queue_lock) __acquires(q->queue_lock)
1926 {
1927 struct rbd_device *rbd_dev = q->queuedata;
1928 bool read_only = rbd_dev->mapping.read_only;
1929 struct request *rq;
1930 int result;
1931
1932 while ((rq = blk_fetch_request(q))) {
1933 bool write_request = rq_data_dir(rq) == WRITE;
1934 struct rbd_img_request *img_request;
1935 u64 offset;
1936 u64 length;
1937
1938 /* Ignore any non-FS requests that filter through. */
1939
1940 if (rq->cmd_type != REQ_TYPE_FS) {
1941 dout("%s: non-fs request type %d\n", __func__,
1942 (int) rq->cmd_type);
1943 __blk_end_request_all(rq, 0);
1944 continue;
1945 }
1946
1947 /* Ignore/skip any zero-length requests */
1948
1949 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1950 length = (u64) blk_rq_bytes(rq);
1951
1952 if (!length) {
1953 dout("%s: zero-length request\n", __func__);
1954 __blk_end_request_all(rq, 0);
1955 continue;
1956 }
1957
1958 spin_unlock_irq(q->queue_lock);
1959
1960 /* Disallow writes to a read-only device */
1961
1962 if (write_request) {
1963 result = -EROFS;
1964 if (read_only)
1965 goto end_request;
1966 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1967 }
1968
1969 /*
1970 * Quit early if the mapped snapshot no longer
1971 * exists. It's still possible the snapshot will
1972 * have disappeared by the time our request arrives
1973 * at the osd, but there's no sense in sending it if
1974 * we already know.
1975 */
1976 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
1977 dout("request for non-existent snapshot");
1978 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1979 result = -ENXIO;
1980 goto end_request;
1981 }
1982
1983 result = -EINVAL;
1984 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1985 goto end_request; /* Shouldn't happen */
1986
1987 result = -ENOMEM;
1988 img_request = rbd_img_request_create(rbd_dev, offset, length,
1989 write_request);
1990 if (!img_request)
1991 goto end_request;
1992
1993 img_request->rq = rq;
1994
1995 result = rbd_img_request_fill_bio(img_request, rq->bio);
1996 if (!result)
1997 result = rbd_img_request_submit(img_request);
1998 if (result)
1999 rbd_img_request_put(img_request);
2000 end_request:
2001 spin_lock_irq(q->queue_lock);
2002 if (result < 0) {
2003 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2004 write_request ? "write" : "read",
2005 length, offset, result);
2006
2007 __blk_end_request_all(rq, result);
2008 }
2009 }
2010 }
2011
2012 /*
2013 * A queue callback. Makes sure that we don't create a bio that spans
2014 * multiple osd objects. One exception would be single-page bios,
2015 * which we handle later in bio_chain_clone_range().
2016 */
2017 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2018 struct bio_vec *bvec)
2019 {
2020 struct rbd_device *rbd_dev = q->queuedata;
2021 sector_t sector_offset;
2022 sector_t sectors_per_obj;
2023 sector_t obj_sector_offset;
2024 int ret;
2025
2026 /*
2027 * Convert the partition-relative bio start sector to one
2028 * relative to the enclosing device, then find how far into
2029 * its rbd object that sector falls.
2030 */
2031 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2032 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2033 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2034
2035 /*
2036 * Compute the number of bytes from that offset to the end
2037 * of the object. Account for what's already used by the bio.
2038 */
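	/*
	 * Worked example (assuming 4 MB objects, i.e. obj_order 22 and
	 * thus 8192 sectors per object): a bio starting at device
	 * sector 8184 with an empty bi_size sits 8184 sectors into its
	 * object, leaving (8192 - 8184) << 9 = 4096 bytes before the
	 * object boundary, so at most 4096 bytes may be merged.
	 */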
2039 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2040 if (ret > bmd->bi_size)
2041 ret -= bmd->bi_size;
2042 else
2043 ret = 0;
2044
2045 /*
2046 * Don't send back more than was asked for. And if the bio
2047 * was empty, let the whole thing through because: "Note
2048 * that a block device *must* allow a single page to be
2049 * added to an empty bio."
2050 */
2051 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2052 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2053 ret = (int) bvec->bv_len;
2054
2055 return ret;
2056 }
2057
2058 static void rbd_free_disk(struct rbd_device *rbd_dev)
2059 {
2060 struct gendisk *disk = rbd_dev->disk;
2061
2062 if (!disk)
2063 return;
2064
2065 if (disk->flags & GENHD_FL_UP)
2066 del_gendisk(disk);
2067 if (disk->queue)
2068 blk_cleanup_queue(disk->queue);
2069 put_disk(disk);
2070 }
2071
2072 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2073 const char *object_name,
2074 u64 offset, u64 length,
2075 char *buf, u64 *version)
2076
2077 {
2078 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2079 struct rbd_obj_request *obj_request;
2080 struct page **pages = NULL;
2081 u32 page_count;
2082 size_t size;
2083 int ret;
2084
2085 page_count = (u32) calc_pages_for(offset, length);
2086 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2087 if (IS_ERR(pages))
2088 return PTR_ERR(pages);
2089
2090 ret = -ENOMEM;
2091 obj_request = rbd_obj_request_create(object_name, offset, length,
2092 OBJ_REQUEST_PAGES);
2093 if (!obj_request)
2094 goto out;
2095
2096 obj_request->pages = pages;
2097 obj_request->page_count = page_count;
2098
2099 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2100 if (!obj_request->osd_req)
2101 goto out;
2102
2103 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2104 offset, length, 0, 0);
2105 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
2106 obj_request->pages,
2107 obj_request->length,
2108 obj_request->offset & ~PAGE_MASK,
2109 false, false);
2110 rbd_osd_req_format(obj_request, false);
2111
2112 ret = rbd_obj_request_submit(osdc, obj_request);
2113 if (ret)
2114 goto out;
2115 ret = rbd_obj_request_wait(obj_request);
2116 if (ret)
2117 goto out;
2118
2119 ret = obj_request->result;
2120 if (ret < 0)
2121 goto out;
2122
2123 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2124 size = (size_t) obj_request->xferred;
2125 ceph_copy_from_page_vector(pages, buf, 0, size);
2126 rbd_assert(size <= (size_t) INT_MAX);
2127 ret = (int) size;
2128 if (version)
2129 *version = obj_request->version;
2130 out:
2131 if (obj_request)
2132 rbd_obj_request_put(obj_request);
2133 else
2134 ceph_release_page_vector(pages, page_count);
2135
2136 return ret;
2137 }
2138
2139 /*
2140 * Read the complete header for the given rbd device.
2141 *
2142 * Returns a pointer to a dynamically-allocated buffer containing
2143 * the complete and validated header. Caller can pass the address
2144 * of a variable that will be filled in with the version of the
2145 * header object at the time it was read.
2146 *
2147 * Returns a pointer-coded errno if a failure occurs.
2148 */
2149 static struct rbd_image_header_ondisk *
2150 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2151 {
2152 struct rbd_image_header_ondisk *ondisk = NULL;
2153 u32 snap_count = 0;
2154 u64 names_size = 0;
2155 u32 want_count;
2156 int ret;
2157
2158 /*
2159 * The complete header will include an array of its 64-bit
2160 * snapshot ids, followed by the names of those snapshots as
2161 * a contiguous block of NUL-terminated strings. Note that
2162 * the number of snapshots could change by the time we read
2163 * it in, in which case we re-read it.
2164 */
2165 do {
2166 size_t size;
2167
2168 kfree(ondisk);
2169
2170 size = sizeof (*ondisk);
2171 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2172 size += names_size;
2173 ondisk = kmalloc(size, GFP_KERNEL);
2174 if (!ondisk)
2175 return ERR_PTR(-ENOMEM);
2176
2177 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2178 0, size,
2179 (char *) ondisk, version);
2180 if (ret < 0)
2181 goto out_err;
2182 if (WARN_ON((size_t) ret < size)) {
2183 ret = -ENXIO;
2184 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2185 size, ret);
2186 goto out_err;
2187 }
2188 if (!rbd_dev_ondisk_valid(ondisk)) {
2189 ret = -ENXIO;
2190 rbd_warn(rbd_dev, "invalid header");
2191 goto out_err;
2192 }
2193
2194 names_size = le64_to_cpu(ondisk->snap_names_len);
2195 want_count = snap_count;
2196 snap_count = le32_to_cpu(ondisk->snap_count);
2197 } while (snap_count != want_count);
2198
2199 return ondisk;
2200
2201 out_err:
2202 kfree(ondisk);
2203
2204 return ERR_PTR(ret);
2205 }
2206
2207 /*
2208 * Re-read the on-disk header
2209 */
2210 static int rbd_read_header(struct rbd_device *rbd_dev,
2211 struct rbd_image_header *header)
2212 {
2213 struct rbd_image_header_ondisk *ondisk;
2214 u64 ver = 0;
2215 int ret;
2216
2217 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2218 if (IS_ERR(ondisk))
2219 return PTR_ERR(ondisk);
2220 ret = rbd_header_from_disk(header, ondisk);
2221 if (ret >= 0)
2222 header->obj_version = ver;
2223 kfree(ondisk);
2224
2225 return ret;
2226 }
2227
2228 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2229 {
2230 struct rbd_snap *snap;
2231 struct rbd_snap *next;
2232
2233 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2234 rbd_remove_snap_dev(snap);
2235 }
2236
2237 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2238 {
2239 sector_t size;
2240
2241 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2242 return;
2243
2244 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2245 dout("setting size to %llu sectors", (unsigned long long) size);
2246 rbd_dev->mapping.size = (u64) size;
2247 set_capacity(rbd_dev->disk, size);
2248 }
2249
2250 /*
2251 * only read the first part of the on-disk header, without the snapshot info
2252 */
2253 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2254 {
2255 int ret;
2256 struct rbd_image_header h;
2257
2258 ret = rbd_read_header(rbd_dev, &h);
2259 if (ret < 0)
2260 return ret;
2261
2262 down_write(&rbd_dev->header_rwsem);
2263
2264 /* Update image size, and check for resize of mapped image */
2265 rbd_dev->header.image_size = h.image_size;
2266 rbd_update_mapping_size(rbd_dev);
2267
2268 /* rbd_dev->header.object_prefix shouldn't change */
2269 kfree(rbd_dev->header.snap_sizes);
2270 kfree(rbd_dev->header.snap_names);
2271 /* osd requests may still refer to snapc */
2272 ceph_put_snap_context(rbd_dev->header.snapc);
2273
2274 if (hver)
2275 *hver = h.obj_version;
2276 rbd_dev->header.obj_version = h.obj_version;
2277 rbd_dev->header.image_size = h.image_size;
2278 rbd_dev->header.snapc = h.snapc;
2279 rbd_dev->header.snap_names = h.snap_names;
2280 rbd_dev->header.snap_sizes = h.snap_sizes;
2281 /* Free the extra copy of the object prefix */
2282 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2283 kfree(h.object_prefix);
2284
2285 ret = rbd_dev_snaps_update(rbd_dev);
2286 if (!ret)
2287 ret = rbd_dev_snaps_register(rbd_dev);
2288
2289 up_write(&rbd_dev->header_rwsem);
2290
2291 return ret;
2292 }
2293
2294 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2295 {
2296 int ret;
2297
2298 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2299 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2300 if (rbd_dev->image_format == 1)
2301 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2302 else
2303 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2304 mutex_unlock(&ctl_mutex);
2305
2306 return ret;
2307 }
2308
2309 static int rbd_init_disk(struct rbd_device *rbd_dev)
2310 {
2311 struct gendisk *disk;
2312 struct request_queue *q;
2313 u64 segment_size;
2314
2315 /* create gendisk info */
2316 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2317 if (!disk)
2318 return -ENOMEM;
2319
2320 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2321 rbd_dev->dev_id);
2322 disk->major = rbd_dev->major;
2323 disk->first_minor = 0;
2324 disk->fops = &rbd_bd_ops;
2325 disk->private_data = rbd_dev;
2326
2327 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2328 if (!q)
2329 goto out_disk;
2330
2331 /* We use the default size, but let's be explicit about it. */
2332 blk_queue_physical_block_size(q, SECTOR_SIZE);
2333
2334 /* set io sizes to object size */
2335 segment_size = rbd_obj_bytes(&rbd_dev->header);
2336 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2337 blk_queue_max_segment_size(q, segment_size);
2338 blk_queue_io_min(q, segment_size);
2339 blk_queue_io_opt(q, segment_size);
2340
2341 blk_queue_merge_bvec(q, rbd_merge_bvec);
2342 disk->queue = q;
2343
2344 q->queuedata = rbd_dev;
2345
2346 rbd_dev->disk = disk;
2347
2348 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2349
2350 return 0;
2351 out_disk:
2352 put_disk(disk);
2353
2354 return -ENOMEM;
2355 }
2356
2357 /*
2358 sysfs
2359 */
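/*
 * These attributes appear under /sys/bus/rbd/devices/<id>/; for
 * example (illustrative output for a hypothetical 1 GB mapping):
 *
 *	$ cat /sys/bus/rbd/devices/0/size
 *	1073741824
 */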
2360
2361 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2362 {
2363 return container_of(dev, struct rbd_device, dev);
2364 }
2365
2366 static ssize_t rbd_size_show(struct device *dev,
2367 struct device_attribute *attr, char *buf)
2368 {
2369 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2370 sector_t size;
2371
2372 down_read(&rbd_dev->header_rwsem);
2373 size = get_capacity(rbd_dev->disk);
2374 up_read(&rbd_dev->header_rwsem);
2375
2376 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2377 }
2378
2379 /*
2380 * Note this shows the features for whatever's mapped, which is not
2381 * necessarily the base image.
2382 */
2383 static ssize_t rbd_features_show(struct device *dev,
2384 struct device_attribute *attr, char *buf)
2385 {
2386 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2387
2388 return sprintf(buf, "0x%016llx\n",
2389 (unsigned long long) rbd_dev->mapping.features);
2390 }
2391
2392 static ssize_t rbd_major_show(struct device *dev,
2393 struct device_attribute *attr, char *buf)
2394 {
2395 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2396
2397 return sprintf(buf, "%d\n", rbd_dev->major);
2398 }
2399
2400 static ssize_t rbd_client_id_show(struct device *dev,
2401 struct device_attribute *attr, char *buf)
2402 {
2403 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2404
2405 return sprintf(buf, "client%lld\n",
2406 ceph_client_id(rbd_dev->rbd_client->client));
2407 }
2408
2409 static ssize_t rbd_pool_show(struct device *dev,
2410 struct device_attribute *attr, char *buf)
2411 {
2412 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2413
2414 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2415 }
2416
2417 static ssize_t rbd_pool_id_show(struct device *dev,
2418 struct device_attribute *attr, char *buf)
2419 {
2420 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2421
2422 return sprintf(buf, "%llu\n",
2423 (unsigned long long) rbd_dev->spec->pool_id);
2424 }
2425
2426 static ssize_t rbd_name_show(struct device *dev,
2427 struct device_attribute *attr, char *buf)
2428 {
2429 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2430
2431 if (rbd_dev->spec->image_name)
2432 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2433
2434 return sprintf(buf, "(unknown)\n");
2435 }
2436
2437 static ssize_t rbd_image_id_show(struct device *dev,
2438 struct device_attribute *attr, char *buf)
2439 {
2440 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2441
2442 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2443 }
2444
2445 /*
2446 * Shows the name of the currently-mapped snapshot (or
2447 * RBD_SNAP_HEAD_NAME for the base image).
2448 */
2449 static ssize_t rbd_snap_show(struct device *dev,
2450 struct device_attribute *attr,
2451 char *buf)
2452 {
2453 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2454
2455 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2456 }
2457
2458 /*
2459 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2460 * for the parent image. If there is no parent, simply shows
2461 * "(no parent image)".
2462 */
2463 static ssize_t rbd_parent_show(struct device *dev,
2464 struct device_attribute *attr,
2465 char *buf)
2466 {
2467 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2468 struct rbd_spec *spec = rbd_dev->parent_spec;
2469 int count;
2470 char *bufp = buf;
2471
2472 if (!spec)
2473 return sprintf(buf, "(no parent image)\n");
2474
2475 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2476 (unsigned long long) spec->pool_id, spec->pool_name);
2477 if (count < 0)
2478 return count;
2479 bufp += count;
2480
2481 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2482 spec->image_name ? spec->image_name : "(unknown)");
2483 if (count < 0)
2484 return count;
2485 bufp += count;
2486
2487 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2488 (unsigned long long) spec->snap_id, spec->snap_name);
2489 if (count < 0)
2490 return count;
2491 bufp += count;
2492
2493 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2494 if (count < 0)
2495 return count;
2496 bufp += count;
2497
2498 return (ssize_t) (bufp - buf);
2499 }
2500
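/*
 * Example "parent" output for a mapped clone (values illustrative):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1028bd8b4567
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 1073741824
 */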
2501 static ssize_t rbd_image_refresh(struct device *dev,
2502 struct device_attribute *attr,
2503 const char *buf,
2504 size_t size)
2505 {
2506 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2507 int ret;
2508
2509 ret = rbd_dev_refresh(rbd_dev, NULL);
2510
2511 return ret < 0 ? ret : size;
2512 }
2513
2514 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2515 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2516 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2517 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2518 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2519 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2520 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2521 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2522 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2523 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2524 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2525
2526 static struct attribute *rbd_attrs[] = {
2527 &dev_attr_size.attr,
2528 &dev_attr_features.attr,
2529 &dev_attr_major.attr,
2530 &dev_attr_client_id.attr,
2531 &dev_attr_pool.attr,
2532 &dev_attr_pool_id.attr,
2533 &dev_attr_name.attr,
2534 &dev_attr_image_id.attr,
2535 &dev_attr_current_snap.attr,
2536 &dev_attr_parent.attr,
2537 &dev_attr_refresh.attr,
2538 NULL
2539 };
2540
2541 static struct attribute_group rbd_attr_group = {
2542 .attrs = rbd_attrs,
2543 };
2544
2545 static const struct attribute_group *rbd_attr_groups[] = {
2546 &rbd_attr_group,
2547 NULL
2548 };
2549
2550 static void rbd_sysfs_dev_release(struct device *dev)
2551 {
2552 }
2553
2554 static struct device_type rbd_device_type = {
2555 .name = "rbd",
2556 .groups = rbd_attr_groups,
2557 .release = rbd_sysfs_dev_release,
2558 };
2559
2560
2561 /*
2562 sysfs - snapshots
2563 */
2564
2565 static ssize_t rbd_snap_size_show(struct device *dev,
2566 struct device_attribute *attr,
2567 char *buf)
2568 {
2569 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2570
2571 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2572 }
2573
2574 static ssize_t rbd_snap_id_show(struct device *dev,
2575 struct device_attribute *attr,
2576 char *buf)
2577 {
2578 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2579
2580 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2581 }
2582
2583 static ssize_t rbd_snap_features_show(struct device *dev,
2584 struct device_attribute *attr,
2585 char *buf)
2586 {
2587 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2588
2589 return sprintf(buf, "0x%016llx\n",
2590 (unsigned long long) snap->features);
2591 }
2592
2593 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2594 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2595 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2596
2597 static struct attribute *rbd_snap_attrs[] = {
2598 &dev_attr_snap_size.attr,
2599 &dev_attr_snap_id.attr,
2600 &dev_attr_snap_features.attr,
2601 NULL,
2602 };
2603
2604 static struct attribute_group rbd_snap_attr_group = {
2605 .attrs = rbd_snap_attrs,
2606 };
2607
2608 static void rbd_snap_dev_release(struct device *dev)
2609 {
2610 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2611 kfree(snap->name);
2612 kfree(snap);
2613 }
2614
2615 static const struct attribute_group *rbd_snap_attr_groups[] = {
2616 &rbd_snap_attr_group,
2617 NULL
2618 };
2619
2620 static struct device_type rbd_snap_device_type = {
2621 .groups = rbd_snap_attr_groups,
2622 .release = rbd_snap_dev_release,
2623 };
2624
2625 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2626 {
2627 kref_get(&spec->kref);
2628
2629 return spec;
2630 }
2631
2632 static void rbd_spec_free(struct kref *kref);
2633 static void rbd_spec_put(struct rbd_spec *spec)
2634 {
2635 if (spec)
2636 kref_put(&spec->kref, rbd_spec_free);
2637 }
2638
2639 static struct rbd_spec *rbd_spec_alloc(void)
2640 {
2641 struct rbd_spec *spec;
2642
2643 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2644 if (!spec)
2645 return NULL;
2646 kref_init(&spec->kref);
2647
2648 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2649
2650 return spec;
2651 }
2652
2653 static void rbd_spec_free(struct kref *kref)
2654 {
2655 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2656
2657 kfree(spec->pool_name);
2658 kfree(spec->image_id);
2659 kfree(spec->image_name);
2660 kfree(spec->snap_name);
2661 kfree(spec);
2662 }
2663
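/*
 * Reference counting sketch: rbd_spec_alloc() returns a spec holding
 * one reference; rbd_spec_get() takes another, rbd_spec_put() drops
 * one, and the final put frees the spec via rbd_spec_free().
 */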
2664 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2665 struct rbd_spec *spec)
2666 {
2667 struct rbd_device *rbd_dev;
2668
2669 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2670 if (!rbd_dev)
2671 return NULL;
2672
2673 spin_lock_init(&rbd_dev->lock);
2674 rbd_dev->flags = 0;
2675 INIT_LIST_HEAD(&rbd_dev->node);
2676 INIT_LIST_HEAD(&rbd_dev->snaps);
2677 init_rwsem(&rbd_dev->header_rwsem);
2678
2679 rbd_dev->spec = spec;
2680 rbd_dev->rbd_client = rbdc;
2681
2682 /* Initialize the layout used for all rbd requests */
2683
2684 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2685 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2686 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2687 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2688
2689 return rbd_dev;
2690 }
2691
2692 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2693 {
2694 rbd_spec_put(rbd_dev->parent_spec);
2695 kfree(rbd_dev->header_name);
2696 rbd_put_client(rbd_dev->rbd_client);
2697 rbd_spec_put(rbd_dev->spec);
2698 kfree(rbd_dev);
2699 }
2700
2701 static bool rbd_snap_registered(struct rbd_snap *snap)
2702 {
2703 bool ret = snap->dev.type == &rbd_snap_device_type;
2704 bool reg = device_is_registered(&snap->dev);
2705
2706 rbd_assert(!ret ^ reg);
2707
2708 return ret;
2709 }
2710
2711 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2712 {
2713 list_del(&snap->node);
2714 if (device_is_registered(&snap->dev))
2715 device_unregister(&snap->dev);
2716 }
2717
2718 static int rbd_register_snap_dev(struct rbd_snap *snap,
2719 struct device *parent)
2720 {
2721 struct device *dev = &snap->dev;
2722 int ret;
2723
2724 dev->type = &rbd_snap_device_type;
2725 dev->parent = parent;
2726 dev->release = rbd_snap_dev_release;
2727 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2728 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2729
2730 ret = device_register(dev);
2731
2732 return ret;
2733 }
2734
2735 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2736 const char *snap_name,
2737 u64 snap_id, u64 snap_size,
2738 u64 snap_features)
2739 {
2740 struct rbd_snap *snap;
2741 int ret;
2742
2743 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2744 if (!snap)
2745 return ERR_PTR(-ENOMEM);
2746
2747 ret = -ENOMEM;
2748 snap->name = kstrdup(snap_name, GFP_KERNEL);
2749 if (!snap->name)
2750 goto err;
2751
2752 snap->id = snap_id;
2753 snap->size = snap_size;
2754 snap->features = snap_features;
2755
2756 return snap;
2757
2758 err:
2759 kfree(snap->name);
2760 kfree(snap);
2761
2762 return ERR_PTR(ret);
2763 }
2764
2765 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2766 u64 *snap_size, u64 *snap_features)
2767 {
2768 char *snap_name;
2769
2770 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2771
2772 *snap_size = rbd_dev->header.snap_sizes[which];
2773 *snap_features = 0; /* No features for v1 */
2774
2775 /* Skip over names until we find the one we are looking for */
2776
2777 snap_name = rbd_dev->header.snap_names;
2778 while (which--)
2779 snap_name += strlen(snap_name) + 1;
2780
2781 return snap_name;
2782 }
2783
2784 /*
2785 * Get the size and object order for an image snapshot, or if
2786 * snap_id is CEPH_NOSNAP, gets this information for the base
2787 * image.
2788 */
2789 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2790 u8 *order, u64 *snap_size)
2791 {
2792 __le64 snapid = cpu_to_le64(snap_id);
2793 int ret;
2794 struct {
2795 u8 order;
2796 __le64 size;
2797 } __attribute__ ((packed)) size_buf = { 0 };
2798
2799 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2800 "rbd", "get_size",
2801 (char *) &snapid, sizeof (snapid),
2802 (char *) &size_buf, sizeof (size_buf), NULL);
2803 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2804 if (ret < 0)
2805 return ret;
2806
2807 *order = size_buf.order;
2808 *snap_size = le64_to_cpu(size_buf.size);
2809
2810 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2811 (unsigned long long) snap_id, (unsigned int) *order,
2812 (unsigned long long) *snap_size);
2813
2814 return 0;
2815 }
2816
2817 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2818 {
2819 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2820 &rbd_dev->header.obj_order,
2821 &rbd_dev->header.image_size);
2822 }
2823
2824 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2825 {
2826 void *reply_buf;
2827 int ret;
2828 void *p;
2829
2830 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2831 if (!reply_buf)
2832 return -ENOMEM;
2833
2834 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2835 "rbd", "get_object_prefix",
2836 NULL, 0,
2837 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2838 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2839 if (ret < 0)
2840 goto out;
2841
2842 p = reply_buf;
2843 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2844 p + RBD_OBJ_PREFIX_LEN_MAX,
2845 NULL, GFP_NOIO);
2846
2847 if (IS_ERR(rbd_dev->header.object_prefix)) {
2848 ret = PTR_ERR(rbd_dev->header.object_prefix);
2849 rbd_dev->header.object_prefix = NULL;
2850 } else {
2851 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
2852 }
2853
2854 out:
2855 kfree(reply_buf);
2856
2857 return ret;
2858 }
2859
2860 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2861 u64 *snap_features)
2862 {
2863 __le64 snapid = cpu_to_le64(snap_id);
2864 struct {
2865 __le64 features;
2866 __le64 incompat;
2867 } features_buf = { 0 };
2868 u64 incompat;
2869 int ret;
2870
2871 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2872 "rbd", "get_features",
2873 (char *) &snapid, sizeof (snapid),
2874 (char *) &features_buf, sizeof (features_buf),
2875 NULL);
2876 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2877 if (ret < 0)
2878 return ret;
2879
2880 incompat = le64_to_cpu(features_buf.incompat);
2881 if (incompat & ~RBD_FEATURES_SUPPORTED)
2882 return -ENXIO;
2883
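	/*
	 * Example: an image created with a feature bit that is not in
	 * RBD_FEATURES_SUPPORTED is refused here with -ENXIO rather
	 * than being misinterpreted by this client.
	 */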
2884 *snap_features = le64_to_cpu(features_buf.features);
2885
2886 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2887 (unsigned long long) snap_id,
2888 (unsigned long long) *snap_features,
2889 (unsigned long long) le64_to_cpu(features_buf.incompat));
2890
2891 return 0;
2892 }
2893
2894 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2895 {
2896 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2897 &rbd_dev->header.features);
2898 }
2899
2900 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2901 {
2902 struct rbd_spec *parent_spec;
2903 size_t size;
2904 void *reply_buf = NULL;
2905 __le64 snapid;
2906 void *p;
2907 void *end;
2908 char *image_id;
2909 u64 overlap;
2910 int ret;
2911
2912 parent_spec = rbd_spec_alloc();
2913 if (!parent_spec)
2914 return -ENOMEM;
2915
2916 size = sizeof (__le64) + /* pool_id */
2917 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2918 sizeof (__le64) + /* snap_id */
2919 sizeof (__le64); /* overlap */
2920 reply_buf = kmalloc(size, GFP_KERNEL);
2921 if (!reply_buf) {
2922 ret = -ENOMEM;
2923 goto out_err;
2924 }
2925
2926 snapid = cpu_to_le64(CEPH_NOSNAP);
2927 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2928 "rbd", "get_parent",
2929 (char *) &snapid, sizeof (snapid),
2930 (char *) reply_buf, size, NULL);
2931 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2932 if (ret < 0)
2933 goto out_err;
2934
2935 ret = -ERANGE;
2936 p = reply_buf;
2937 end = (char *) reply_buf + size;
2938 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2939 if (parent_spec->pool_id == CEPH_NOPOOL)
2940 goto out; /* No parent? No problem. */
2941
2942 /* The ceph file layout needs to fit pool id in 32 bits */
2943
2944 ret = -EIO;
2945 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2946 goto out;
2947
2948 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2949 if (IS_ERR(image_id)) {
2950 ret = PTR_ERR(image_id);
2951 goto out_err;
2952 }
2953 parent_spec->image_id = image_id;
2954 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2955 ceph_decode_64_safe(&p, end, overlap, out_err);
2956
2957 rbd_dev->parent_overlap = overlap;
2958 rbd_dev->parent_spec = parent_spec;
2959 parent_spec = NULL; /* rbd_dev now owns this */
2960 out:
2961 ret = 0;
2962 out_err:
2963 kfree(reply_buf);
2964 rbd_spec_put(parent_spec);
2965
2966 return ret;
2967 }
2968
2969 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2970 {
2971 size_t image_id_size;
2972 char *image_id;
2973 void *p;
2974 void *end;
2975 size_t size;
2976 void *reply_buf = NULL;
2977 size_t len = 0;
2978 char *image_name = NULL;
2979 int ret;
2980
2981 rbd_assert(!rbd_dev->spec->image_name);
2982
2983 len = strlen(rbd_dev->spec->image_id);
2984 image_id_size = sizeof (__le32) + len;
2985 image_id = kmalloc(image_id_size, GFP_KERNEL);
2986 if (!image_id)
2987 return NULL;
2988
2989 p = image_id;
2990 end = (char *) image_id + image_id_size;
2991 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2992
2993 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2994 reply_buf = kmalloc(size, GFP_KERNEL);
2995 if (!reply_buf)
2996 goto out;
2997
2998 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
2999 "rbd", "dir_get_name",
3000 image_id, image_id_size,
3001 (char *) reply_buf, size, NULL);
3002 if (ret < 0)
3003 goto out;
3004 p = reply_buf;
3005 end = (char *) reply_buf + size;
3006 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3007 if (IS_ERR(image_name))
3008 image_name = NULL;
3009 else
3010 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3011 out:
3012 kfree(reply_buf);
3013 kfree(image_id);
3014
3015 return image_name;
3016 }
3017
3018/*
3019 * When a parent image gets probed, we only have the pool, image,
3020 * and snapshot ids but not the names of any of them. This call
3021 * is made later to fill in those names. It has to be done after
3022 * rbd_dev_snaps_update() has completed because some of the
3023 * information (in particular, snapshot name) is not available
3024 * until then.
3025 */
3026 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3027 {
3028 struct ceph_osd_client *osdc;
3029 const char *name;
3030 void *reply_buf = NULL;
3031 int ret;
3032
3033 if (rbd_dev->spec->pool_name)
3034 return 0; /* Already have the names */
3035
3036 /* Look up the pool name */
3037
3038 osdc = &rbd_dev->rbd_client->client->osdc;
3039 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3040 if (!name) {
3041 rbd_warn(rbd_dev, "there is no pool with id %llu",
3042 rbd_dev->spec->pool_id); /* Really a BUG() */
3043 return -EIO;
3044 }
3045
3046 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3047 if (!rbd_dev->spec->pool_name)
3048 return -ENOMEM;
3049
3050 /* Fetch the image name; tolerate failure here */
3051
3052 name = rbd_dev_image_name(rbd_dev);
3053 if (name)
3054 rbd_dev->spec->image_name = (char *) name;
3055 else
3056 rbd_warn(rbd_dev, "unable to get image name");
3057
3058 /* Look up the snapshot name. */
3059
3060 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3061 if (!name) {
3062 rbd_warn(rbd_dev, "no snapshot with id %llu",
3063 rbd_dev->spec->snap_id); /* Really a BUG() */
3064 ret = -EIO;
3065 goto out_err;
3066 }
3067 ret = -ENOMEM;
3068 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3069 if (!rbd_dev->spec->snap_name)
3070 goto out_err;
3071 return 0;
3072 out_err:
3073 kfree(reply_buf);
3074 kfree(rbd_dev->spec->pool_name);
3075 rbd_dev->spec->pool_name = NULL;
3076
3077 return ret;
3078}
3079
3080 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3081 {
3082 size_t size;
3083 int ret;
3084 void *reply_buf;
3085 void *p;
3086 void *end;
3087 u64 seq;
3088 u32 snap_count;
3089 struct ceph_snap_context *snapc;
3090 u32 i;
3091
3092 /*
3093 * We'll need room for the seq value (maximum snapshot id),
3094 * snapshot count, and array of that many snapshot ids.
3095 * For now we have a fixed upper limit on the number we're
3096 * prepared to receive.
3097 */
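	/*
	 * With RBD_MAX_SNAP_COUNT snapshots (510) this works out to
	 * 8 + 4 + 510 * 8 = 4092 bytes, so the buffer fits in 4 KB.
	 */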
3098 size = sizeof (__le64) + sizeof (__le32) +
3099 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3100 reply_buf = kzalloc(size, GFP_KERNEL);
3101 if (!reply_buf)
3102 return -ENOMEM;
3103
3104 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3105 "rbd", "get_snapcontext",
3106 NULL, 0,
3107 reply_buf, size, ver);
3108 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3109 if (ret < 0)
3110 goto out;
3111
3112 ret = -ERANGE;
3113 p = reply_buf;
3114 end = (char *) reply_buf + size;
3115 ceph_decode_64_safe(&p, end, seq, out);
3116 ceph_decode_32_safe(&p, end, snap_count, out);
3117
3118 /*
3119 * Make sure the reported number of snapshot ids wouldn't go
3120 * beyond the end of our buffer. But before checking that,
3121 * make sure the computed size of the snapshot context we
3122 * allocate is representable in a size_t.
3123 */
3124 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3125 / sizeof (u64)) {
3126 ret = -EINVAL;
3127 goto out;
3128 }
3129 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3130 goto out;
3131
3132 size = sizeof (struct ceph_snap_context) +
3133 snap_count * sizeof (snapc->snaps[0]);
3134 snapc = kmalloc(size, GFP_KERNEL);
3135 if (!snapc) {
3136 ret = -ENOMEM;
3137 goto out;
3138 }
3139
3140 atomic_set(&snapc->nref, 1);
3141 snapc->seq = seq;
3142 snapc->num_snaps = snap_count;
3143 for (i = 0; i < snap_count; i++)
3144 snapc->snaps[i] = ceph_decode_64(&p);
3145
3146 rbd_dev->header.snapc = snapc;
3147
3148 dout(" snap context seq = %llu, snap_count = %u\n",
3149 (unsigned long long) seq, (unsigned int) snap_count);
3150 ret = 0;
3151 out:
3152 kfree(reply_buf);
3153
3154 return ret;
3155 }
3156
3157 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3158 {
3159 size_t size;
3160 void *reply_buf;
3161 __le64 snap_id;
3162 int ret;
3163 void *p;
3164 void *end;
3165 char *snap_name;
3166
3167 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3168 reply_buf = kmalloc(size, GFP_KERNEL);
3169 if (!reply_buf)
3170 return ERR_PTR(-ENOMEM);
3171
3172 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3173 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3174 "rbd", "get_snapshot_name",
3175 (char *) &snap_id, sizeof (snap_id),
3176 reply_buf, size, NULL);
3177 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3178 if (ret < 0)
3179 goto out;
3180
3181 p = reply_buf;
3182 end = (char *) reply_buf + size;
3183 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3184 if (IS_ERR(snap_name)) {
3185 ret = PTR_ERR(snap_name);
3186 goto out;
3187 } else {
3188 dout(" snap_id 0x%016llx snap_name = %s\n",
3189 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3190 }
3191 kfree(reply_buf);
3192
3193 return snap_name;
3194 out:
3195 kfree(reply_buf);
3196
3197 return ERR_PTR(ret);
3198 }
3199
3200 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3201 u64 *snap_size, u64 *snap_features)
3202 {
3203 u64 snap_id;
3204 u8 order;
3205 int ret;
3206
3207 snap_id = rbd_dev->header.snapc->snaps[which];
3208 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3209 if (ret)
3210 return ERR_PTR(ret);
3211 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3212 if (ret)
3213 return ERR_PTR(ret);
3214
3215 return rbd_dev_v2_snap_name(rbd_dev, which);
3216 }
3217
3218 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3219 u64 *snap_size, u64 *snap_features)
3220 {
3221 if (rbd_dev->image_format == 1)
3222 return rbd_dev_v1_snap_info(rbd_dev, which,
3223 snap_size, snap_features);
3224 if (rbd_dev->image_format == 2)
3225 return rbd_dev_v2_snap_info(rbd_dev, which,
3226 snap_size, snap_features);
3227 return ERR_PTR(-EINVAL);
3228 }
3229
3230 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3231 {
3232 int ret;
3233 __u8 obj_order;
3234
3235 down_write(&rbd_dev->header_rwsem);
3236
3237 /* Grab old order first, to see if it changes */
3238
3239 obj_order = rbd_dev->header.obj_order;
3240 ret = rbd_dev_v2_image_size(rbd_dev);
3241 if (ret)
3242 goto out;
3243 if (rbd_dev->header.obj_order != obj_order) {
3244 ret = -EIO;
3245 goto out;
3246 }
3247 rbd_update_mapping_size(rbd_dev);
3248
3249 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3250 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3251 if (ret)
3252 goto out;
3253 ret = rbd_dev_snaps_update(rbd_dev);
3254 dout("rbd_dev_snaps_update returned %d\n", ret);
3255 if (ret)
3256 goto out;
3257 ret = rbd_dev_snaps_register(rbd_dev);
3258 dout("rbd_dev_snaps_register returned %d\n", ret);
3259 out:
3260 up_write(&rbd_dev->header_rwsem);
3261
3262 return ret;
3263 }
3264
3265 /*
3266 * Scan the rbd device's current snapshot list and compare it to the
3267 * newly-received snapshot context. Remove any existing snapshots
3268 * not present in the new snapshot context. Add a new snapshot for
3269 * any snapshots in the snapshot context not in the current list.
3270 * And verify there are no changes to snapshots we already know
3271 * about.
3272 *
3273 * Assumes the snapshots in the snapshot context are sorted by
3274 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3275 * are also maintained in that order.)
3276 */
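/*
 * Illustration: with an existing list of snapshot ids {12, 8, 3} and
 * a new context {12, 10, 3}, the walk below keeps 12, inserts 10,
 * removes 8, and keeps 3 (both sequences ordered highest id first).
 */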
3277 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3278 {
3279 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3280 const u32 snap_count = snapc->num_snaps;
3281 struct list_head *head = &rbd_dev->snaps;
3282 struct list_head *links = head->next;
3283 u32 index = 0;
3284
3285 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3286 while (index < snap_count || links != head) {
3287 u64 snap_id;
3288 struct rbd_snap *snap;
3289 char *snap_name;
3290 u64 snap_size = 0;
3291 u64 snap_features = 0;
3292
3293 snap_id = index < snap_count ? snapc->snaps[index]
3294 : CEPH_NOSNAP;
3295 snap = links != head ? list_entry(links, struct rbd_snap, node)
3296 : NULL;
3297 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3298
3299 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3300 struct list_head *next = links->next;
3301
3302 /*
3303 * A previously-existing snapshot is not in
3304 * the new snap context.
3305 *
3306 * If the now missing snapshot is the one the
3307 * image is mapped to, clear its exists flag
3308 * so we can avoid sending any more requests
3309 * to it.
3310 */
3311 if (rbd_dev->spec->snap_id == snap->id)
3312 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3313 rbd_remove_snap_dev(snap);
3314 dout("%ssnap id %llu has been removed\n",
3315 rbd_dev->spec->snap_id == snap->id ?
3316 "mapped " : "",
3317 (unsigned long long) snap->id);
3318
3319 /* Done with this list entry; advance */
3320
3321 links = next;
3322 continue;
3323 }
3324
3325 snap_name = rbd_dev_snap_info(rbd_dev, index,
3326 &snap_size, &snap_features);
3327 if (IS_ERR(snap_name))
3328 return PTR_ERR(snap_name);
3329
3330 dout("entry %u: snap_id = %llu\n", (unsigned int) index,
3331 (unsigned long long) snap_id);
3332 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3333 struct rbd_snap *new_snap;
3334
3335 /* We haven't seen this snapshot before */
3336
3337 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3338 snap_id, snap_size, snap_features);
3339 if (IS_ERR(new_snap)) {
3340 int err = PTR_ERR(new_snap);
3341
3342 dout(" failed to add dev, error %d\n", err);
3343
3344 return err;
3345 }
3346
3347 /* New goes before existing, or at end of list */
3348
3349 dout(" added dev%s\n", snap ? "" : " at end\n");
3350 if (snap)
3351 list_add_tail(&new_snap->node, &snap->node);
3352 else
3353 list_add_tail(&new_snap->node, head);
3354 } else {
3355 /* Already have this one */
3356
3357 dout(" already present\n");
3358
3359 rbd_assert(snap->size == snap_size);
3360 rbd_assert(!strcmp(snap->name, snap_name));
3361 rbd_assert(snap->features == snap_features);
3362
3363 /* Done with this list entry; advance */
3364
3365 links = links->next;
3366 }
3367
3368 /* Advance to the next entry in the snapshot context */
3369
3370 index++;
3371 }
3372 dout("%s: done\n", __func__);
3373
3374 return 0;
3375 }
3376
3377 /*
3378 * Scan the list of snapshots and register the devices for any that
3379 * have not already been registered.
3380 */
3381 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3382 {
3383 struct rbd_snap *snap;
3384 int ret = 0;
3385
3386 dout("%s:\n", __func__);
3387 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3388 return -EIO;
3389
3390 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3391 if (!rbd_snap_registered(snap)) {
3392 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3393 if (ret < 0)
3394 break;
3395 }
3396 }
3397 dout("%s: returning %d\n", __func__, ret);
3398
3399 return ret;
3400 }
3401
3402 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3403 {
3404 struct device *dev;
3405 int ret;
3406
3407 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3408
3409 dev = &rbd_dev->dev;
3410 dev->bus = &rbd_bus_type;
3411 dev->type = &rbd_device_type;
3412 dev->parent = &rbd_root_dev;
3413 dev->release = rbd_dev_release;
3414 dev_set_name(dev, "%d", rbd_dev->dev_id);
3415 ret = device_register(dev);
3416
3417 mutex_unlock(&ctl_mutex);
3418
3419 return ret;
3420 }
3421
3422 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3423 {
3424 device_unregister(&rbd_dev->dev);
3425 }
3426
3427 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3428
3429 /*
3430 * Get a unique rbd identifier for the given new rbd_dev, and add
3431 * the rbd_dev to the global list. The minimum rbd id is 1.
3432 */
3433 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3434 {
3435 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3436
3437 spin_lock(&rbd_dev_list_lock);
3438 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3439 spin_unlock(&rbd_dev_list_lock);
3440 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3441 (unsigned long long) rbd_dev->dev_id);
3442 }
3443
3444 /*
3445 * Remove an rbd_dev from the global list, and record that its
3446 * identifier is no longer in use.
3447 */
3448 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3449 {
3450 struct list_head *tmp;
3451 int rbd_id = rbd_dev->dev_id;
3452 int max_id;
3453
3454 rbd_assert(rbd_id > 0);
3455
3456 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3457 (unsigned long long) rbd_dev->dev_id);
3458 spin_lock(&rbd_dev_list_lock);
3459 list_del_init(&rbd_dev->node);
3460
3461 /*
3462 * If the id being "put" is not the current maximum, there
3463 * is nothing special we need to do.
3464 */
3465 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3466 spin_unlock(&rbd_dev_list_lock);
3467 return;
3468 }
3469
3470 /*
3471 * We need to update the current maximum id. Search the
3472 * list to find out what it is. We're more likely to find
3473 * the maximum at the end, so search the list backward.
3474 */
3475 max_id = 0;
3476 list_for_each_prev(tmp, &rbd_dev_list) {
3477 struct rbd_device *rbd_dev;
3478
3479 rbd_dev = list_entry(tmp, struct rbd_device, node);
3480 if (rbd_dev->dev_id > max_id)
3481 max_id = rbd_dev->dev_id;
3482 }
3483 spin_unlock(&rbd_dev_list_lock);
3484
3485 /*
3486 * The max id could have been updated by rbd_dev_id_get(), in
3487 * which case it now accurately reflects the new maximum.
3488 * Be careful not to overwrite the maximum value in that
3489 * case.
3490 */
3491 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3492 dout(" max dev id has been reset\n");
3493 }
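/*
 * Example of the id lifecycle: mapping three images yields ids 1, 2
 * and 3; unmapping id 3 (the current maximum) resets rbd_dev_id_max
 * to 2 so the next map reuses 3, while unmapping only id 2 would
 * leave the maximum at 3.
 */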
3494
3495 /*
3496 * Skips over white space at *buf, and updates *buf to point to the
3497 * first found non-space character (if any). Returns the length of
3498 * the token (string of non-white space characters) found. Note
3499 * that *buf must be terminated with '\0'.
3500 */
3501 static inline size_t next_token(const char **buf)
3502 {
3503 /*
3504 * These are the characters that produce nonzero for
3505 * isspace() in the "C" and "POSIX" locales.
3506 */
3507 const char *spaces = " \f\n\r\t\v";
3508
3509 *buf += strspn(*buf, spaces); /* Find start of token */
3510
3511 return strcspn(*buf, spaces); /* Return token length */
3512 }
3513
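/*
 * Example: with *buf pointing at "  pool image", next_token()
 * advances *buf to "pool image" and returns 4 (strlen("pool")).
 */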
3514 /*
3515 * Finds the next token in *buf, and if the provided token buffer is
3516 * big enough, copies the found token into it. The result, if
3517 * copied, is guaranteed to be terminated with '\0'. Note that *buf
3518 * must be terminated with '\0' on entry.
3519 *
3520 * Returns the length of the token found (not including the '\0').
3521 * Return value will be 0 if no token is found, and it will be >=
3522 * token_size if the token would not fit.
3523 *
3524 * The *buf pointer will be updated to point beyond the end of the
3525 * found token. Note that this occurs even if the token buffer is
3526 * too small to hold it.
3527 */
3528 static inline size_t copy_token(const char **buf,
3529 char *token,
3530 size_t token_size)
3531 {
3532 size_t len;
3533
3534 len = next_token(buf);
3535 if (len < token_size) {
3536 memcpy(token, *buf, len);
3537 *(token + len) = '\0';
3538 }
3539 *buf += len;
3540
3541 return len;
3542 }
3543
3544 /*
3545 * Finds the next token in *buf, dynamically allocates a buffer big
3546 * enough to hold a copy of it, and copies the token into the new
3547 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3548 * that a duplicate buffer is created even for a zero-length token.
3549 *
3550 * Returns a pointer to the newly-allocated duplicate, or a null
3551 * pointer if memory for the duplicate was not available. If
3552 * the lenp argument is a non-null pointer, the length of the token
3553 * (not including the '\0') is returned in *lenp.
3554 *
3555 * If successful, the *buf pointer will be updated to point beyond
3556 * the end of the found token.
3557 *
3558 * Note: uses GFP_KERNEL for allocation.
3559 */
3560static inline char *dup_token(const char **buf, size_t *lenp)
3561{
3562 char *dup;
3563 size_t len;
3564
3565 len = next_token(buf);
4caf35f9 3566 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
3567 if (!dup)
3568 return NULL;
ea3352f4
AE
3569 *(dup + len) = '\0';
3570 *buf += len;
3571
3572 if (lenp)
3573 *lenp = len;
3574
3575 return dup;
3576}

/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or a negative error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *	The address of a pointer that will refer to a ceph options
 *	structure.  Caller must release the returned pointer using
 *	ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *	Address of an rbd options pointer.  Fully initialized by
 *	this function; caller must release with kfree().
 *  spec
 *	Address of an rbd image specification pointer.  Fully
 *	initialized by this function based on parsed options.
 *	Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *	A comma-separated list of one or more monitor addresses.
 *	A monitor address is an ip address, optionally followed
 *	by a port number (separated by a colon).
 *	  I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *	A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *	The name of the rados pool containing the rbd image.
 *  <image_name>
 *	The name of the image in that pool to map.
 *  <snap_name>
 *	An optional snapshot name.  If provided, the mapping will
 *	present data from the image at the time that snapshot was
 *	created.  The image head is used if no snapshot name is
 *	provided.  Snapshot mappings are always read-only.
 */
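/*
 * Illustrative example of a complete "add" request (all values are
 * hypothetical, supplied for this edit):
 *
 *	1.2.3.4:6789,1.2.3.5:6789 name=admin rbd myimage mysnap
 *
 * i.e. two monitors, the ceph option "name=admin", pool "rbd",
 * image "myimage", snapshot "mysnap".
 */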
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!spec->snap_name)
		goto out_mem;
	*(spec->snap_name + len) = '\0';

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.
	 */
	if (rbd_dev->spec->image_id)
		return 0;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	ret = rbd_obj_method_sync(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = response;
	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						NULL, GFP_NOIO);
	if (IS_ERR(rbd_dev->spec->image_id)) {
		ret = PTR_ERR(rbd_dev->spec->image_id);
		rbd_dev->spec->image_id = NULL;
	} else {
		dout("image_id is %s\n", rbd_dev->spec->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
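
/*
 * Editorial sketch of the wire format decoded by
 * ceph_extract_encoded_string() above: a little-endian 32-bit byte
 * count followed by that many bytes (no trailing NUL).  A
 * hypothetical image id "abc123" would arrive as:
 *
 *	06 00 00 00  61 62 63 31 32 33
 */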

static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;

	/* Version 1 images have no id; empty string is used */

	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
	if (!rbd_dev->spec->image_id)
		return -ENOMEM;

	/* Record the header object name for this rbd image. */

	size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name) {
		ret = -ENOMEM;
		goto out_err;
	}
	sprintf(rbd_dev->header_name, "%s%s",
		rbd_dev->spec->image_name, RBD_SUFFIX);

	/* Populate rbd image metadata */

	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (ret < 0)
		goto out_err;

	/* Version 1 images have no parent (no layering) */

	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;

	rbd_dev->image_format = 1;

	dout("discovered version 1 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	return ret;
}
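
/*
 * Editorial note: the header object name built above is the image
 * name with RBD_SUFFIX appended.  Assuming RBD_SUFFIX is ".rbd" (its
 * value in rbd_types.h), image "myimage" uses header object
 * "myimage.rbd".
 */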

static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
		RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
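
/*
 * Editorial note: the v2 header object name is RBD_HEADER_PREFIX
 * followed by the image id.  Assuming RBD_HEADER_PREFIX is
 * "rbd_header." (its value in rbd_types.h), a hypothetical image id
 * "1028b3a9d3f2" yields "rbd_header.1028b3a9d3f2".
 */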

static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
	int ret;

	/* no need to lock here, as rbd_dev is not registered yet */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_probe_update_spec(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_set_mapping(rbd_dev);
	if (ret)
		goto err_out_snaps;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */
	down_write(&rbd_dev->header_rwsem);
	ret = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
	if (ret)
		goto err_out_bus;

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto err_out_bus;

	/* Everything's ready.  Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;
err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);

	return ret;
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);

	return ret;
}
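
/*
 * Editorial note: given the "%s%d" format above, mapped devices are
 * named RBD_DRV_NAME followed by the device id, so a first mapping
 * typically shows up as "rbd0" (and, via udev, as /dev/rbd0).
 */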

/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret) {
		dout("probe failed, returning %d\n", ret);

		return ret;
	}

	ret = rbd_dev_probe_finish(rbd_dev);
	if (ret)
		rbd_header_free(&rbd_dev->header);

	return ret;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64) rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	return count;
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t) rc;
}
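
/*
 * Illustrative usage of the "add" attribute handled above (all
 * values hypothetical; editorial addition):
 *
 *	# echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 */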

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}

static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_event)
		rbd_dev_header_watch_sync(rbd_dev, 0);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	rbd_dev_id_put(rbd_dev);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_dev_destroy(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;

	rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
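
/*
 * Illustrative usage (editorial addition): writing a device id to
 * the "remove" attribute unmaps that device, e.g.
 *
 *	# echo 0 > /sys/bus/rbd/remove
 *
 * The write fails with EBUSY while the device is still held open.
 */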

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");

		return -EINVAL;
	}
	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");