rbd: add parentheses to object request iterator macros
[linux-block.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
aafb230e
AE
44#define RBD_DEBUG /* Activate rbd_assert() calls */
45
593a9e7b
AE
46/*
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
51 */
52#define SECTOR_SHIFT 9
53#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
54
2647ba38 55/* It might be useful to have these defined elsewhere */
df111be6 56
2647ba38
AE
57#define U8_MAX ((u8) (~0U))
58#define U16_MAX ((u16) (~0U))
59#define U32_MAX ((u32) (~0U))
60#define U64_MAX ((u64) (~0ULL))
df111be6 61
f0f8cef5
AE
62#define RBD_DRV_NAME "rbd"
63#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
64
65#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
66
d4b125e9
AE
67#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
68#define RBD_MAX_SNAP_NAME_LEN \
69 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
70
35d489f9 71#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
602adf40
YS
72
73#define RBD_SNAP_HEAD_NAME "-"
74
9e15b77d
AE
75/* This allows a single page to hold an image name sent by OSD */
76#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
1e130199 77#define RBD_IMAGE_ID_LEN_MAX 64
9e15b77d 78
1e130199 79#define RBD_OBJ_PREFIX_LEN_MAX 64
589d30e0 80
d889140c
AE
81/* Feature bits */
82
83#define RBD_FEATURE_LAYERING 1
84
85/* Features supported by this (client software) implementation. */
86
87#define RBD_FEATURES_ALL (0)
88
81a89793
AE
89/*
90 * An RBD device name will be "rbd#", where the "rbd" comes from
91 * RBD_DRV_NAME above, and # is a unique integer identifier.
92 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
93 * enough to hold all possible device names.
94 */
602adf40 95#define DEV_NAME_LEN 32
81a89793 96#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40
YS
97
98/*
99 * block device image metadata (in-memory version)
100 */
101struct rbd_image_header {
f84344f3 102 /* These four fields never change for a given rbd image */
849b4260 103 char *object_prefix;
34b13184 104 u64 features;
602adf40
YS
105 __u8 obj_order;
106 __u8 crypt_type;
107 __u8 comp_type;
602adf40 108
f84344f3
AE
109 /* The remaining fields need to be updated occasionally */
110 u64 image_size;
111 struct ceph_snap_context *snapc;
602adf40
YS
112 char *snap_names;
113 u64 *snap_sizes;
59c2be1e
YS
114
115 u64 obj_version;
116};
117
0d7dbfce
AE
118/*
119 * An rbd image specification.
120 *
121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
c66c6e0c
AE
122 * identify an image. Each rbd_dev structure includes a pointer to
123 * an rbd_spec structure that encapsulates this identity.
124 *
125 * Each of the id's in an rbd_spec has an associated name. For a
126 * user-mapped image, the names are supplied and the id's associated
127 * with them are looked up. For a layered image, a parent image is
128 * defined by the tuple, and the names are looked up.
129 *
130 * An rbd_dev structure contains a parent_spec pointer which is
131 * non-null if the image it represents is a child in a layered
132 * image. This pointer will refer to the rbd_spec structure used
133 * by the parent rbd_dev for its own identity (i.e., the structure
134 * is shared between the parent and child).
135 *
136 * Since these structures are populated once, during the discovery
137 * phase of image construction, they are effectively immutable so
138 * we make no effort to synchronize access to them.
139 *
140 * Note that code herein does not assume the image name is known (it
141 * could be a null pointer).
0d7dbfce
AE
142 */
143struct rbd_spec {
144 u64 pool_id;
145 char *pool_name;
146
147 char *image_id;
0d7dbfce 148 char *image_name;
0d7dbfce
AE
149
150 u64 snap_id;
151 char *snap_name;
152
153 struct kref kref;
154};
155
602adf40 156/*
f0f8cef5 157 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
158 */
159struct rbd_client {
160 struct ceph_client *client;
161 struct kref kref;
162 struct list_head node;
163};
164
bf0d5f50
AE
165struct rbd_img_request;
166typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
167
168#define BAD_WHICH U32_MAX /* Good which or bad which, which? */
169
170struct rbd_obj_request;
171typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
172
9969ebc5
AE
173enum obj_request_type {
174 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
175};
bf0d5f50
AE
176
177struct rbd_obj_request {
178 const char *object_name;
179 u64 offset; /* object start byte */
180 u64 length; /* bytes from offset */
181
182 struct rbd_img_request *img_request;
183 struct list_head links; /* img_request->obj_requests */
184 u32 which; /* posn image request list */
185
186 enum obj_request_type type;
788e2df3
AE
187 union {
188 struct bio *bio_list;
189 struct {
190 struct page **pages;
191 u32 page_count;
192 };
193 };
bf0d5f50
AE
194
195 struct ceph_osd_request *osd_req;
196
197 u64 xferred; /* bytes transferred */
198 u64 version;
199 s32 result;
200 atomic_t done;
201
202 rbd_obj_callback_t callback;
788e2df3 203 struct completion completion;
bf0d5f50
AE
204
205 struct kref kref;
206};
207
208struct rbd_img_request {
209 struct request *rq;
210 struct rbd_device *rbd_dev;
211 u64 offset; /* starting image byte offset */
212 u64 length; /* byte count from offset */
213 bool write_request; /* false for read */
214 union {
215 struct ceph_snap_context *snapc; /* for writes */
216 u64 snap_id; /* for reads */
217 };
218 spinlock_t completion_lock;/* protects next_completion */
219 u32 next_completion;
220 rbd_img_callback_t callback;
221
222 u32 obj_request_count;
223 struct list_head obj_requests; /* rbd_obj_request structs */
224
225 struct kref kref;
226};
227
/* Iterate over an image request's object requests, in list order */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
/* As above, but begin at the current value of oreq rather than the head */
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
/* Reverse-order iteration that is safe against removal of the current entry */
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
bf0d5f50 234
dfc5606d
YS
235struct rbd_snap {
236 struct device dev;
237 const char *name;
3591538f 238 u64 size;
dfc5606d
YS
239 struct list_head node;
240 u64 id;
34b13184 241 u64 features;
dfc5606d
YS
242};
243
f84344f3 244struct rbd_mapping {
99c1f08f 245 u64 size;
34b13184 246 u64 features;
f84344f3
AE
247 bool read_only;
248};
249
602adf40
YS
250/*
251 * a single device
252 */
253struct rbd_device {
de71a297 254 int dev_id; /* blkdev unique id */
602adf40
YS
255
256 int major; /* blkdev assigned major */
257 struct gendisk *disk; /* blkdev's gendisk and rq */
602adf40 258
a30b71b9 259 u32 image_format; /* Either 1 or 2 */
602adf40
YS
260 struct rbd_client *rbd_client;
261
262 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
263
b82d167b 264 spinlock_t lock; /* queue, flags, open_count */
602adf40
YS
265
266 struct rbd_image_header header;
b82d167b 267 unsigned long flags; /* possibly lock protected */
0d7dbfce 268 struct rbd_spec *spec;
602adf40 269
0d7dbfce 270 char *header_name;
971f839a 271
0903e875
AE
272 struct ceph_file_layout layout;
273
59c2be1e 274 struct ceph_osd_event *watch_event;
975241af 275 struct rbd_obj_request *watch_request;
59c2be1e 276
86b00e0d
AE
277 struct rbd_spec *parent_spec;
278 u64 parent_overlap;
279
c666601a
JD
280 /* protects updating the header */
281 struct rw_semaphore header_rwsem;
f84344f3
AE
282
283 struct rbd_mapping mapping;
602adf40
YS
284
285 struct list_head node;
dfc5606d
YS
286
287 /* list of snapshots */
288 struct list_head snaps;
289
290 /* sysfs related */
291 struct device dev;
b82d167b 292 unsigned long open_count; /* protected by lock */
dfc5606d
YS
293};
294
b82d167b
AE
295/*
296 * Flag bits for rbd_dev->flags. If atomicity is required,
297 * rbd_dev->lock is used to protect access.
298 *
299 * Currently, only the "removing" flag (which is coupled with the
300 * "open_count" field) requires atomic access.
301 */
6d292906
AE
302enum rbd_dev_flags {
303 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
b82d167b 304 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
6d292906
AE
305};
306
602adf40 307static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 308
602adf40 309static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
310static DEFINE_SPINLOCK(rbd_dev_list_lock);
311
432b8587
AE
312static LIST_HEAD(rbd_client_list); /* clients */
313static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 314
304f6808
AE
315static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
316static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
317
dfc5606d 318static void rbd_dev_release(struct device *dev);
41f38c2b 319static void rbd_remove_snap_dev(struct rbd_snap *snap);
dfc5606d 320
f0f8cef5
AE
321static ssize_t rbd_add(struct bus_type *bus, const char *buf,
322 size_t count);
323static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
324 size_t count);
325
326static struct bus_attribute rbd_bus_attrs[] = {
327 __ATTR(add, S_IWUSR, NULL, rbd_add),
328 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
329 __ATTR_NULL
330};
331
332static struct bus_type rbd_bus_type = {
333 .name = "rbd",
334 .bus_attrs = rbd_bus_attrs,
335};
336
337static void rbd_root_dev_release(struct device *dev)
338{
339}
340
341static struct device rbd_root_dev = {
342 .init_name = "rbd",
343 .release = rbd_root_dev_release,
344};
345
06ecc6cb
AE
/*
 * Emit a warning message, prefixed with the most specific identity
 * available for the device involved: disk name, then image name, then
 * image id, finally falling back to the rbd_dev pointer itself.
 * A NULL rbd_dev is allowed.
 */
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
372
aafb230e
AE
#ifdef RBD_DEBUG
/*
 * Verify an invariant; log and BUG() if it does not hold.
 * Wrapped in do { } while (0) so the macro behaves as a single
 * statement and is safe inside an unbraced if/else.
 */
#define rbd_assert(expr)						\
		do {							\
			if (unlikely(!(expr))) {			\
				printk(KERN_ERR "\nAssertion failure in %s() " \
						"at line %d:\n\n"	\
						"\trbd_assert(%s);\n\n", \
						__func__, __LINE__, #expr); \
				BUG();					\
			}						\
		} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
dfc5606d 385
117973fb
AE
386static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
387static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 388
602adf40
YS
/*
 * Open the block device.  Fails with -EROFS for a write open of a
 * read-only mapping, and with -ENOENT if the device is being removed.
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	/*
	 * Bump open_count under rbd_dev->lock unless a removal is in
	 * progress; the removal path checks open_count under the same
	 * lock, so the two cannot race.
	 */
	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);	/* dropped in rbd_release() */
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}
413
dfc5606d
YS
/*
 * Release the block device, undoing the open count bump and device
 * reference taken in rbd_open().
 */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	/* Every release must be paired with a successful open */
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);	/* ref taken in rbd_open() */
	mutex_unlock(&ctl_mutex);

	return 0;
}
430
602adf40
YS
431static const struct block_device_operations rbd_bd_ops = {
432 .owner = THIS_MODULE,
433 .open = rbd_open,
dfc5606d 434 .release = rbd_release,
602adf40
YS
435};
436
437/*
438 * Initialize an rbd client instance.
43ae4701 439 * We own *ceph_opts.
602adf40 440 */
f8c38929 441static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
602adf40
YS
442{
443 struct rbd_client *rbdc;
444 int ret = -ENOMEM;
445
446 dout("rbd_client_create\n");
447 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
448 if (!rbdc)
449 goto out_opt;
450
451 kref_init(&rbdc->kref);
452 INIT_LIST_HEAD(&rbdc->node);
453
bc534d86
AE
454 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
455
43ae4701 456 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 457 if (IS_ERR(rbdc->client))
bc534d86 458 goto out_mutex;
43ae4701 459 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
460
461 ret = ceph_open_session(rbdc->client);
462 if (ret < 0)
463 goto out_err;
464
432b8587 465 spin_lock(&rbd_client_list_lock);
602adf40 466 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 467 spin_unlock(&rbd_client_list_lock);
602adf40 468
bc534d86
AE
469 mutex_unlock(&ctl_mutex);
470
602adf40
YS
471 dout("rbd_client_create created %p\n", rbdc);
472 return rbdc;
473
474out_err:
475 ceph_destroy_client(rbdc->client);
bc534d86
AE
476out_mutex:
477 mutex_unlock(&ctl_mutex);
602adf40
YS
478 kfree(rbdc);
479out_opt:
43ae4701
AE
480 if (ceph_opts)
481 ceph_destroy_options(ceph_opts);
28f259b7 482 return ERR_PTR(ret);
602adf40
YS
483}
484
485/*
1f7ba331
AE
486 * Find a ceph client with specific addr and configuration. If
487 * found, bump its reference count.
602adf40 488 */
1f7ba331 489static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
490{
491 struct rbd_client *client_node;
1f7ba331 492 bool found = false;
602adf40 493
43ae4701 494 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
495 return NULL;
496
1f7ba331
AE
497 spin_lock(&rbd_client_list_lock);
498 list_for_each_entry(client_node, &rbd_client_list, node) {
499 if (!ceph_compare_options(ceph_opts, client_node->client)) {
500 kref_get(&client_node->kref);
501 found = true;
502 break;
503 }
504 }
505 spin_unlock(&rbd_client_list_lock);
506
507 return found ? client_node : NULL;
602adf40
YS
508}
509
59c2be1e
YS
510/*
511 * mount options
512 */
513enum {
59c2be1e
YS
514 Opt_last_int,
515 /* int args above */
516 Opt_last_string,
517 /* string args above */
cc0538b6
AE
518 Opt_read_only,
519 Opt_read_write,
520 /* Boolean args above */
521 Opt_last_bool,
59c2be1e
YS
522};
523
43ae4701 524static match_table_t rbd_opts_tokens = {
59c2be1e
YS
525 /* int args above */
526 /* string args above */
be466c1c 527 {Opt_read_only, "read_only"},
cc0538b6
AE
528 {Opt_read_only, "ro"}, /* Alternate spelling */
529 {Opt_read_write, "read_write"},
530 {Opt_read_write, "rw"}, /* Alternate spelling */
531 /* Boolean args above */
59c2be1e
YS
532 {-1, NULL}
533};
534
98571b5a
AE
535struct rbd_options {
536 bool read_only;
537};
538
539#define RBD_READ_ONLY_DEFAULT false
540
59c2be1e
YS
541static int parse_rbd_opts_token(char *c, void *private)
542{
43ae4701 543 struct rbd_options *rbd_opts = private;
59c2be1e
YS
544 substring_t argstr[MAX_OPT_ARGS];
545 int token, intval, ret;
546
43ae4701 547 token = match_token(c, rbd_opts_tokens, argstr);
59c2be1e
YS
548 if (token < 0)
549 return -EINVAL;
550
551 if (token < Opt_last_int) {
552 ret = match_int(&argstr[0], &intval);
553 if (ret < 0) {
554 pr_err("bad mount option arg (not int) "
555 "at '%s'\n", c);
556 return ret;
557 }
558 dout("got int token %d val %d\n", token, intval);
559 } else if (token > Opt_last_int && token < Opt_last_string) {
560 dout("got string token %d val %s\n", token,
561 argstr[0].from);
cc0538b6
AE
562 } else if (token > Opt_last_string && token < Opt_last_bool) {
563 dout("got Boolean token %d\n", token);
59c2be1e
YS
564 } else {
565 dout("got token %d\n", token);
566 }
567
568 switch (token) {
cc0538b6
AE
569 case Opt_read_only:
570 rbd_opts->read_only = true;
571 break;
572 case Opt_read_write:
573 rbd_opts->read_only = false;
574 break;
59c2be1e 575 default:
aafb230e
AE
576 rbd_assert(false);
577 break;
59c2be1e
YS
578 }
579 return 0;
580}
581
602adf40
YS
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Consumes ceph_opts either way.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc = rbd_client_find(ceph_opts);

	if (rbdc) {
		/* Reusing an existing client; the options are not needed */
		ceph_destroy_options(ceph_opts);
		return rbdc;
	}

	return rbd_client_create(ceph_opts);
}
598
/*
 * Destroy ceph client
 *
 * NOTE(review): this function takes rbd_client_list_lock itself to
 * unlink the client, so the caller must NOT already hold that lock
 * (an earlier version of this comment said the opposite).
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
616
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.  A NULL rbdc is allowed (no-op).
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
626
a30b71b9
AE
627static bool rbd_image_format_valid(u32 image_format)
628{
629 return image_format == 1 || image_format == 2;
630}
631
8e94af8e
AE
/*
 * Sanity-check a format 1 on-disk image header before it is used.
 * Returns false for anything that would make the header unusable.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
670
602adf40
YS
671/*
672 * Create a new header structure, translate header format from the on-disk
673 * header.
674 */
675static int rbd_header_from_disk(struct rbd_image_header *header,
4156d998 676 struct rbd_image_header_ondisk *ondisk)
602adf40 677{
ccece235 678 u32 snap_count;
58c17b0e 679 size_t len;
d2bb24e5 680 size_t size;
621901d6 681 u32 i;
602adf40 682
6a52325f
AE
683 memset(header, 0, sizeof (*header));
684
103a150f
AE
685 snap_count = le32_to_cpu(ondisk->snap_count);
686
58c17b0e
AE
687 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
688 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
6a52325f 689 if (!header->object_prefix)
602adf40 690 return -ENOMEM;
58c17b0e
AE
691 memcpy(header->object_prefix, ondisk->object_prefix, len);
692 header->object_prefix[len] = '\0';
00f1f36f 693
602adf40 694 if (snap_count) {
f785cc1d
AE
695 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
696
621901d6
AE
697 /* Save a copy of the snapshot names */
698
f785cc1d
AE
699 if (snap_names_len > (u64) SIZE_MAX)
700 return -EIO;
701 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602adf40 702 if (!header->snap_names)
6a52325f 703 goto out_err;
f785cc1d
AE
704 /*
705 * Note that rbd_dev_v1_header_read() guarantees
706 * the ondisk buffer we're working with has
707 * snap_names_len bytes beyond the end of the
708 * snapshot id array, this memcpy() is safe.
709 */
710 memcpy(header->snap_names, &ondisk->snaps[snap_count],
711 snap_names_len);
6a52325f 712
621901d6
AE
713 /* Record each snapshot's size */
714
d2bb24e5
AE
715 size = snap_count * sizeof (*header->snap_sizes);
716 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602adf40 717 if (!header->snap_sizes)
6a52325f 718 goto out_err;
621901d6
AE
719 for (i = 0; i < snap_count; i++)
720 header->snap_sizes[i] =
721 le64_to_cpu(ondisk->snaps[i].image_size);
602adf40 722 } else {
ccece235 723 WARN_ON(ondisk->snap_names_len);
602adf40
YS
724 header->snap_names = NULL;
725 header->snap_sizes = NULL;
726 }
849b4260 727
34b13184 728 header->features = 0; /* No features support in v1 images */
602adf40
YS
729 header->obj_order = ondisk->options.order;
730 header->crypt_type = ondisk->options.crypt_type;
731 header->comp_type = ondisk->options.comp_type;
6a52325f 732
621901d6
AE
733 /* Allocate and fill in the snapshot context */
734
f84344f3 735 header->image_size = le64_to_cpu(ondisk->image_size);
6a52325f
AE
736 size = sizeof (struct ceph_snap_context);
737 size += snap_count * sizeof (header->snapc->snaps[0]);
738 header->snapc = kzalloc(size, GFP_KERNEL);
739 if (!header->snapc)
740 goto out_err;
602adf40
YS
741
742 atomic_set(&header->snapc->nref, 1);
505cbb9b 743 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
602adf40 744 header->snapc->num_snaps = snap_count;
621901d6
AE
745 for (i = 0; i < snap_count; i++)
746 header->snapc->snaps[i] =
747 le64_to_cpu(ondisk->snaps[i].id);
602adf40
YS
748
749 return 0;
750
6a52325f 751out_err:
849b4260 752 kfree(header->snap_sizes);
ccece235 753 header->snap_sizes = NULL;
602adf40 754 kfree(header->snap_names);
ccece235 755 header->snap_names = NULL;
6a52325f
AE
756 kfree(header->object_prefix);
757 header->object_prefix = NULL;
ccece235 758
00f1f36f 759 return -ENOMEM;
602adf40
YS
760}
761
9e15b77d
AE
762static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
763{
764 struct rbd_snap *snap;
765
766 if (snap_id == CEPH_NOSNAP)
767 return RBD_SNAP_HEAD_NAME;
768
769 list_for_each_entry(snap, &rbd_dev->snaps, node)
770 if (snap_id == snap->id)
771 return snap->name;
772
773 return NULL;
774}
775
8836b995 776static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
602adf40 777{
602adf40 778
e86924a8 779 struct rbd_snap *snap;
602adf40 780
e86924a8
AE
781 list_for_each_entry(snap, &rbd_dev->snaps, node) {
782 if (!strcmp(snap_name, snap->name)) {
0d7dbfce 783 rbd_dev->spec->snap_id = snap->id;
e86924a8 784 rbd_dev->mapping.size = snap->size;
34b13184 785 rbd_dev->mapping.features = snap->features;
602adf40 786
e86924a8 787 return 0;
00f1f36f 788 }
00f1f36f 789 }
e86924a8 790
00f1f36f 791 return -ENOENT;
602adf40
YS
792}
793
819d52bf 794static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
602adf40 795{
78dc447d 796 int ret;
602adf40 797
0d7dbfce 798 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
cc9d734c 799 sizeof (RBD_SNAP_HEAD_NAME))) {
0d7dbfce 800 rbd_dev->spec->snap_id = CEPH_NOSNAP;
99c1f08f 801 rbd_dev->mapping.size = rbd_dev->header.image_size;
34b13184 802 rbd_dev->mapping.features = rbd_dev->header.features;
e86924a8 803 ret = 0;
602adf40 804 } else {
0d7dbfce 805 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
602adf40
YS
806 if (ret < 0)
807 goto done;
f84344f3 808 rbd_dev->mapping.read_only = true;
602adf40 809 }
6d292906
AE
810 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
811
602adf40 812done:
602adf40
YS
813 return ret;
814}
815
/*
 * Release everything an image header owns, clearing each pointer so a
 * subsequent free (or header reuse) is harmless.
 */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);	/* ref-counted, not kfree'd */
	header->snapc = NULL;
}
827
98571b5a 828static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
602adf40 829{
65ccfe21
AE
830 char *name;
831 u64 segment;
832 int ret;
602adf40 833
2fd82b9e 834 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
65ccfe21
AE
835 if (!name)
836 return NULL;
837 segment = offset >> rbd_dev->header.obj_order;
2fd82b9e 838 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
65ccfe21 839 rbd_dev->header.object_prefix, segment);
2fd82b9e 840 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
65ccfe21
AE
841 pr_err("error formatting segment name for #%llu (%d)\n",
842 segment, ret);
843 kfree(name);
844 name = NULL;
845 }
602adf40 846
65ccfe21
AE
847 return name;
848}
602adf40 849
65ccfe21
AE
850static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
851{
852 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
602adf40 853
65ccfe21
AE
854 return offset & (segment_size - 1);
855}
856
857static u64 rbd_segment_length(struct rbd_device *rbd_dev,
858 u64 offset, u64 length)
859{
860 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
861
862 offset &= segment_size - 1;
863
aafb230e 864 rbd_assert(length <= U64_MAX - offset);
65ccfe21
AE
865 if (offset + length > segment_size)
866 length = segment_size - offset;
867
868 return length;
602adf40
YS
869}
870
029bcbd8
JD
871/*
872 * returns the size of an object in the image
873 */
874static u64 rbd_obj_bytes(struct rbd_image_header *header)
875{
876 return 1 << header->obj_order;
877}
878
602adf40
YS
879/*
880 * bio helpers
881 */
882
883static void bio_chain_put(struct bio *chain)
884{
885 struct bio *tmp;
886
887 while (chain) {
888 tmp = chain;
889 chain = chain->bi_next;
890 bio_put(tmp);
891 }
892}
893
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte position within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/*
				 * This segment extends past start_ofs;
				 * zero it from that point onward.
				 */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
920
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * Returns the clone, or NULL on allocation failure or if a zero
 * length or out-of-range region was requested.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;	/* byte offset of the clone within segment idx */

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;	/* segments spanned by the clone */

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		/* resid is now the tail length within the last segment */
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
1001
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		/* Clone at most the remainder of the current source bio */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			/* Consumed this source bio; advance to the next */
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
1064
bf0d5f50
AE
/*
 * Reference counting for object and image requests.  The *_put()
 * helpers invoke the corresponding *_destroy() function (defined
 * later) when the last reference is dropped.
 */
1065static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1066{
1067 kref_get(&obj_request->kref);
1068}
1069
1070static void rbd_obj_request_destroy(struct kref *kref);
1071static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1072{
1073 rbd_assert(obj_request != NULL);
1074 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1075}
1076
1077static void rbd_img_request_get(struct rbd_img_request *img_request)
1078{
1079 kref_get(&img_request->kref);
1080}
1081
1082static void rbd_img_request_destroy(struct kref *kref);
1083static void rbd_img_request_put(struct rbd_img_request *img_request)
1084{
1085 rbd_assert(img_request != NULL);
1086 kref_put(&img_request->kref, rbd_img_request_destroy);
1087}
1088
/*
 * Attach an object request to its image request: takes a reference on
 * the object request, records its position in the image request
 * ("which"), and links it on the image request's list.
 */
1089static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1090 struct rbd_obj_request *obj_request)
1091{
25dcf954
AE
1092 rbd_assert(obj_request->img_request == NULL);
1093
bf0d5f50
AE
1094 rbd_obj_request_get(obj_request);
1095 obj_request->img_request = img_request;
25dcf954 1096 obj_request->which = img_request->obj_request_count;
bf0d5f50 1097 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954
AE
1098 img_request->obj_request_count++;
1099 list_add_tail(&obj_request->links, &img_request->obj_requests);
bf0d5f50
AE
1100}
1101
/*
 * Reverse of rbd_img_obj_request_add().  The assertion that "which"
 * equals the decremented count means only the most recently added
 * request may be removed (LIFO) — callers tear down from the tail.
 * Drops the reference taken by _add().
 */
1102static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1103 struct rbd_obj_request *obj_request)
1104{
1105 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954 1106
bf0d5f50 1107 list_del(&obj_request->links);
25dcf954
AE
1108 rbd_assert(img_request->obj_request_count > 0);
1109 img_request->obj_request_count--;
1110 rbd_assert(obj_request->which == img_request->obj_request_count);
1111 obj_request->which = BAD_WHICH;
bf0d5f50 1112 rbd_assert(obj_request->img_request == img_request);
bf0d5f50 1113 obj_request->img_request = NULL;
25dcf954 1114 obj_request->callback = NULL;
bf0d5f50
AE
1115 rbd_obj_request_put(obj_request);
1116}
1117
/* Whitelist of the object request data types this code handles */
1118static bool obj_request_type_valid(enum obj_request_type type)
1119{
1120 switch (type) {
9969ebc5 1121 case OBJ_REQUEST_NODATA:
bf0d5f50 1122 case OBJ_REQUEST_BIO:
788e2df3 1123 case OBJ_REQUEST_PAGES:
bf0d5f50
AE
1124 return true;
1125 default:
1126 return false;
1127 }
1128}
1129
8d23bf29
AE
/*
 * Allocate and fill in an osd request op.  The variable arguments
 * expected depend on the opcode — see the per-case comments below.
 * payload_len accumulates the size of outbound data carried by the
 * op.  Returns NULL on allocation failure or unsupported opcode.
 */
1130struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1131{
1132 struct ceph_osd_req_op *op;
1133 va_list args;
2647ba38 1134 size_t size;
8d23bf29
AE
1135
1136 op = kzalloc(sizeof (*op), GFP_NOIO);
1137 if (!op)
1138 return NULL;
1139 op->op = opcode;
1140 va_start(args, opcode);
1141 switch (opcode) {
1142 case CEPH_OSD_OP_READ:
1143 case CEPH_OSD_OP_WRITE:
1144 /* rbd_osd_req_op_create(READ, offset, length) */
1145 /* rbd_osd_req_op_create(WRITE, offset, length) */
1146 op->extent.offset = va_arg(args, u64);
1147 op->extent.length = va_arg(args, u64);
	/* Only writes carry outbound data */
1148 if (opcode == CEPH_OSD_OP_WRITE)
1149 op->payload_len = op->extent.length;
1150 break;
2647ba38
AE
1151 case CEPH_OSD_OP_CALL:
1152 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
	/* Class and method name lengths are carried in u8 fields */
1153 op->cls.class_name = va_arg(args, char *);
1154 size = strlen(op->cls.class_name);
1155 rbd_assert(size <= (size_t) U8_MAX);
1156 op->cls.class_len = size;
1157 op->payload_len = size;
1158
1159 op->cls.method_name = va_arg(args, char *);
1160 size = strlen(op->cls.method_name);
1161 rbd_assert(size <= (size_t) U8_MAX);
1162 op->cls.method_len = size;
1163 op->payload_len += size;
1164
1165 op->cls.argc = 0;
1166 op->cls.indata = va_arg(args, void *);
1167 size = va_arg(args, size_t);
1168 rbd_assert(size <= (size_t) U32_MAX);
1169 op->cls.indata_len = (u32) size;
1170 op->payload_len += size;
1171 break;
5efea49a
AE
1172 case CEPH_OSD_OP_NOTIFY_ACK:
1173 case CEPH_OSD_OP_WATCH:
1174 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1175 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1176 op->watch.cookie = va_arg(args, u64);
1177 op->watch.ver = va_arg(args, u64);
1178 op->watch.ver = cpu_to_le64(op->watch.ver);
	/* The extra flag argument distinguishes watch setup (non-zero)
	 * from teardown */
1179 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1180 op->watch.flag = (u8) 1;
1181 break;
8d23bf29
AE
1182 default:
1183 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1184 kfree(op);
1185 op = NULL;
1186 break;
1187 }
1188 va_end(args);
1189
1190 return op;
1191}
1192
/* Free an op allocated by rbd_osd_req_op_create() (NULL is OK) */
1193static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
1194{
1195 kfree(op);
1196}
1197
bf0d5f50
AE
/* Hand the object request's osd request to the osd client; returns
 * immediately without waiting for completion. */
1198static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1199 struct rbd_obj_request *obj_request)
1200{
1201 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1202}
1203
/* Run the image request's completion callback if one was set,
 * otherwise just drop the reference. */
1204static void rbd_img_request_complete(struct rbd_img_request *img_request)
1205{
1206 if (img_request->callback)
1207 img_request->callback(img_request);
1208 else
1209 rbd_img_request_put(img_request);
1210}
1211
788e2df3
AE
1212/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1213
/* Interruptible wait for an object request to complete; returns 0 on
 * completion or a negative errno if interrupted. */
1214static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1215{
1216 return wait_for_completion_interruptible(&obj_request->completion);
1217}
1218
07741308
AE
/*
 * The per-request "done" flag is an atomic_t updated with explicit
 * barriers: the smp_wmb() in the init/set helpers pairs with the
 * smp_rmb() in obj_request_done_test().
 */
1219static void obj_request_done_init(struct rbd_obj_request *obj_request)
1220{
1221 atomic_set(&obj_request->done, 0);
1222 smp_wmb();
1223}
1224
1225static void obj_request_done_set(struct rbd_obj_request *obj_request)
1226{
1227 atomic_set(&obj_request->done, 1);
1228 smp_wmb();
1229}
1230
1231static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1232{
1233 smp_rmb();
1234 return atomic_read(&obj_request->done) != 0;
1235}
1236
9969ebc5
AE
/* Callback for ops whose result needs no processing — just mark done */
1237static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request,
1238 struct ceph_osd_op *op)
1239{
07741308 1240 obj_request_done_set(obj_request);
9969ebc5
AE
1241}
1242
bf0d5f50
AE
/* Notify completion: invoke the per-request callback if set, otherwise
 * wake anyone blocked in rbd_obj_request_wait(). */
1243static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1244{
1245 if (obj_request->callback)
1246 obj_request->callback(obj_request);
788e2df3
AE
1247 else
1248 complete_all(&obj_request->completion);
bf0d5f50
AE
1249}
1250
bf0d5f50
AE
/*
 * Handle completion of an osd READ op: a non-existent object reads as
 * all zeroes, and a short read is zero-filled to the requested length.
 */
1251static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
1252 struct ceph_osd_op *op)
1253{
1254 u64 xferred;
1255
1256 /*
1257 * We support a 64-bit length, but ultimately it has to be
1258 * passed to blk_end_request(), which takes an unsigned int.
1259 */
1260 xferred = le64_to_cpu(op->extent.length);
1261 rbd_assert(xferred < (u64) UINT_MAX);
	/* Reading an object that doesn't exist maps to zeroes */
1262 if (obj_request->result == (s32) -ENOENT) {
1263 zero_bio_chain(obj_request->bio_list, 0);
1264 obj_request->result = 0;
	/* Short read: zero-fill the rest of the request */
1265 } else if (xferred < obj_request->length && !obj_request->result) {
1266 zero_bio_chain(obj_request->bio_list, xferred);
1267 xferred = obj_request->length;
1268 }
1269 obj_request->xferred = xferred;
07741308 1270 obj_request_done_set(obj_request);
bf0d5f50
AE
1271}
1272
/* Handle completion of an osd WRITE op: record bytes written, mark done */
1273static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
1274 struct ceph_osd_op *op)
1275{
1276 obj_request->xferred = le64_to_cpu(op->extent.length);
07741308 1277 obj_request_done_set(obj_request);
bf0d5f50
AE
1278}
1279
/*
 * Completion callback for all rbd osd requests: decode the reply
 * header and dispatch on the opcode of the (currently single) op.
 * If the op-specific handler marked the request done, finish it.
 */
1280static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1281 struct ceph_msg *msg)
1282{
1283 struct rbd_obj_request *obj_request = osd_req->r_priv;
1284 struct ceph_osd_reply_head *reply_head;
1285 struct ceph_osd_op *op;
1286 u32 num_ops;
1287 u16 opcode;
1288
1289 rbd_assert(osd_req == obj_request->osd_req);
	/* Exactly one of: part of an image request, or standalone */
1290 rbd_assert(!!obj_request->img_request ^
1291 (obj_request->which == BAD_WHICH));
1292
1293 obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
1294 reply_head = msg->front.iov_base;
1295 obj_request->result = (s32) le32_to_cpu(reply_head->result);
1296 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1297
1298 num_ops = le32_to_cpu(reply_head->num_ops);
1299 WARN_ON(num_ops != 1); /* For now */
1300
1301 op = &reply_head->ops[0];
1302 opcode = le16_to_cpu(op->op);
1303 switch (opcode) {
1304 case CEPH_OSD_OP_READ:
1305 rbd_osd_read_callback(obj_request, op);
1306 break;
1307 case CEPH_OSD_OP_WRITE:
1308 rbd_osd_write_callback(obj_request, op);
1309 break;
36be9a76 1310 case CEPH_OSD_OP_CALL:
b8d70035 1311 case CEPH_OSD_OP_NOTIFY_ACK:
9969ebc5
AE
1312 case CEPH_OSD_OP_WATCH:
1313 rbd_osd_trivial_callback(obj_request, op);
1314 break;
bf0d5f50
AE
1315 default:
1316 rbd_warn(NULL, "%s: unsupported op %hu\n",
1317 obj_request->object_name, (unsigned short) opcode);
1318 break;
1319 }
1320
07741308 1321 if (obj_request_done_test(obj_request))
bf0d5f50
AE
1322 rbd_obj_request_complete(obj_request);
1323}
1324
/*
 * Build a ceph osd request for the given object request, wiring in
 * its data (bio chain or page vector) and either the snapshot context
 * (writes) or the snapshot id (reads) from the enclosing image
 * request, if any.  Returns NULL on allocation failure.
 */
1325static struct ceph_osd_request *rbd_osd_req_create(
1326 struct rbd_device *rbd_dev,
1327 bool write_request,
1328 struct rbd_obj_request *obj_request,
1329 struct ceph_osd_req_op *op)
1330{
1331 struct rbd_img_request *img_request = obj_request->img_request;
1332 struct ceph_snap_context *snapc = NULL;
1333 struct ceph_osd_client *osdc;
1334 struct ceph_osd_request *osd_req;
1335 struct timespec now;
1336 struct timespec *mtime;
1337 u64 snap_id = CEPH_NOSNAP;
1338 u64 offset = obj_request->offset;
1339 u64 length = obj_request->length;
1340
1341 if (img_request) {
1342 rbd_assert(img_request->write_request == write_request);
1343 if (img_request->write_request)
1344 snapc = img_request->snapc;
1345 else
1346 snap_id = img_request->snap_id;
1347 }
1348
1349 /* Allocate and initialize the request, for the single op */
1350
1351 osdc = &rbd_dev->rbd_client->client->osdc;
1352 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1353 if (!osd_req)
1354 return NULL; /* ENOMEM */
1355
	/* Attach the request's data to the osd request by type */
1356 rbd_assert(obj_request_type_valid(obj_request->type));
1357 switch (obj_request->type) {
9969ebc5
AE
1358 case OBJ_REQUEST_NODATA:
1359 break; /* Nothing to do */
bf0d5f50
AE
1360 case OBJ_REQUEST_BIO:
1361 rbd_assert(obj_request->bio_list != NULL);
1362 osd_req->r_bio = obj_request->bio_list;
bf0d5f50 1363 break;
788e2df3
AE
1364 case OBJ_REQUEST_PAGES:
1365 osd_req->r_pages = obj_request->pages;
1366 osd_req->r_num_pages = obj_request->page_count;
1367 osd_req->r_page_alignment = offset & ~PAGE_MASK;
1368 break;
bf0d5f50
AE
1369 }
1370
1371 if (write_request) {
1372 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1373 now = CURRENT_TIME;
1374 mtime = &now;
1375 } else {
1376 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1377 mtime = NULL; /* not needed for reads */
1378 offset = 0; /* These are not used... */
1379 length = 0; /* ...for osd read requests */
1380 }
1381
1382 osd_req->r_callback = rbd_osd_req_callback;
1383 osd_req->r_priv = obj_request;
1384
1385 osd_req->r_oid_len = strlen(obj_request->object_name);
1386 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1387 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1388
1389 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1390
1391 /* osd_req will get its own reference to snapc (if non-null) */
1392
1393 ceph_osdc_build_request(osd_req, offset, length, 1, op,
1394 snapc, snap_id, mtime);
1395
1396 return osd_req;
1397}
1398
/* Drop our reference to an osd request built above */
1399static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1400{
1401 ceph_osdc_put_request(osd_req);
1402}
1403
1404/* object_name is assumed to be a non-null pointer and NUL-terminated */
1405
/*
 * Allocate and initialize an object request.  The object name is
 * copied into the same allocation, immediately past the request
 * structure itself.  Reference count starts at 1; freed via
 * rbd_obj_request_put() -> rbd_obj_request_destroy().
 */
1406static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1407 u64 offset, u64 length,
1408 enum obj_request_type type)
1409{
1410 struct rbd_obj_request *obj_request;
1411 size_t size;
1412 char *name;
1413
1414 rbd_assert(obj_request_type_valid(type));
1415
1416 size = strlen(object_name) + 1;
1417 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1418 if (!obj_request)
1419 return NULL;
1420
	/* Name lives in the trailing bytes of the allocation */
1421 name = (char *)(obj_request + 1);
1422 obj_request->object_name = memcpy(name, object_name, size);
1423 obj_request->offset = offset;
1424 obj_request->length = length;
1425 obj_request->which = BAD_WHICH;
1426 obj_request->type = type;
1427 INIT_LIST_HEAD(&obj_request->links);
07741308 1428 obj_request_done_init(obj_request);
788e2df3 1429 init_completion(&obj_request->completion);
bf0d5f50
AE
1430 kref_init(&obj_request->kref);
1431
1432 return obj_request;
1433}
1434
/*
 * kref release function: the request must already be detached from any
 * image request.  Frees the osd request and the attached data (bio
 * chain or page vector) according to the request type.
 */
1435static void rbd_obj_request_destroy(struct kref *kref)
1436{
1437 struct rbd_obj_request *obj_request;
1438
1439 obj_request = container_of(kref, struct rbd_obj_request, kref);
1440
1441 rbd_assert(obj_request->img_request == NULL);
1442 rbd_assert(obj_request->which == BAD_WHICH);
1443
1444 if (obj_request->osd_req)
1445 rbd_osd_req_destroy(obj_request->osd_req);
1446
1447 rbd_assert(obj_request_type_valid(obj_request->type));
1448 switch (obj_request->type) {
9969ebc5
AE
1449 case OBJ_REQUEST_NODATA:
1450 break; /* Nothing to do */
bf0d5f50
AE
1451 case OBJ_REQUEST_BIO:
1452 if (obj_request->bio_list)
1453 bio_chain_put(obj_request->bio_list);
1454 break;
788e2df3
AE
1455 case OBJ_REQUEST_PAGES:
1456 if (obj_request->pages)
1457 ceph_release_page_vector(obj_request->pages,
1458 obj_request->page_count);
1459 break;
bf0d5f50
AE
1460 }
1461
1462 kfree(obj_request);
1463}
1464
1465/*
1466 * Caller is responsible for filling in the list of object requests
1467 * that comprises the image request, and the Linux request pointer
1468 * (if there is one).
1469 */
1470struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
1471 u64 offset, u64 length,
1472 bool write_request)
1473{
1474 struct rbd_img_request *img_request;
1475 struct ceph_snap_context *snapc = NULL;
1476
1477 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1478 if (!img_request)
1479 return NULL;
1480
	/* Writes need the current snapshot context; grab a reference
	 * to it under the header semaphore. */
1481 if (write_request) {
1482 down_read(&rbd_dev->header_rwsem);
1483 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1484 up_read(&rbd_dev->header_rwsem);
1485 if (WARN_ON(!snapc)) {
1486 kfree(img_request);
1487 return NULL; /* Shouldn't happen */
1488 }
1489 }
1490
1491 img_request->rq = NULL;
1492 img_request->rbd_dev = rbd_dev;
1493 img_request->offset = offset;
1494 img_request->length = length;
1495 img_request->write_request = write_request;
	/* snapc and snap_id overlay; which is valid depends on
	 * write_request, checked by all readers of these fields */
1496 if (write_request)
1497 img_request->snapc = snapc;
1498 else
1499 img_request->snap_id = rbd_dev->spec->snap_id;
1500 spin_lock_init(&img_request->completion_lock);
1501 img_request->next_completion = 0;
1502 img_request->callback = NULL;
1503 img_request->obj_request_count = 0;
1504 INIT_LIST_HEAD(&img_request->obj_requests);
1505 kref_init(&img_request->kref);
1506
1507 rbd_img_request_get(img_request); /* Avoid a warning */
1508 rbd_img_request_put(img_request); /* TEMPORARY */
1509
1510 return img_request;
1511}
1512
/*
 * kref release function: detach and drop every remaining object
 * request, release the write snapshot context, and free the request.
 */
1513static void rbd_img_request_destroy(struct kref *kref)
1514{
1515 struct rbd_img_request *img_request;
1516 struct rbd_obj_request *obj_request;
1517 struct rbd_obj_request *next_obj_request;
1518
1519 img_request = container_of(kref, struct rbd_img_request, kref);
1520
1521 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1522 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 1523 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50
AE
1524
1525 if (img_request->write_request)
1526 ceph_put_snap_context(img_request->snapc);
1527
1528 kfree(img_request);
1529}
1530
/*
 * Split an image request's [offset, offset+length) extent into
 * per-object requests along rados object boundaries, cloning the
 * corresponding portion of the bio chain for each, and add them to
 * the image request.  Returns 0 on success or -ENOMEM on failure
 * (partially-built object requests are dropped on the way out).
 */
1531static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1532 struct bio *bio_list)
1533{
1534 struct rbd_device *rbd_dev = img_request->rbd_dev;
1535 struct rbd_obj_request *obj_request = NULL;
1536 struct rbd_obj_request *next_obj_request;
1537 unsigned int bio_offset;
1538 u64 image_offset;
1539 u64 resid;
1540 u16 opcode;
1541
1542 opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
1543 : CEPH_OSD_OP_READ;
1544 bio_offset = 0;
1545 image_offset = img_request->offset;
1546 rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1547 resid = img_request->length;
1548 while (resid) {
1549 const char *object_name;
1550 unsigned int clone_size;
1551 struct ceph_osd_req_op *op;
1552 u64 offset;
1553 u64 length;
1554
1555 object_name = rbd_segment_name(rbd_dev, image_offset);
1556 if (!object_name)
1557 goto out_unwind;
	/* Offset within, and byte count remaining in, this object */
1558 offset = rbd_segment_offset(rbd_dev, image_offset);
1559 length = rbd_segment_length(rbd_dev, image_offset, resid);
1560 obj_request = rbd_obj_request_create(object_name,
1561 offset, length,
1562 OBJ_REQUEST_BIO);
1563 kfree(object_name); /* object request has its own copy */
1564 if (!obj_request)
1565 goto out_unwind;
1566
1567 rbd_assert(length <= (u64) UINT_MAX);
1568 clone_size = (unsigned int) length;
1569 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1570 &bio_offset, clone_size,
1571 GFP_ATOMIC);
1572 if (!obj_request->bio_list)
1573 goto out_partial;
1574
1575 /*
1576 * Build up the op to use in building the osd
1577 * request. Note that the contents of the op are
1578 * copied by rbd_osd_req_create().
1579 */
1580 op = rbd_osd_req_op_create(opcode, offset, length);
1581 if (!op)
1582 goto out_partial;
1583 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1584 img_request->write_request,
1585 obj_request, op);
1586 rbd_osd_req_op_destroy(op);
1587 if (!obj_request->osd_req)
1588 goto out_partial;
1589 /* status and version are initially zero-filled */
1590
1591 rbd_img_obj_request_add(img_request, obj_request);
1592
1593 image_offset += length;
1594 resid -= length;
1595 }
1596
1597 return 0;
1598
/* Drop the half-built request not yet added to the image request */
1599out_partial:
1600 rbd_obj_request_put(obj_request);
/* Drop one reference on each request already added; the image
 * request retains its own reference until the caller puts it */
1601out_unwind:
1602 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1603 rbd_obj_request_put(obj_request);
1604
1605 return -ENOMEM;
1606}
1607
/*
 * Per-object completion handler for object requests that belong to an
 * image request.  Block-layer segments must be ended strictly in
 * submission order, so this only makes progress when the completing
 * request is the next one expected (next_completion); otherwise the
 * completion is deferred until the earlier requests finish.
 */
1608static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1609{
1610 struct rbd_img_request *img_request;
1611 u32 which = obj_request->which;
1612 bool more = true;
1613
1614 img_request = obj_request->img_request;
1615 rbd_assert(img_request != NULL);
1616 rbd_assert(img_request->rq != NULL);
1617 rbd_assert(which != BAD_WHICH);
1618 rbd_assert(which < img_request->obj_request_count);
1619 rbd_assert(which >= img_request->next_completion);
1620
1621 spin_lock_irq(&img_request->completion_lock);
1622 if (which != img_request->next_completion)
1623 goto out;
1624
	/* Retire this request and any consecutive completed successors */
1625 for_each_obj_request_from(img_request, obj_request) {
1626 unsigned int xferred;
1627 int result;
1628
1629 rbd_assert(more);
1630 rbd_assert(which < img_request->obj_request_count);
1631
07741308 1632 if (!obj_request_done_test(obj_request))
bf0d5f50
AE
1633 break;
1634
1635 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1636 xferred = (unsigned int) obj_request->xferred;
1637 result = (int) obj_request->result;
1638 if (result)
1639 rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1640 img_request->write_request ? "write" : "read",
1641 result, xferred);
1642
1643 more = blk_end_request(img_request->rq, result, xferred);
1644 which++;
1645 }
	/* "more" is false exactly when the whole request has been ended */
1646 rbd_assert(more ^ (which == img_request->obj_request_count));
1647 img_request->next_completion = which;
1648out:
1649 spin_unlock_irq(&img_request->completion_lock);
1650
1651 if (!more)
1652 rbd_img_request_complete(img_request);
1653}
1654
/*
 * Submit every object request in an image request.  Each object
 * request keeps the reference taken when it was added to the image
 * request, so the initial creation reference is dropped here.
 */
1655static int rbd_img_request_submit(struct rbd_img_request *img_request)
1656{
1657 struct rbd_device *rbd_dev = img_request->rbd_dev;
1658 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1659 struct rbd_obj_request *obj_request;
1660
1661 for_each_obj_request(img_request, obj_request) {
1662 int ret;
1663
1664 obj_request->callback = rbd_img_obj_callback;
1665 ret = rbd_obj_request_submit(osdc, obj_request);
1666 if (ret)
1667 return ret;
1668 /*
1669 * The image request has its own reference to each
1670 * of its object requests, so we can safely drop the
1671 * initial one here.
1672 */
1673 rbd_obj_request_put(obj_request);
1674 }
1675
1676 return 0;
1677}
1678
/*
 * Acknowledge a watch notification on the header object.  The object
 * request is dropped by its completion callback on success, or here
 * on failure.  Fire-and-forget: does not wait for the ack to land.
 */
cf81b60e 1679static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
b8d70035
AE
1680 u64 ver, u64 notify_id)
1681{
1682 struct rbd_obj_request *obj_request;
1683 struct ceph_osd_req_op *op;
1684 struct ceph_osd_client *osdc;
1685 int ret;
1686
1687 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1688 OBJ_REQUEST_NODATA);
1689 if (!obj_request)
1690 return -ENOMEM;
1691
1692 ret = -ENOMEM;
1693 op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
1694 if (!op)
1695 goto out;
1696 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1697 obj_request, op);
1698 rbd_osd_req_op_destroy(op);
1699 if (!obj_request->osd_req)
1700 goto out;
1701
1702 osdc = &rbd_dev->rbd_client->client->osdc;
	/* Completion callback drops the last reference */
cf81b60e 1703 obj_request->callback = rbd_obj_request_put;
b8d70035 1704 ret = rbd_obj_request_submit(osdc, obj_request);
b8d70035 1705out:
cf81b60e
AE
1706 if (ret)
1707 rbd_obj_request_put(obj_request);
b8d70035
AE
1708
1709 return ret;
1710}
1711
/*
 * Watch event callback from the osd client, invoked when the header
 * object changes (e.g. snapshot created): refresh the device mapping
 * and acknowledge the notification.
 */
1712static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1713{
1714 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1715 u64 hver;
1716 int rc;
1717
1718 if (!rbd_dev)
1719 return;
1720
1721 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1722 rbd_dev->header_name, (unsigned long long) notify_id,
1723 (unsigned int) opcode);
1724 rc = rbd_dev_refresh(rbd_dev, &hver);
1725 if (rc)
1726 rbd_warn(rbd_dev, "got notification but failed to "
1727 " update snaps: %d\n", rc);
1728
cf81b60e 1729 rbd_obj_notify_ack(rbd_dev, hver, notify_id);
b8d70035
AE
1730}
1731
9969ebc5
AE
1732/*
1733 * Request sync osd watch/unwatch. The value of "start" determines
1734 * whether a watch request is being initiated or torn down.
1735 */
1736static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1737{
1738 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1739 struct rbd_obj_request *obj_request;
1740 struct ceph_osd_req_op *op;
1741 int ret;
1742
	/* Setup requires no existing event/request; teardown requires both */
1743 rbd_assert(start ^ !!rbd_dev->watch_event);
1744 rbd_assert(start ^ !!rbd_dev->watch_request);
1745
	/* Create the event first so its cookie is available for the op */
1746 if (start) {
3c663bbd 1747 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
9969ebc5
AE
1748 &rbd_dev->watch_event);
1749 if (ret < 0)
1750 return ret;
8eb87565 1751 rbd_assert(rbd_dev->watch_event != NULL);
9969ebc5
AE
1752 }
1753
1754 ret = -ENOMEM;
1755 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1756 OBJ_REQUEST_NODATA);
1757 if (!obj_request)
1758 goto out_cancel;
1759
1760 op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
1761 rbd_dev->watch_event->cookie,
1762 rbd_dev->header.obj_version, start);
1763 if (!op)
1764 goto out_cancel;
1765 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
1766 obj_request, op);
1767 rbd_osd_req_op_destroy(op);
1768 if (!obj_request->osd_req)
1769 goto out_cancel;
1770
8eb87565 1771 if (start)
975241af 1772 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 1773 else
6977c3f9 1774 ceph_osdc_unregister_linger_request(osdc,
975241af 1775 rbd_dev->watch_request->osd_req);
9969ebc5
AE
1776 ret = rbd_obj_request_submit(osdc, obj_request);
1777 if (ret)
1778 goto out_cancel;
1779 ret = rbd_obj_request_wait(obj_request);
1780 if (ret)
1781 goto out_cancel;
9969ebc5
AE
1782 ret = obj_request->result;
1783 if (ret)
1784 goto out_cancel;
1785
8eb87565
AE
1786 /*
1787 * A watch request is set to linger, so the underlying osd
1788 * request won't go away until we unregister it. We retain
1789 * a pointer to the object request during that time (in
1790 * rbd_dev->watch_request), so we'll keep a reference to
1791 * it. We'll drop that reference (below) after we've
1792 * unregistered it.
1793 */
1794 if (start) {
1795 rbd_dev->watch_request = obj_request;
1796
	/* Successful setup: event and request are retained, so the
	 * out_cancel teardown below must not run */
1797 return 0;
1798 }
1799
1800 /* We have successfully torn down the watch request */
1801
1802 rbd_obj_request_put(rbd_dev->watch_request);
1803 rbd_dev->watch_request = NULL;
9969ebc5
AE
1804out_cancel:
1805 /* Cancel the event if we're tearing down, or on error */
1806 ceph_osdc_cancel_event(rbd_dev->watch_event);
1807 rbd_dev->watch_event = NULL;
9969ebc5
AE
1808 if (obj_request)
1809 rbd_obj_request_put(obj_request);
1810
1811 return ret;
1812}
1813
36be9a76
AE
1814/*
1815 * Synchronous osd object method call
1816 */
1817static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1818 const char *object_name,
1819 const char *class_name,
1820 const char *method_name,
1821 const char *outbound,
1822 size_t outbound_size,
1823 char *inbound,
1824 size_t inbound_size,
1825 u64 *version)
1826{
1827 struct rbd_obj_request *obj_request;
1828 struct ceph_osd_client *osdc;
1829 struct ceph_osd_req_op *op;
1830 struct page **pages;
1831 u32 page_count;
1832 int ret;
1833
1834 /*
1835 * Method calls are ultimately read operations but they
1836 * don't involve object data (so no offset or length).
1837 * The result should placed into the inbound buffer
1838 * provided. They also supply outbound data--parameters for
1839 * the object method. Currently if this is present it will
1840 * be a snapshot id.
1841 */
	/* Page vector sized to receive the method's reply data */
1842 page_count = (u32) calc_pages_for(0, inbound_size);
1843 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1844 if (IS_ERR(pages))
1845 return PTR_ERR(pages);
1846
1847 ret = -ENOMEM;
1848 obj_request = rbd_obj_request_create(object_name, 0, 0,
1849 OBJ_REQUEST_PAGES);
1850 if (!obj_request)
1851 goto out;
1852
	/* Pages are now owned by the object request (freed on destroy) */
1853 obj_request->pages = pages;
1854 obj_request->page_count = page_count;
1855
1856 op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1857 method_name, outbound, outbound_size);
1858 if (!op)
1859 goto out;
1860 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1861 obj_request, op);
1862 rbd_osd_req_op_destroy(op);
1863 if (!obj_request->osd_req)
1864 goto out;
1865
1866 osdc = &rbd_dev->rbd_client->client->osdc;
1867 ret = rbd_obj_request_submit(osdc, obj_request);
1868 if (ret)
1869 goto out;
1870 ret = rbd_obj_request_wait(obj_request);
1871 if (ret)
1872 goto out;
1873
1874 ret = obj_request->result;
1875 if (ret < 0)
1876 goto out;
	/* Copy however many bytes the osd actually returned */
1877 ret = ceph_copy_from_page_vector(pages, inbound, 0,
1878 obj_request->xferred);
1879 if (version)
1880 *version = obj_request->version;
1881out:
1882 if (obj_request)
1883 rbd_obj_request_put(obj_request);
1884 else
1885 ceph_release_page_vector(pages, page_count);
1886
1887 return ret;
1888}
1889
bf0d5f50
AE
/*
 * Block-layer request function.  Entered with q->queue_lock held; the
 * lock is dropped while an image request is built and submitted for
 * each fetched request, and re-taken before ending failed requests.
 */
1890static void rbd_request_fn(struct request_queue *q)
1891{
1892 struct rbd_device *rbd_dev = q->queuedata;
1893 bool read_only = rbd_dev->mapping.read_only;
1894 struct request *rq;
1895 int result;
1896
1897 while ((rq = blk_fetch_request(q))) {
1898 bool write_request = rq_data_dir(rq) == WRITE;
1899 struct rbd_img_request *img_request;
1900 u64 offset;
1901 u64 length;
1902
1903 /* Ignore any non-FS requests that filter through. */
1904
1905 if (rq->cmd_type != REQ_TYPE_FS) {
1906 __blk_end_request_all(rq, 0);
1907 continue;
1908 }
1909
1910 spin_unlock_irq(q->queue_lock);
1911
1912 /* Disallow writes to a read-only device */
1913
1914 if (write_request) {
1915 result = -EROFS;
1916 if (read_only)
1917 goto end_request;
1918 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1919 }
1920
6d292906
AE
1921 /*
1922 * Quit early if the mapped snapshot no longer
1923 * exists. It's still possible the snapshot will
1924 * have disappeared by the time our request arrives
1925 * at the osd, but there's no sense in sending it if
1926 * we already know.
1927 */
1928 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
bf0d5f50
AE
1929 dout("request for non-existent snapshot");
1930 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1931 result = -ENXIO;
1932 goto end_request;
1933 }
1934
1935 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1936 length = (u64) blk_rq_bytes(rq);
1937
1938 result = -EINVAL;
	/* Guard against offset + length wrapping around */
1939 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1940 goto end_request; /* Shouldn't happen */
1941
1942 result = -ENOMEM;
1943 img_request = rbd_img_request_create(rbd_dev, offset, length,
1944 write_request);
1945 if (!img_request)
1946 goto end_request;
1947
1948 img_request->rq = rq;
1949
1950 result = rbd_img_request_fill_bio(img_request, rq->bio);
1951 if (!result)
1952 result = rbd_img_request_submit(img_request);
	/* On failure drop our reference; completion owns it otherwise */
1953 if (result)
1954 rbd_img_request_put(img_request);
1955end_request:
1956 spin_lock_irq(q->queue_lock);
1957 if (result < 0) {
1958 rbd_warn(rbd_dev, "obj_request %s result %d\n",
1959 write_request ? "write" : "read", result);
1960 __blk_end_request_all(rq, result);
1961 }
1962 }
1963}
1964
602adf40
YS
1965/*
1966 * a queue callback. Makes sure that we don't create a bio that spans across
1967 * multiple osd objects. One exception would be with a single page bios,
f7760dad 1968 * which we handle later at bio_chain_clone_range()
602adf40
YS
1969 */
1970static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1971 struct bio_vec *bvec)
1972{
1973 struct rbd_device *rbd_dev = q->queuedata;
e5cfeed2
AE
1974 sector_t sector_offset;
1975 sector_t sectors_per_obj;
1976 sector_t obj_sector_offset;
1977 int ret;
1978
1979 /*
1980 * Find how far into its rbd object the partition-relative
1981 * bio start sector is to offset relative to the enclosing
1982 * device.
1983 */
1984 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1985 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1986 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1987
1988 /*
1989 * Compute the number of bytes from that offset to the end
1990 * of the object. Account for what's already used by the bio.
1991 */
1992 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1993 if (ret > bmd->bi_size)
1994 ret -= bmd->bi_size;
1995 else
1996 ret = 0;
1997
1998 /*
1999 * Don't send back more than was asked for. And if the bio
2000 * was empty, let the whole thing through because: "Note
2001 * that a block device *must* allow a single page to be
2002 * added to an empty bio."
2003 */
2004 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2005 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2006 ret = (int) bvec->bv_len;
2007
2008 return ret;
602adf40
YS
2009}
2010
/* Tear down the gendisk and its request queue, if they were set up */
2011static void rbd_free_disk(struct rbd_device *rbd_dev)
2012{
2013 struct gendisk *disk = rbd_dev->disk;
2014
2015 if (!disk)
2016 return;
2017
602adf40
YS
2018 if (disk->flags & GENHD_FL_UP)
2019 del_gendisk(disk);
2020 if (disk->queue)
2021 blk_cleanup_queue(disk->queue);
2022 put_disk(disk);
2023}
2024
788e2df3
AE
2025static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2026 const char *object_name,
2027 u64 offset, u64 length,
2028 char *buf, u64 *version)
2029
2030{
2031 struct ceph_osd_req_op *op;
2032 struct rbd_obj_request *obj_request;
2033 struct ceph_osd_client *osdc;
2034 struct page **pages = NULL;
2035 u32 page_count;
2036 int ret;
2037
2038 page_count = (u32) calc_pages_for(offset, length);
2039 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2040 if (IS_ERR(pages))
2041 ret = PTR_ERR(pages);
2042
2043 ret = -ENOMEM;
2044 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 2045 OBJ_REQUEST_PAGES);
788e2df3
AE
2046 if (!obj_request)
2047 goto out;
2048
2049 obj_request->pages = pages;
2050 obj_request->page_count = page_count;
2051
2052 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2053 if (!op)
2054 goto out;
2055 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2056 obj_request, op);
2057 rbd_osd_req_op_destroy(op);
2058 if (!obj_request->osd_req)
2059 goto out;
2060
2061 osdc = &rbd_dev->rbd_client->client->osdc;
2062 ret = rbd_obj_request_submit(osdc, obj_request);
2063 if (ret)
2064 goto out;
2065 ret = rbd_obj_request_wait(obj_request);
2066 if (ret)
2067 goto out;
2068
2069 ret = obj_request->result;
2070 if (ret < 0)
2071 goto out;
2072 ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
2073 if (version)
2074 *version = obj_request->version;
2075out:
2076 if (obj_request)
2077 rbd_obj_request_put(obj_request);
2078 else
2079 ceph_release_page_vector(pages, page_count);
2080
2081 return ret;
2082}
2083
602adf40 2084/*
4156d998
AE
2085 * Read the complete header for the given rbd device.
2086 *
2087 * Returns a pointer to a dynamically-allocated buffer containing
2088 * the complete and validated header. Caller can pass the address
2089 * of a variable that will be filled in with the version of the
2090 * header object at the time it was read.
2091 *
2092 * Returns a pointer-coded errno if a failure occurs.
602adf40 2093 */
4156d998
AE
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;	/* snapshot count from the previous pass */
	u64 names_size = 0;	/* bytes of snapshot name strings last seen */
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);	/* no-op on the first pass (NULL) */

		/* Size the buffer using last pass's snapshot counts */
		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		/* A short read means we got less than we asked for */
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out_err;
		}

		/* Loop again if the snapshot count changed under us */
		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
2152
2153/*
2154 * reload the ondisk the header
2155 */
2156static int rbd_read_header(struct rbd_device *rbd_dev,
2157 struct rbd_image_header *header)
2158{
2159 struct rbd_image_header_ondisk *ondisk;
2160 u64 ver = 0;
2161 int ret;
602adf40 2162
4156d998
AE
2163 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2164 if (IS_ERR(ondisk))
2165 return PTR_ERR(ondisk);
2166 ret = rbd_header_from_disk(header, ondisk);
2167 if (ret >= 0)
2168 header->obj_version = ver;
2169 kfree(ondisk);
2170
2171 return ret;
602adf40
YS
2172}
2173
41f38c2b 2174static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
dfc5606d
YS
2175{
2176 struct rbd_snap *snap;
a0593290 2177 struct rbd_snap *next;
dfc5606d 2178
a0593290 2179 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
41f38c2b 2180 rbd_remove_snap_dev(snap);
dfc5606d
YS
2181}
2182
9478554a
AE
2183static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2184{
2185 sector_t size;
2186
0d7dbfce 2187 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
9478554a
AE
2188 return;
2189
2190 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2191 dout("setting size to %llu sectors", (unsigned long long) size);
2192 rbd_dev->mapping.size = (u64) size;
2193 set_capacity(rbd_dev->disk, size);
2194}
2195
602adf40
YS
2196/*
2197 * only read the first part of the ondisk header, without the snaps info
2198 */
117973fb 2199static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
602adf40
YS
2200{
2201 int ret;
2202 struct rbd_image_header h;
602adf40
YS
2203
2204 ret = rbd_read_header(rbd_dev, &h);
2205 if (ret < 0)
2206 return ret;
2207
a51aa0c0
JD
2208 down_write(&rbd_dev->header_rwsem);
2209
9478554a
AE
2210 /* Update image size, and check for resize of mapped image */
2211 rbd_dev->header.image_size = h.image_size;
2212 rbd_update_mapping_size(rbd_dev);
9db4b3e3 2213
849b4260 2214 /* rbd_dev->header.object_prefix shouldn't change */
602adf40 2215 kfree(rbd_dev->header.snap_sizes);
849b4260 2216 kfree(rbd_dev->header.snap_names);
d1d25646
JD
2217 /* osd requests may still refer to snapc */
2218 ceph_put_snap_context(rbd_dev->header.snapc);
602adf40 2219
b813623a
AE
2220 if (hver)
2221 *hver = h.obj_version;
a71b891b 2222 rbd_dev->header.obj_version = h.obj_version;
93a24e08 2223 rbd_dev->header.image_size = h.image_size;
602adf40
YS
2224 rbd_dev->header.snapc = h.snapc;
2225 rbd_dev->header.snap_names = h.snap_names;
2226 rbd_dev->header.snap_sizes = h.snap_sizes;
849b4260
AE
2227 /* Free the extra copy of the object prefix */
2228 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2229 kfree(h.object_prefix);
2230
304f6808
AE
2231 ret = rbd_dev_snaps_update(rbd_dev);
2232 if (!ret)
2233 ret = rbd_dev_snaps_register(rbd_dev);
dfc5606d 2234
c666601a 2235 up_write(&rbd_dev->header_rwsem);
602adf40 2236
dfc5606d 2237 return ret;
602adf40
YS
2238}
2239
117973fb 2240static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1fe5e993
AE
2241{
2242 int ret;
2243
117973fb 2244 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1fe5e993 2245 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb
AE
2246 if (rbd_dev->image_format == 1)
2247 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2248 else
2249 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1fe5e993
AE
2250 mutex_unlock(&ctl_mutex);
2251
2252 return ret;
2253}
2254
602adf40
YS
/*
 * Allocate and configure the gendisk and request queue for the mapped
 * image, and set the initial device capacity from the mapping size.
 * Returns 0 on success, -ENOMEM on any allocation failure.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	/* mapping.size is in bytes; capacity is in sectors */
	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
2302
dfc5606d
YS
2303/*
2304 sysfs
2305*/
2306
593a9e7b
AE
/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
2311
dfc5606d
YS
2312static ssize_t rbd_size_show(struct device *dev,
2313 struct device_attribute *attr, char *buf)
2314{
593a9e7b 2315 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
2316 sector_t size;
2317
2318 down_read(&rbd_dev->header_rwsem);
2319 size = get_capacity(rbd_dev->disk);
2320 up_read(&rbd_dev->header_rwsem);
dfc5606d 2321
a51aa0c0 2322 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
2323}
2324
34b13184
AE
2325/*
2326 * Note this shows the features for whatever's mapped, which is not
2327 * necessarily the base image.
2328 */
2329static ssize_t rbd_features_show(struct device *dev,
2330 struct device_attribute *attr, char *buf)
2331{
2332 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2333
2334 return sprintf(buf, "0x%016llx\n",
2335 (unsigned long long) rbd_dev->mapping.features);
2336}
2337
dfc5606d
YS
2338static ssize_t rbd_major_show(struct device *dev,
2339 struct device_attribute *attr, char *buf)
2340{
593a9e7b 2341 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 2342
dfc5606d
YS
2343 return sprintf(buf, "%d\n", rbd_dev->major);
2344}
2345
2346static ssize_t rbd_client_id_show(struct device *dev,
2347 struct device_attribute *attr, char *buf)
602adf40 2348{
593a9e7b 2349 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2350
1dbb4399
AE
2351 return sprintf(buf, "client%lld\n",
2352 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
2353}
2354
dfc5606d
YS
2355static ssize_t rbd_pool_show(struct device *dev,
2356 struct device_attribute *attr, char *buf)
602adf40 2357{
593a9e7b 2358 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2359
0d7dbfce 2360 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
2361}
2362
9bb2f334
AE
2363static ssize_t rbd_pool_id_show(struct device *dev,
2364 struct device_attribute *attr, char *buf)
2365{
2366 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2367
0d7dbfce
AE
2368 return sprintf(buf, "%llu\n",
2369 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
2370}
2371
dfc5606d
YS
2372static ssize_t rbd_name_show(struct device *dev,
2373 struct device_attribute *attr, char *buf)
2374{
593a9e7b 2375 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2376
a92ffdf8
AE
2377 if (rbd_dev->spec->image_name)
2378 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2379
2380 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
2381}
2382
589d30e0
AE
2383static ssize_t rbd_image_id_show(struct device *dev,
2384 struct device_attribute *attr, char *buf)
2385{
2386 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2387
0d7dbfce 2388 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
2389}
2390
34b13184
AE
2391/*
2392 * Shows the name of the currently-mapped snapshot (or
2393 * RBD_SNAP_HEAD_NAME for the base image).
2394 */
dfc5606d
YS
2395static ssize_t rbd_snap_show(struct device *dev,
2396 struct device_attribute *attr,
2397 char *buf)
2398{
593a9e7b 2399 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 2400
0d7dbfce 2401 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
2402}
2403
86b00e0d
AE
2404/*
2405 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2406 * for the parent image. If there is no parent, simply shows
2407 * "(no parent image)".
2408 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;	/* write cursor into the output buffer */

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	/* Parent image name may never have been resolved */
	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	/* Total number of bytes written */
	return (ssize_t) (bufp - buf);
}
2446
dfc5606d
YS
2447static ssize_t rbd_image_refresh(struct device *dev,
2448 struct device_attribute *attr,
2449 const char *buf,
2450 size_t size)
2451{
593a9e7b 2452 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 2453 int ret;
602adf40 2454
117973fb 2455 ret = rbd_dev_refresh(rbd_dev, NULL);
b813623a
AE
2456
2457 return ret < 0 ? ret : size;
dfc5606d 2458}
602adf40 2459
dfc5606d 2460static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 2461static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d
YS
2462static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2463static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2464static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 2465static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 2466static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 2467static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
2468static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2469static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 2470static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
2471
2472static struct attribute *rbd_attrs[] = {
2473 &dev_attr_size.attr,
34b13184 2474 &dev_attr_features.attr,
dfc5606d
YS
2475 &dev_attr_major.attr,
2476 &dev_attr_client_id.attr,
2477 &dev_attr_pool.attr,
9bb2f334 2478 &dev_attr_pool_id.attr,
dfc5606d 2479 &dev_attr_name.attr,
589d30e0 2480 &dev_attr_image_id.attr,
dfc5606d 2481 &dev_attr_current_snap.attr,
86b00e0d 2482 &dev_attr_parent.attr,
dfc5606d 2483 &dev_attr_refresh.attr,
dfc5606d
YS
2484 NULL
2485};
2486
2487static struct attribute_group rbd_attr_group = {
2488 .attrs = rbd_attrs,
2489};
2490
2491static const struct attribute_group *rbd_attr_groups[] = {
2492 &rbd_attr_group,
2493 NULL
2494};
2495
2496static void rbd_sysfs_dev_release(struct device *dev)
2497{
2498}
2499
2500static struct device_type rbd_device_type = {
2501 .name = "rbd",
2502 .groups = rbd_attr_groups,
2503 .release = rbd_sysfs_dev_release,
2504};
2505
2506
2507/*
2508 sysfs - snapshots
2509*/
2510
2511static ssize_t rbd_snap_size_show(struct device *dev,
2512 struct device_attribute *attr,
2513 char *buf)
2514{
2515 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2516
3591538f 2517 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
dfc5606d
YS
2518}
2519
2520static ssize_t rbd_snap_id_show(struct device *dev,
2521 struct device_attribute *attr,
2522 char *buf)
2523{
2524 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2525
3591538f 2526 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
dfc5606d
YS
2527}
2528
34b13184
AE
2529static ssize_t rbd_snap_features_show(struct device *dev,
2530 struct device_attribute *attr,
2531 char *buf)
2532{
2533 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2534
2535 return sprintf(buf, "0x%016llx\n",
2536 (unsigned long long) snap->features);
2537}
2538
dfc5606d
YS
/* Per-snapshot sysfs attributes (all read-only). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* Final teardown of a snapshot device: frees the rbd_snap itself. */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2570
8b8fb99c
AE
/* Take an additional reference on an rbd_spec; returns @spec. */
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}
2577
static void rbd_spec_free(struct kref *kref);
/* Drop a reference; frees the spec when the last reference is gone.
 * NULL is tolerated. */
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}
2584
/* Allocate a zeroed rbd_spec with a single reference held, or NULL. */
static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;
	kref_init(&spec->kref);

	rbd_spec_put(rbd_spec_get(spec));	/* TEMPORARY */

	return spec;
}
2598
/* kref release callback: free the spec and all strings it owns. */
static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}
2609
c53d5893
AE
/*
 * Allocate and initialize an rbd_device for the given client and
 * image spec.  Takes over the caller's references to @rbdc and @spec
 * (rbd_dev_destroy() drops them).  Returns NULL on allocation failure.
 */
struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
				struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	rbd_dev->flags = 0;
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}
2637
/* Free an rbd_device, dropping its client and spec references. */
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_spec_put(rbd_dev->parent_spec);
	kfree(rbd_dev->header_name);
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}
2646
304f6808
AE
/*
 * Return true if the snapshot's sysfs device has been registered.
 * A snap's dev.type is only assigned (to rbd_snap_device_type) at
 * registration time, so the two conditions must agree: the XOR
 * assertion holds exactly when ret == reg.
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
	bool ret = snap->dev.type == &rbd_snap_device_type;
	bool reg = device_is_registered(&snap->dev);

	rbd_assert(!ret ^ reg);

	return ret;
}
2656
41f38c2b 2657static void rbd_remove_snap_dev(struct rbd_snap *snap)
dfc5606d
YS
2658{
2659 list_del(&snap->node);
304f6808
AE
2660 if (device_is_registered(&snap->dev))
2661 device_unregister(&snap->dev);
dfc5606d
YS
2662}
2663
14e7085d 2664static int rbd_register_snap_dev(struct rbd_snap *snap,
dfc5606d
YS
2665 struct device *parent)
2666{
2667 struct device *dev = &snap->dev;
2668 int ret;
2669
2670 dev->type = &rbd_snap_device_type;
2671 dev->parent = parent;
2672 dev->release = rbd_snap_dev_release;
d4b125e9 2673 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
304f6808
AE
2674 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2675
dfc5606d
YS
2676 ret = device_register(dev);
2677
2678 return ret;
2679}
2680
4e891e0a 2681static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
c8d18425 2682 const char *snap_name,
34b13184
AE
2683 u64 snap_id, u64 snap_size,
2684 u64 snap_features)
dfc5606d 2685{
4e891e0a 2686 struct rbd_snap *snap;
dfc5606d 2687 int ret;
4e891e0a
AE
2688
2689 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
dfc5606d 2690 if (!snap)
4e891e0a
AE
2691 return ERR_PTR(-ENOMEM);
2692
2693 ret = -ENOMEM;
c8d18425 2694 snap->name = kstrdup(snap_name, GFP_KERNEL);
4e891e0a
AE
2695 if (!snap->name)
2696 goto err;
2697
c8d18425
AE
2698 snap->id = snap_id;
2699 snap->size = snap_size;
34b13184 2700 snap->features = snap_features;
4e891e0a
AE
2701
2702 return snap;
2703
dfc5606d
YS
2704err:
2705 kfree(snap->name);
2706 kfree(snap);
4e891e0a
AE
2707
2708 return ERR_PTR(ret);
dfc5606d
YS
2709}
2710
cd892126
AE
/*
 * Return the name, size, and features of snapshot index @which for a
 * format 1 image.  The returned name points into the header's
 * snap_names block -- it must not be freed by the caller.
 */
static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
					u64 *snap_size, u64 *snap_features)
{
	char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	*snap_size = rbd_dev->header.snap_sizes[which];
	*snap_features = 0;	/* No features for v1 */

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return snap_name;
}
2729
9d475de5
AE
2730/*
2731 * Get the size and object order for an image snapshot, or if
2732 * snap_id is CEPH_NOSNAP, gets this information for the base
2733 * image.
2734 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };	/* on-wire reply layout */

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf), NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}
2762
/* Size and object order of the base image (snap_id CEPH_NOSNAP). */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
2769
1e130199
AE
/*
 * Fetch the object name prefix for a v2 image via the "rbd" class
 * "get_object_prefix" method and store a dynamically-allocated copy
 * in the in-core header.  Returns 0 or a negative errno.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;
	ret = 0;    /* rbd_obj_method_sync() can return positive */

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
	}

out:
	kfree(reply_buf);

	return ret;
}
2806
b1b5402a
AE
/*
 * Get the feature bits for the given snapshot (or for the base image
 * when snap_id is CEPH_NOSNAP).  Fails with -ENXIO if the image uses
 * incompatible features outside RBD_FEATURES_ALL.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	/* Refuse to map an image needing features we don't support */
	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_ALL)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}
2840
/* Feature bits for the base image (snap_id CEPH_NOSNAP). */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
2846
86b00e0d
AE
/*
 * Query the parent (layering) information for a v2 image and record
 * it in rbd_dev->parent_spec / parent_overlap.  An image with no
 * parent (pool id CEPH_NOPOOL) is not an error.  Returns 0 or a
 * negative errno.
 */
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	char *image_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	/* Worst-case encoded reply size */
	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				(char *) &snapid, sizeof (snapid),
				(char *) reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	ret = -ERANGE;
	p = reply_buf;
	end = (char *) reply_buf + size;
	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
	if (parent_spec->pool_id == CEPH_NOPOOL)
		goto out;	/* No parent? No problem. */

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
		goto out;

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	rbd_dev->parent_overlap = overlap;
	rbd_dev->parent_spec = parent_spec;
	parent_spec = NULL;	/* rbd_dev now owns this */
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);	/* no-op once ownership transferred */

	return ret;
}
2915
9e15b77d
AE
/*
 * Look up an image's name in the rbd directory object given its
 * image id.  Returns a dynamically-allocated name (caller frees),
 * or NULL if the lookup fails for any reason -- callers tolerate a
 * missing name.
 */
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	/* Encode the image id as a length-prefixed string argument */
	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = (char *) image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				(char *) reply_buf, size, NULL);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = (char *) reply_buf + size;
	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;	/* failure is tolerated */
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}
2964
2965/*
2966 * When a parent image gets probed, we only have the pool, image,
2967 * and snapshot ids but not the names of any of them. This call
2968 * is made later to fill in those names. It has to be done after
2969 * rbd_dev_snaps_update() has completed because some of the
2970 * information (in particular, snapshot name) is not available
2971 * until then.
2972 */
2973static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2974{
2975 struct ceph_osd_client *osdc;
2976 const char *name;
2977 void *reply_buf = NULL;
2978 int ret;
2979
2980 if (rbd_dev->spec->pool_name)
2981 return 0; /* Already have the names */
2982
2983 /* Look up the pool name */
2984
2985 osdc = &rbd_dev->rbd_client->client->osdc;
2986 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
935dc89f
AE
2987 if (!name) {
2988 rbd_warn(rbd_dev, "there is no pool with id %llu",
2989 rbd_dev->spec->pool_id); /* Really a BUG() */
2990 return -EIO;
2991 }
9e15b77d
AE
2992
2993 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2994 if (!rbd_dev->spec->pool_name)
2995 return -ENOMEM;
2996
2997 /* Fetch the image name; tolerate failure here */
2998
2999 name = rbd_dev_image_name(rbd_dev);
69e7a02f 3000 if (name)
9e15b77d 3001 rbd_dev->spec->image_name = (char *) name;
69e7a02f 3002 else
06ecc6cb 3003 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d
AE
3004
3005 /* Look up the snapshot name. */
3006
3007 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3008 if (!name) {
935dc89f
AE
3009 rbd_warn(rbd_dev, "no snapshot with id %llu",
3010 rbd_dev->spec->snap_id); /* Really a BUG() */
9e15b77d
AE
3011 ret = -EIO;
3012 goto out_err;
3013 }
3014 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3015 if(!rbd_dev->spec->snap_name)
3016 goto out_err;
3017
3018 return 0;
3019out_err:
3020 kfree(reply_buf);
3021 kfree(rbd_dev->spec->pool_name);
3022 rbd_dev->spec->pool_name = NULL;
3023
3024 return ret;
3025}
3026
6e14b1a6 3027static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
35d489f9
AE
3028{
3029 size_t size;
3030 int ret;
3031 void *reply_buf;
3032 void *p;
3033 void *end;
3034 u64 seq;
3035 u32 snap_count;
3036 struct ceph_snap_context *snapc;
3037 u32 i;
3038
3039 /*
3040 * We'll need room for the seq value (maximum snapshot id),
3041 * snapshot count, and array of that many snapshot ids.
3042 * For now we have a fixed upper limit on the number we're
3043 * prepared to receive.
3044 */
3045 size = sizeof (__le64) + sizeof (__le32) +
3046 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3047 reply_buf = kzalloc(size, GFP_KERNEL);
3048 if (!reply_buf)
3049 return -ENOMEM;
3050
36be9a76 3051 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
35d489f9
AE
3052 "rbd", "get_snapcontext",
3053 NULL, 0,
07b2391f 3054 reply_buf, size, ver);
36be9a76 3055 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
3056 if (ret < 0)
3057 goto out;
3058
3059 ret = -ERANGE;
3060 p = reply_buf;
3061 end = (char *) reply_buf + size;
3062 ceph_decode_64_safe(&p, end, seq, out);
3063 ceph_decode_32_safe(&p, end, snap_count, out);
3064
3065 /*
3066 * Make sure the reported number of snapshot ids wouldn't go
3067 * beyond the end of our buffer. But before checking that,
3068 * make sure the computed size of the snapshot context we
3069 * allocate is representable in a size_t.
3070 */
3071 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3072 / sizeof (u64)) {
3073 ret = -EINVAL;
3074 goto out;
3075 }
3076 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3077 goto out;
3078
3079 size = sizeof (struct ceph_snap_context) +
3080 snap_count * sizeof (snapc->snaps[0]);
3081 snapc = kmalloc(size, GFP_KERNEL);
3082 if (!snapc) {
3083 ret = -ENOMEM;
3084 goto out;
3085 }
3086
3087 atomic_set(&snapc->nref, 1);
3088 snapc->seq = seq;
3089 snapc->num_snaps = snap_count;
3090 for (i = 0; i < snap_count; i++)
3091 snapc->snaps[i] = ceph_decode_64(&p);
3092
3093 rbd_dev->header.snapc = snapc;
3094
3095 dout(" snap context seq = %llu, snap_count = %u\n",
3096 (unsigned long long) seq, (unsigned int) snap_count);
3097
3098out:
3099 kfree(reply_buf);
3100
3101 return 0;
3102}
3103
b8b1e2db
AE
/*
 * Fetch the name of the snapshot at index @which in the snapshot
 * context.  Returns a dynamically-allocated string (caller frees)
 * or a pointer-coded errno.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				(char *) &snap_id, sizeof (snap_id),
				reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = (char *) reply_buf + size;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out;
	} else {
		dout(" snap_id 0x%016llx snap_name = %s\n",
			(unsigned long long) le64_to_cpu(snap_id), snap_name);
	}
	kfree(reply_buf);

	return snap_name;
out:
	kfree(reply_buf);

	return ERR_PTR(ret);
}
3146
3147static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3148 u64 *snap_size, u64 *snap_features)
3149{
e0b49868 3150 u64 snap_id;
b8b1e2db
AE
3151 u8 order;
3152 int ret;
3153
3154 snap_id = rbd_dev->header.snapc->snaps[which];
3155 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3156 if (ret)
3157 return ERR_PTR(ret);
3158 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3159 if (ret)
3160 return ERR_PTR(ret);
3161
3162 return rbd_dev_v2_snap_name(rbd_dev, which);
3163}
3164
3165static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3166 u64 *snap_size, u64 *snap_features)
3167{
3168 if (rbd_dev->image_format == 1)
3169 return rbd_dev_v1_snap_info(rbd_dev, which,
3170 snap_size, snap_features);
3171 if (rbd_dev->image_format == 2)
3172 return rbd_dev_v2_snap_info(rbd_dev, which,
3173 snap_size, snap_features);
3174 return ERR_PTR(-EINVAL);
3175}
3176
117973fb
AE
3177static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3178{
3179 int ret;
3180 __u8 obj_order;
3181
3182 down_write(&rbd_dev->header_rwsem);
3183
3184 /* Grab old order first, to see if it changes */
3185
3186 obj_order = rbd_dev->header.obj_order,
3187 ret = rbd_dev_v2_image_size(rbd_dev);
3188 if (ret)
3189 goto out;
3190 if (rbd_dev->header.obj_order != obj_order) {
3191 ret = -EIO;
3192 goto out;
3193 }
3194 rbd_update_mapping_size(rbd_dev);
3195
3196 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3197 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3198 if (ret)
3199 goto out;
3200 ret = rbd_dev_snaps_update(rbd_dev);
3201 dout("rbd_dev_snaps_update returned %d\n", ret);
3202 if (ret)
3203 goto out;
3204 ret = rbd_dev_snaps_register(rbd_dev);
3205 dout("rbd_dev_snaps_register returned %d\n", ret);
3206out:
3207 up_write(&rbd_dev->header_rwsem);
3208
3209 return ret;
3210}
3211
dfc5606d 3212/*
35938150
AE
3213 * Scan the rbd device's current snapshot list and compare it to the
3214 * newly-received snapshot context. Remove any existing snapshots
3215 * not present in the new snapshot context. Add a new snapshot for
3216 * any snaphots in the snapshot context not in the current list.
3217 * And verify there are no changes to snapshots we already know
3218 * about.
3219 *
3220 * Assumes the snapshots in the snapshot context are sorted by
3221 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3222 * are also maintained in that order.)
dfc5606d 3223 */
304f6808 3224static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
dfc5606d 3225{
35938150
AE
3226 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3227 const u32 snap_count = snapc->num_snaps;
35938150
AE
3228 struct list_head *head = &rbd_dev->snaps;
3229 struct list_head *links = head->next;
3230 u32 index = 0;
dfc5606d 3231
9fcbb800 3232 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
35938150
AE
3233 while (index < snap_count || links != head) {
3234 u64 snap_id;
3235 struct rbd_snap *snap;
cd892126
AE
3236 char *snap_name;
3237 u64 snap_size = 0;
3238 u64 snap_features = 0;
dfc5606d 3239
35938150
AE
3240 snap_id = index < snap_count ? snapc->snaps[index]
3241 : CEPH_NOSNAP;
3242 snap = links != head ? list_entry(links, struct rbd_snap, node)
3243 : NULL;
aafb230e 3244 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
dfc5606d 3245
35938150
AE
3246 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3247 struct list_head *next = links->next;
dfc5606d 3248
6d292906
AE
3249 /*
3250 * A previously-existing snapshot is not in
3251 * the new snap context.
3252 *
3253 * If the now missing snapshot is the one the
3254 * image is mapped to, clear its exists flag
3255 * so we can avoid sending any more requests
3256 * to it.
3257 */
0d7dbfce 3258 if (rbd_dev->spec->snap_id == snap->id)
6d292906 3259 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
41f38c2b 3260 rbd_remove_snap_dev(snap);
9fcbb800 3261 dout("%ssnap id %llu has been removed\n",
0d7dbfce
AE
3262 rbd_dev->spec->snap_id == snap->id ?
3263 "mapped " : "",
9fcbb800 3264 (unsigned long long) snap->id);
35938150
AE
3265
3266 /* Done with this list entry; advance */
3267
3268 links = next;
dfc5606d
YS
3269 continue;
3270 }
35938150 3271
b8b1e2db
AE
3272 snap_name = rbd_dev_snap_info(rbd_dev, index,
3273 &snap_size, &snap_features);
cd892126
AE
3274 if (IS_ERR(snap_name))
3275 return PTR_ERR(snap_name);
3276
9fcbb800
AE
3277 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3278 (unsigned long long) snap_id);
35938150
AE
3279 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3280 struct rbd_snap *new_snap;
3281
3282 /* We haven't seen this snapshot before */
3283
c8d18425 3284 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
cd892126 3285 snap_id, snap_size, snap_features);
9fcbb800
AE
3286 if (IS_ERR(new_snap)) {
3287 int err = PTR_ERR(new_snap);
3288
3289 dout(" failed to add dev, error %d\n", err);
3290
3291 return err;
3292 }
35938150
AE
3293
3294 /* New goes before existing, or at end of list */
3295
9fcbb800 3296 dout(" added dev%s\n", snap ? "" : " at end\n");
35938150
AE
3297 if (snap)
3298 list_add_tail(&new_snap->node, &snap->node);
3299 else
523f3258 3300 list_add_tail(&new_snap->node, head);
35938150
AE
3301 } else {
3302 /* Already have this one */
3303
9fcbb800
AE
3304 dout(" already present\n");
3305
cd892126 3306 rbd_assert(snap->size == snap_size);
aafb230e 3307 rbd_assert(!strcmp(snap->name, snap_name));
cd892126 3308 rbd_assert(snap->features == snap_features);
35938150
AE
3309
3310 /* Done with this list entry; advance */
3311
3312 links = links->next;
dfc5606d 3313 }
35938150
AE
3314
3315 /* Advance to the next entry in the snapshot context */
3316
3317 index++;
dfc5606d 3318 }
9fcbb800 3319 dout("%s: done\n", __func__);
dfc5606d
YS
3320
3321 return 0;
3322}
3323
304f6808
AE
3324/*
3325 * Scan the list of snapshots and register the devices for any that
3326 * have not already been registered.
3327 */
3328static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3329{
3330 struct rbd_snap *snap;
3331 int ret = 0;
3332
3333 dout("%s called\n", __func__);
86ff77bb
AE
3334 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3335 return -EIO;
304f6808
AE
3336
3337 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3338 if (!rbd_snap_registered(snap)) {
3339 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3340 if (ret < 0)
3341 break;
3342 }
3343 }
3344 dout("%s: returning %d\n", __func__, ret);
3345
3346 return ret;
3347}
3348
dfc5606d
YS
3349static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3350{
dfc5606d 3351 struct device *dev;
cd789ab9 3352 int ret;
dfc5606d
YS
3353
3354 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 3355
cd789ab9 3356 dev = &rbd_dev->dev;
dfc5606d
YS
3357 dev->bus = &rbd_bus_type;
3358 dev->type = &rbd_device_type;
3359 dev->parent = &rbd_root_dev;
3360 dev->release = rbd_dev_release;
de71a297 3361 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 3362 ret = device_register(dev);
dfc5606d 3363
dfc5606d 3364 mutex_unlock(&ctl_mutex);
cd789ab9 3365
dfc5606d 3366 return ret;
602adf40
YS
3367}
3368
dfc5606d
YS
3369static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3370{
3371 device_unregister(&rbd_dev->dev);
3372}
3373
e2839308 3374static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
3375
3376/*
499afd5b
AE
3377 * Get a unique rbd identifier for the given new rbd_dev, and add
3378 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 3379 */
e2839308 3380static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 3381{
e2839308 3382 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
3383
3384 spin_lock(&rbd_dev_list_lock);
3385 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3386 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
3387 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3388 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 3389}
b7f23c36 3390
1ddbe94e 3391/*
499afd5b
AE
3392 * Remove an rbd_dev from the global list, and record that its
3393 * identifier is no longer in use.
1ddbe94e 3394 */
e2839308 3395static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 3396{
d184f6bf 3397 struct list_head *tmp;
de71a297 3398 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
3399 int max_id;
3400
aafb230e 3401 rbd_assert(rbd_id > 0);
499afd5b 3402
e2839308
AE
3403 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3404 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
3405 spin_lock(&rbd_dev_list_lock);
3406 list_del_init(&rbd_dev->node);
d184f6bf
AE
3407
3408 /*
3409 * If the id being "put" is not the current maximum, there
3410 * is nothing special we need to do.
3411 */
e2839308 3412 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
3413 spin_unlock(&rbd_dev_list_lock);
3414 return;
3415 }
3416
3417 /*
3418 * We need to update the current maximum id. Search the
3419 * list to find out what it is. We're more likely to find
3420 * the maximum at the end, so search the list backward.
3421 */
3422 max_id = 0;
3423 list_for_each_prev(tmp, &rbd_dev_list) {
3424 struct rbd_device *rbd_dev;
3425
3426 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
3427 if (rbd_dev->dev_id > max_id)
3428 max_id = rbd_dev->dev_id;
d184f6bf 3429 }
499afd5b 3430 spin_unlock(&rbd_dev_list_lock);
b7f23c36 3431
1ddbe94e 3432 /*
e2839308 3433 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
3434 * which case it now accurately reflects the new maximum.
3435 * Be careful not to overwrite the maximum value in that
3436 * case.
1ddbe94e 3437 */
e2839308
AE
3438 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3439 dout(" max dev id has been reset\n");
b7f23c36
AE
3440}
3441
e28fff26
AE
/*
 * Skip leading white space at *buf, updating *buf to point at the
 * first non-space character (or the terminating '\0').  Returns the
 * length of the token (run of non-space characters) that follows.
 * *buf must be NUL-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/* Characters isspace() reports in the "C" and "POSIX" locales */
	static const char spaces[] = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* skip to start of token */

	return strcspn(*buf, spaces);	/* token length */
}
3460
/*
 * Find the next token in *buf and, if it fits (with its trailing
 * '\0') in token_size bytes, copy it into token and NUL-terminate
 * it.  *buf is advanced past the token whether or not it was
 * copied.
 *
 * Returns the token length (excluding the '\0'): 0 means no token
 * was found, and a value >= token_size means the token would not
 * fit (and was not copied).
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
3490
ea3352f4
AE
3491/*
3492 * Finds the next token in *buf, dynamically allocates a buffer big
3493 * enough to hold a copy of it, and copies the token into the new
3494 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3495 * that a duplicate buffer is created even for a zero-length token.
3496 *
3497 * Returns a pointer to the newly-allocated duplicate, or a null
3498 * pointer if memory for the duplicate was not available. If
3499 * the lenp argument is a non-null pointer, the length of the token
3500 * (not including the '\0') is returned in *lenp.
3501 *
3502 * If successful, the *buf pointer will be updated to point beyond
3503 * the end of the found token.
3504 *
3505 * Note: uses GFP_KERNEL for allocation.
3506 */
3507static inline char *dup_token(const char **buf, size_t *lenp)
3508{
3509 char *dup;
3510 size_t len;
3511
3512 len = next_token(buf);
4caf35f9 3513 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
3514 if (!dup)
3515 return NULL;
ea3352f4
AE
3516 *(dup + len) = '\0';
3517 *buf += len;
3518
3519 if (lenp)
3520 *lenp = len;
3521
3522 return dup;
3523}
3524
a725f65e 3525/*
859c31df
AE
3526 * Parse the options provided for an "rbd add" (i.e., rbd image
3527 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3528 * and the data written is passed here via a NUL-terminated buffer.
3529 * Returns 0 if successful or an error code otherwise.
d22f76e7 3530 *
859c31df
AE
3531 * The information extracted from these options is recorded in
3532 * the other parameters which return dynamically-allocated
3533 * structures:
3534 * ceph_opts
3535 * The address of a pointer that will refer to a ceph options
3536 * structure. Caller must release the returned pointer using
3537 * ceph_destroy_options() when it is no longer needed.
3538 * rbd_opts
3539 * Address of an rbd options pointer. Fully initialized by
3540 * this function; caller must release with kfree().
3541 * spec
3542 * Address of an rbd image specification pointer. Fully
3543 * initialized by this function based on parsed options.
3544 * Caller must release with rbd_spec_put().
3545 *
3546 * The options passed take this form:
3547 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3548 * where:
3549 * <mon_addrs>
3550 * A comma-separated list of one or more monitor addresses.
3551 * A monitor address is an ip address, optionally followed
3552 * by a port number (separated by a colon).
3553 * I.e.: ip1[:port1][,ip2[:port2]...]
3554 * <options>
3555 * A comma-separated list of ceph and/or rbd options.
3556 * <pool_name>
3557 * The name of the rados pool containing the rbd image.
3558 * <image_name>
3559 * The name of the image in that pool to map.
3560 * <snap_id>
3561 * An optional snapshot id. If provided, the mapping will
3562 * present data from the image at the time that snapshot was
3563 * created. The image head is used if no snapshot id is
3564 * provided. Snapshot mappings are always read-only.
a725f65e 3565 */
859c31df 3566static int rbd_add_parse_args(const char *buf,
dc79b113 3567 struct ceph_options **ceph_opts,
859c31df
AE
3568 struct rbd_options **opts,
3569 struct rbd_spec **rbd_spec)
e28fff26 3570{
d22f76e7 3571 size_t len;
859c31df 3572 char *options;
0ddebc0c
AE
3573 const char *mon_addrs;
3574 size_t mon_addrs_size;
859c31df 3575 struct rbd_spec *spec = NULL;
4e9afeba 3576 struct rbd_options *rbd_opts = NULL;
859c31df 3577 struct ceph_options *copts;
dc79b113 3578 int ret;
e28fff26
AE
3579
3580 /* The first four tokens are required */
3581
7ef3214a 3582 len = next_token(&buf);
4fb5d671
AE
3583 if (!len) {
3584 rbd_warn(NULL, "no monitor address(es) provided");
3585 return -EINVAL;
3586 }
0ddebc0c 3587 mon_addrs = buf;
f28e565a 3588 mon_addrs_size = len + 1;
7ef3214a 3589 buf += len;
a725f65e 3590
dc79b113 3591 ret = -EINVAL;
f28e565a
AE
3592 options = dup_token(&buf, NULL);
3593 if (!options)
dc79b113 3594 return -ENOMEM;
4fb5d671
AE
3595 if (!*options) {
3596 rbd_warn(NULL, "no options provided");
3597 goto out_err;
3598 }
e28fff26 3599
859c31df
AE
3600 spec = rbd_spec_alloc();
3601 if (!spec)
f28e565a 3602 goto out_mem;
859c31df
AE
3603
3604 spec->pool_name = dup_token(&buf, NULL);
3605 if (!spec->pool_name)
3606 goto out_mem;
4fb5d671
AE
3607 if (!*spec->pool_name) {
3608 rbd_warn(NULL, "no pool name provided");
3609 goto out_err;
3610 }
e28fff26 3611
69e7a02f 3612 spec->image_name = dup_token(&buf, NULL);
859c31df 3613 if (!spec->image_name)
f28e565a 3614 goto out_mem;
4fb5d671
AE
3615 if (!*spec->image_name) {
3616 rbd_warn(NULL, "no image name provided");
3617 goto out_err;
3618 }
d4b125e9 3619
f28e565a
AE
3620 /*
3621 * Snapshot name is optional; default is to use "-"
3622 * (indicating the head/no snapshot).
3623 */
3feeb894 3624 len = next_token(&buf);
820a5f3e 3625 if (!len) {
3feeb894
AE
3626 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3627 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 3628 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 3629 ret = -ENAMETOOLONG;
f28e565a 3630 goto out_err;
849b4260 3631 }
4caf35f9 3632 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
859c31df 3633 if (!spec->snap_name)
f28e565a 3634 goto out_mem;
859c31df 3635 *(spec->snap_name + len) = '\0';
e5c35534 3636
0ddebc0c 3637 /* Initialize all rbd options to the defaults */
e28fff26 3638
4e9afeba
AE
3639 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3640 if (!rbd_opts)
3641 goto out_mem;
3642
3643 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 3644
859c31df 3645 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 3646 mon_addrs + mon_addrs_size - 1,
4e9afeba 3647 parse_rbd_opts_token, rbd_opts);
859c31df
AE
3648 if (IS_ERR(copts)) {
3649 ret = PTR_ERR(copts);
dc79b113
AE
3650 goto out_err;
3651 }
859c31df
AE
3652 kfree(options);
3653
3654 *ceph_opts = copts;
4e9afeba 3655 *opts = rbd_opts;
859c31df 3656 *rbd_spec = spec;
0ddebc0c 3657
dc79b113 3658 return 0;
f28e565a 3659out_mem:
dc79b113 3660 ret = -ENOMEM;
d22f76e7 3661out_err:
859c31df
AE
3662 kfree(rbd_opts);
3663 rbd_spec_put(spec);
f28e565a 3664 kfree(options);
d22f76e7 3665
dc79b113 3666 return ret;
a725f65e
AE
3667}
3668
589d30e0
AE
3669/*
3670 * An rbd format 2 image has a unique identifier, distinct from the
3671 * name given to it by the user. Internally, that identifier is
3672 * what's used to specify the names of objects related to the image.
3673 *
3674 * A special "rbd id" object is used to map an rbd image name to its
3675 * id. If that object doesn't exist, then there is no v2 rbd image
3676 * with the supplied name.
3677 *
3678 * This function will record the given rbd_dev's image_id field if
3679 * it can be determined, and in that case will return 0. If any
3680 * errors occur a negative errno will be returned and the rbd_dev's
3681 * image_id field will be unchanged (and should be NULL).
3682 */
3683static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3684{
3685 int ret;
3686 size_t size;
3687 char *object_name;
3688 void *response;
3689 void *p;
3690
2c0d0a10
AE
3691 /*
3692 * When probing a parent image, the image id is already
3693 * known (and the image name likely is not). There's no
3694 * need to fetch the image id again in this case.
3695 */
3696 if (rbd_dev->spec->image_id)
3697 return 0;
3698
589d30e0
AE
3699 /*
3700 * First, see if the format 2 image id file exists, and if
3701 * so, get the image's persistent id from it.
3702 */
69e7a02f 3703 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
3704 object_name = kmalloc(size, GFP_NOIO);
3705 if (!object_name)
3706 return -ENOMEM;
0d7dbfce 3707 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
3708 dout("rbd id object name is %s\n", object_name);
3709
3710 /* Response will be an encoded string, which includes a length */
3711
3712 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3713 response = kzalloc(size, GFP_NOIO);
3714 if (!response) {
3715 ret = -ENOMEM;
3716 goto out;
3717 }
3718
36be9a76 3719 ret = rbd_obj_method_sync(rbd_dev, object_name,
589d30e0
AE
3720 "rbd", "get_id",
3721 NULL, 0,
07b2391f 3722 response, RBD_IMAGE_ID_LEN_MAX, NULL);
36be9a76 3723 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
589d30e0
AE
3724 if (ret < 0)
3725 goto out;
36be9a76 3726 ret = 0; /* rbd_obj_method_sync() can return positive */
589d30e0
AE
3727
3728 p = response;
0d7dbfce 3729 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
589d30e0 3730 p + RBD_IMAGE_ID_LEN_MAX,
979ed480 3731 NULL, GFP_NOIO);
0d7dbfce
AE
3732 if (IS_ERR(rbd_dev->spec->image_id)) {
3733 ret = PTR_ERR(rbd_dev->spec->image_id);
3734 rbd_dev->spec->image_id = NULL;
589d30e0 3735 } else {
0d7dbfce 3736 dout("image_id is %s\n", rbd_dev->spec->image_id);
589d30e0
AE
3737 }
3738out:
3739 kfree(response);
3740 kfree(object_name);
3741
3742 return ret;
3743}
3744
a30b71b9
AE
3745static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3746{
3747 int ret;
3748 size_t size;
3749
3750 /* Version 1 images have no id; empty string is used */
3751
0d7dbfce
AE
3752 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3753 if (!rbd_dev->spec->image_id)
a30b71b9 3754 return -ENOMEM;
a30b71b9
AE
3755
3756 /* Record the header object name for this rbd image. */
3757
69e7a02f 3758 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
a30b71b9
AE
3759 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3760 if (!rbd_dev->header_name) {
3761 ret = -ENOMEM;
3762 goto out_err;
3763 }
0d7dbfce
AE
3764 sprintf(rbd_dev->header_name, "%s%s",
3765 rbd_dev->spec->image_name, RBD_SUFFIX);
a30b71b9
AE
3766
3767 /* Populate rbd image metadata */
3768
3769 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3770 if (ret < 0)
3771 goto out_err;
86b00e0d
AE
3772
3773 /* Version 1 images have no parent (no layering) */
3774
3775 rbd_dev->parent_spec = NULL;
3776 rbd_dev->parent_overlap = 0;
3777
a30b71b9
AE
3778 rbd_dev->image_format = 1;
3779
3780 dout("discovered version 1 image, header name is %s\n",
3781 rbd_dev->header_name);
3782
3783 return 0;
3784
3785out_err:
3786 kfree(rbd_dev->header_name);
3787 rbd_dev->header_name = NULL;
0d7dbfce
AE
3788 kfree(rbd_dev->spec->image_id);
3789 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
3790
3791 return ret;
3792}
3793
3794static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3795{
3796 size_t size;
9d475de5 3797 int ret;
6e14b1a6 3798 u64 ver = 0;
a30b71b9
AE
3799
3800 /*
3801 * Image id was filled in by the caller. Record the header
3802 * object name for this rbd image.
3803 */
979ed480 3804 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
a30b71b9
AE
3805 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3806 if (!rbd_dev->header_name)
3807 return -ENOMEM;
3808 sprintf(rbd_dev->header_name, "%s%s",
0d7dbfce 3809 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
9d475de5
AE
3810
3811 /* Get the size and object order for the image */
3812
3813 ret = rbd_dev_v2_image_size(rbd_dev);
1e130199
AE
3814 if (ret < 0)
3815 goto out_err;
3816
3817 /* Get the object prefix (a.k.a. block_name) for the image */
3818
3819 ret = rbd_dev_v2_object_prefix(rbd_dev);
b1b5402a
AE
3820 if (ret < 0)
3821 goto out_err;
3822
d889140c 3823 /* Get the and check features for the image */
b1b5402a
AE
3824
3825 ret = rbd_dev_v2_features(rbd_dev);
9d475de5
AE
3826 if (ret < 0)
3827 goto out_err;
35d489f9 3828
86b00e0d
AE
3829 /* If the image supports layering, get the parent info */
3830
3831 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3832 ret = rbd_dev_v2_parent_info(rbd_dev);
3833 if (ret < 0)
3834 goto out_err;
3835 }
3836
6e14b1a6
AE
3837 /* crypto and compression type aren't (yet) supported for v2 images */
3838
3839 rbd_dev->header.crypt_type = 0;
3840 rbd_dev->header.comp_type = 0;
35d489f9 3841
6e14b1a6
AE
3842 /* Get the snapshot context, plus the header version */
3843
3844 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
35d489f9
AE
3845 if (ret)
3846 goto out_err;
6e14b1a6
AE
3847 rbd_dev->header.obj_version = ver;
3848
a30b71b9
AE
3849 rbd_dev->image_format = 2;
3850
3851 dout("discovered version 2 image, header name is %s\n",
3852 rbd_dev->header_name);
3853
35152979 3854 return 0;
9d475de5 3855out_err:
86b00e0d
AE
3856 rbd_dev->parent_overlap = 0;
3857 rbd_spec_put(rbd_dev->parent_spec);
3858 rbd_dev->parent_spec = NULL;
9d475de5
AE
3859 kfree(rbd_dev->header_name);
3860 rbd_dev->header_name = NULL;
1e130199
AE
3861 kfree(rbd_dev->header.object_prefix);
3862 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
3863
3864 return ret;
a30b71b9
AE
3865}
3866
83a06263
AE
3867static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3868{
3869 int ret;
3870
3871 /* no need to lock here, as rbd_dev is not registered yet */
3872 ret = rbd_dev_snaps_update(rbd_dev);
3873 if (ret)
3874 return ret;
3875
9e15b77d
AE
3876 ret = rbd_dev_probe_update_spec(rbd_dev);
3877 if (ret)
3878 goto err_out_snaps;
3879
83a06263
AE
3880 ret = rbd_dev_set_mapping(rbd_dev);
3881 if (ret)
3882 goto err_out_snaps;
3883
3884 /* generate unique id: find highest unique id, add one */
3885 rbd_dev_id_get(rbd_dev);
3886
3887 /* Fill in the device name, now that we have its id. */
3888 BUILD_BUG_ON(DEV_NAME_LEN
3889 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3890 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3891
3892 /* Get our block major device number. */
3893
3894 ret = register_blkdev(0, rbd_dev->name);
3895 if (ret < 0)
3896 goto err_out_id;
3897 rbd_dev->major = ret;
3898
3899 /* Set up the blkdev mapping. */
3900
3901 ret = rbd_init_disk(rbd_dev);
3902 if (ret)
3903 goto err_out_blkdev;
3904
3905 ret = rbd_bus_add_dev(rbd_dev);
3906 if (ret)
3907 goto err_out_disk;
3908
3909 /*
3910 * At this point cleanup in the event of an error is the job
3911 * of the sysfs code (initiated by rbd_bus_del_dev()).
3912 */
3913 down_write(&rbd_dev->header_rwsem);
3914 ret = rbd_dev_snaps_register(rbd_dev);
3915 up_write(&rbd_dev->header_rwsem);
3916 if (ret)
3917 goto err_out_bus;
3918
9969ebc5 3919 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
83a06263
AE
3920 if (ret)
3921 goto err_out_bus;
3922
3923 /* Everything's ready. Announce the disk to the world. */
3924
3925 add_disk(rbd_dev->disk);
3926
3927 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3928 (unsigned long long) rbd_dev->mapping.size);
3929
3930 return ret;
3931err_out_bus:
3932 /* this will also clean up rest of rbd_dev stuff */
3933
3934 rbd_bus_del_dev(rbd_dev);
3935
3936 return ret;
3937err_out_disk:
3938 rbd_free_disk(rbd_dev);
3939err_out_blkdev:
3940 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3941err_out_id:
3942 rbd_dev_id_put(rbd_dev);
3943err_out_snaps:
3944 rbd_remove_all_snaps(rbd_dev);
3945
3946 return ret;
3947}
3948
a30b71b9
AE
3949/*
3950 * Probe for the existence of the header object for the given rbd
3951 * device. For format 2 images this includes determining the image
3952 * id.
3953 */
3954static int rbd_dev_probe(struct rbd_device *rbd_dev)
3955{
3956 int ret;
3957
3958 /*
3959 * Get the id from the image id object. If it's not a
3960 * format 2 image, we'll get ENOENT back, and we'll assume
3961 * it's a format 1 image.
3962 */
3963 ret = rbd_dev_image_id(rbd_dev);
3964 if (ret)
3965 ret = rbd_dev_v1_probe(rbd_dev);
3966 else
3967 ret = rbd_dev_v2_probe(rbd_dev);
83a06263 3968 if (ret) {
a30b71b9
AE
3969 dout("probe failed, returning %d\n", ret);
3970
83a06263
AE
3971 return ret;
3972 }
3973
3974 ret = rbd_dev_probe_finish(rbd_dev);
3975 if (ret)
3976 rbd_header_free(&rbd_dev->header);
3977
a30b71b9
AE
3978 return ret;
3979}
3980
59c2be1e
YS
3981static ssize_t rbd_add(struct bus_type *bus,
3982 const char *buf,
3983 size_t count)
602adf40 3984{
cb8627c7 3985 struct rbd_device *rbd_dev = NULL;
dc79b113 3986 struct ceph_options *ceph_opts = NULL;
4e9afeba 3987 struct rbd_options *rbd_opts = NULL;
859c31df 3988 struct rbd_spec *spec = NULL;
9d3997fd 3989 struct rbd_client *rbdc;
27cc2594
AE
3990 struct ceph_osd_client *osdc;
3991 int rc = -ENOMEM;
602adf40
YS
3992
3993 if (!try_module_get(THIS_MODULE))
3994 return -ENODEV;
3995
602adf40 3996 /* parse add command */
859c31df 3997 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 3998 if (rc < 0)
bd4ba655 3999 goto err_out_module;
78cea76e 4000
9d3997fd
AE
4001 rbdc = rbd_get_client(ceph_opts);
4002 if (IS_ERR(rbdc)) {
4003 rc = PTR_ERR(rbdc);
0ddebc0c 4004 goto err_out_args;
9d3997fd 4005 }
c53d5893 4006 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4007
602adf40 4008 /* pick the pool */
9d3997fd 4009 osdc = &rbdc->client->osdc;
859c31df 4010 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
4011 if (rc < 0)
4012 goto err_out_client;
859c31df
AE
4013 spec->pool_id = (u64) rc;
4014
0903e875
AE
4015 /* The ceph file layout needs to fit pool id in 32 bits */
4016
4017 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4018 rc = -EIO;
4019 goto err_out_client;
4020 }
4021
c53d5893 4022 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
4023 if (!rbd_dev)
4024 goto err_out_client;
c53d5893
AE
4025 rbdc = NULL; /* rbd_dev now owns this */
4026 spec = NULL; /* rbd_dev now owns this */
602adf40 4027
bd4ba655 4028 rbd_dev->mapping.read_only = rbd_opts->read_only;
c53d5893
AE
4029 kfree(rbd_opts);
4030 rbd_opts = NULL; /* done with this */
bd4ba655 4031
a30b71b9
AE
4032 rc = rbd_dev_probe(rbd_dev);
4033 if (rc < 0)
c53d5893 4034 goto err_out_rbd_dev;
05fd6f6f 4035
602adf40 4036 return count;
c53d5893
AE
4037err_out_rbd_dev:
4038 rbd_dev_destroy(rbd_dev);
bd4ba655 4039err_out_client:
9d3997fd 4040 rbd_put_client(rbdc);
0ddebc0c 4041err_out_args:
78cea76e
AE
4042 if (ceph_opts)
4043 ceph_destroy_options(ceph_opts);
4e9afeba 4044 kfree(rbd_opts);
859c31df 4045 rbd_spec_put(spec);
bd4ba655
AE
4046err_out_module:
4047 module_put(THIS_MODULE);
27cc2594 4048
602adf40 4049 dout("Error adding device %s\n", buf);
27cc2594
AE
4050
4051 return (ssize_t) rc;
602adf40
YS
4052}
4053
de71a297 4054static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4055{
4056 struct list_head *tmp;
4057 struct rbd_device *rbd_dev;
4058
e124a82f 4059 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4060 list_for_each(tmp, &rbd_dev_list) {
4061 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4062 if (rbd_dev->dev_id == dev_id) {
e124a82f 4063 spin_unlock(&rbd_dev_list_lock);
602adf40 4064 return rbd_dev;
e124a82f 4065 }
602adf40 4066 }
e124a82f 4067 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4068 return NULL;
4069}
4070
dfc5606d 4071static void rbd_dev_release(struct device *dev)
602adf40 4072{
593a9e7b 4073 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4074
59c2be1e 4075 if (rbd_dev->watch_event)
9969ebc5 4076 rbd_dev_header_watch_sync(rbd_dev, 0);
602adf40
YS
4077
4078 /* clean up and free blkdev */
4079 rbd_free_disk(rbd_dev);
4080 unregister_blkdev(rbd_dev->major, rbd_dev->name);
32eec68d 4081
2ac4e75d
AE
4082 /* release allocated disk header fields */
4083 rbd_header_free(&rbd_dev->header);
4084
32eec68d 4085 /* done with the id, and with the rbd_dev */
e2839308 4086 rbd_dev_id_put(rbd_dev);
c53d5893
AE
4087 rbd_assert(rbd_dev->rbd_client != NULL);
4088 rbd_dev_destroy(rbd_dev);
602adf40
YS
4089
4090 /* release module ref */
4091 module_put(THIS_MODULE);
602adf40
YS
4092}
4093
dfc5606d
YS
4094static ssize_t rbd_remove(struct bus_type *bus,
4095 const char *buf,
4096 size_t count)
602adf40
YS
4097{
4098 struct rbd_device *rbd_dev = NULL;
4099 int target_id, rc;
4100 unsigned long ul;
4101 int ret = count;
4102
4103 rc = strict_strtoul(buf, 10, &ul);
4104 if (rc)
4105 return rc;
4106
4107 /* convert to int; abort if we lost anything in the conversion */
4108 target_id = (int) ul;
4109 if (target_id != ul)
4110 return -EINVAL;
4111
4112 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4113
4114 rbd_dev = __rbd_get_dev(target_id);
4115 if (!rbd_dev) {
4116 ret = -ENOENT;
4117 goto done;
42382b70
AE
4118 }
4119
a14ea269 4120 spin_lock_irq(&rbd_dev->lock);
b82d167b 4121 if (rbd_dev->open_count)
42382b70 4122 ret = -EBUSY;
b82d167b
AE
4123 else
4124 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
a14ea269 4125 spin_unlock_irq(&rbd_dev->lock);
b82d167b 4126 if (ret < 0)
42382b70 4127 goto done;
602adf40 4128
41f38c2b 4129 rbd_remove_all_snaps(rbd_dev);
dfc5606d 4130 rbd_bus_del_dev(rbd_dev);
602adf40
YS
4131
4132done:
4133 mutex_unlock(&ctl_mutex);
aafb230e 4134
602adf40
YS
4135 return ret;
4136}
4137
602adf40
YS
4138/*
4139 * create control files in sysfs
dfc5606d 4140 * /sys/bus/rbd/...
602adf40
YS
4141 */
4142static int rbd_sysfs_init(void)
4143{
dfc5606d 4144 int ret;
602adf40 4145
fed4c143 4146 ret = device_register(&rbd_root_dev);
21079786 4147 if (ret < 0)
dfc5606d 4148 return ret;
602adf40 4149
fed4c143
AE
4150 ret = bus_register(&rbd_bus_type);
4151 if (ret < 0)
4152 device_unregister(&rbd_root_dev);
602adf40 4153
602adf40
YS
4154 return ret;
4155}
4156
/*
 * Remove the rbd control files from sysfs.  Teardown mirrors
 * rbd_sysfs_init() in reverse: the bus is unregistered before the
 * root device it was registered after.
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
4162
4163int __init rbd_init(void)
4164{
4165 int rc;
4166
1e32d34c
AE
4167 if (!libceph_compatible(NULL)) {
4168 rbd_warn(NULL, "libceph incompatibility (quitting)");
4169
4170 return -EINVAL;
4171 }
602adf40
YS
4172 rc = rbd_sysfs_init();
4173 if (rc)
4174 return rc;
f0f8cef5 4175 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
4176 return 0;
4177}
4178
/*
 * Module exit: remove the sysfs control files created by rbd_init().
 */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
4183
4184module_init(rbd_init);
4185module_exit(rbd_exit);
4186
4187MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4188MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4189MODULE_DESCRIPTION("rados block device");
4190
4191/* following authorship retained from original osdblk.c */
4192MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4193
4194MODULE_LICENSE("GPL");