libceph: a few more osd data cleanups
[linux-2.6-block.git] / drivers / block / rbd.c
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
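
/*
 * Worked example: for a 4-byte int this evaluates to 5 * 4 / 2 + 1 = 11
 * characters, ten decimal digits plus room for a sign, since 2.5
 * characters per byte safely over-estimates the log10(256) ~= 2.41
 * decimal digits each byte can contribute.
 */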

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};
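
/*
 * Illustrative (hypothetical) values: mapping the base of an image
 * "foo" in pool "rbd" might produce a spec of pool_id 2, pool_name
 * "rbd", image_id "10056b8b4567", image_name "foo", snap_id
 * CEPH_NOSNAP, snap_name "-" (RBD_SNAP_HEAD_NAME).  Only the field
 * roles matter here, not these particular values.
 */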

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* position in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;
	atomic_t		done;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

struct rbd_img_request {
	struct request		*rq;
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	bool			write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64		snap_id;		/* for reads */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

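/*
 * Usage sketch: given a filled-in image request,
 *
 *	struct rbd_obj_request *obj_request;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		dout("obj %p which %u\n", obj_request, obj_request->which);
 *
 * The _from variant resumes iteration at an arbitrary entry, and the
 * _safe variant walks in reverse and tolerates deletion of the current
 * entry (see rbd_img_request_destroy() below).
 */
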
struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

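/*
 * Example (illustrative message): rbd_warn(rbd_dev, "refresh failed: %d",
 * ret) prefixes the message with the most specific identity available,
 * i.e. disk name, then image name, then image id, then the rbd_dev
 * pointer itself as a last resort.
 */
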
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

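/*
 * Example: a mapping option string such as "read_only" (or the
 * alternate spelling "ro") reaches this callback one token at a time
 * from the ceph options parser; any token missing from
 * rbd_opts_tokens above fails the whole parse with -EINVAL.
 */
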
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to unlink
 * the client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

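/*
 * Bound arithmetic (illustrative): with a 64-bit size_t the snap_count
 * test above can never trip, since its limit is roughly 2^61 while
 * snap_count is a u32.  It guards 32-bit builds, where SIZE_MAX / 8 is
 * about 2^29; the snap_names_len test is what rejects a corrupt or
 * hostile on-disk header on either word size.
 */
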
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No feature support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

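/*
 * Layout sketch: for a v1 header with two snapshots the context
 * allocated above occupies sizeof (struct ceph_snap_context) plus
 * 2 * sizeof (u64) bytes, and snaps[] holds the snapshot ids in the
 * same order the on-disk header supplied them.
 */
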
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{

	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

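/*
 * Worked example (assuming the common 4MB object size, obj_order 22):
 * image offset 10MB lands in segment 10MB >> 22 == 2, so
 * rbd_segment_name() yields "<object_prefix>.000000000002",
 * rbd_segment_offset() yields 2MB, and a 3MB request is clipped by
 * rbd_segment_length() to the 2MB remaining in that segment.
 */
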
/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

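/*
 * Usage sketch (assumes the source chain bio_list covers at least
 * len bytes):
 *
 *	struct bio *bio = bio_list;
 *	unsigned int offset = 0;
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&bio, &offset, len, GFP_ATOMIC);
 *
 * On success, bio and offset identify the first un-cloned byte, so
 * repeated calls carve a chain into consecutive ranges, exactly how
 * rbd_img_request_fill_bio() uses it below.
 */
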
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);
	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

static void obj_request_done_init(struct rbd_obj_request *obj_request)
{
	atomic_set(&obj_request->done, 0);
	smp_wmb();
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	int done;

	done = atomic_inc_return(&obj_request->done);
	if (done > 1) {
		struct rbd_img_request *img_request = obj_request->img_request;
		struct rbd_device *rbd_dev;

		rbd_dev = img_request ? img_request->rbd_dev : NULL;
		rbd_warn(rbd_dev, "obj_request %p was already done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return atomic_read(&obj_request->done) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
	if (obj_request->result == -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
		obj_request->xferred = obj_request->length;
	} else if (obj_request->xferred < obj_request->length &&
			!obj_request->result) {
		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
		obj_request->xferred = obj_request->length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
		obj_request->result, obj_request->xferred, obj_request->length);
	if (obj_request->img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.
	 * Our xferred value is the number of bytes transferred
	 * back.  Set it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	rbd_assert(!!obj_request->img_request ^
				(obj_request->which == BAD_WHICH));

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	WARN_ON(osd_req->r_num_ops != 1);	/* For now */

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64) UINT_MAX);
	opcode = osd_req->r_request_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request,
					struct ceph_osd_req_op *op)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_data *osd_data;
	struct timespec now;
	struct timespec *mtime;
	u64 snap_id = CEPH_NOSNAP;
	u64 offset = obj_request->offset;
	u64 length = obj_request->length;

	if (img_request) {
		rbd_assert(img_request->write_request == write_request);
		if (img_request->write_request)
			snapc = img_request->snapc;
		else
			snap_id = img_request->snap_id;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */
	osd_data = write_request ? &osd_req->r_data_out : &osd_req->r_data_in;

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		rbd_assert(obj_request->bio_list != NULL);
		ceph_osd_data_bio_init(osd_data, obj_request->bio_list,
					obj_request->length);
		break;
	case OBJ_REQUEST_PAGES:
		ceph_osd_data_pages_init(osd_data, obj_request->pages,
				obj_request->length, offset & ~PAGE_MASK,
				false, false);
		break;
	}

	if (write_request) {
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
		now = CURRENT_TIME;
		mtime = &now;
	} else {
		osd_req->r_flags = CEPH_OSD_FLAG_READ;
		mtime = NULL;	/* not needed for reads */
		offset = 0;	/* These are not used... */
		length = 0;	/* ...for osd read requests */
	}

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	/* osd_req will get its own reference to snapc (if non-null) */

	ceph_osdc_build_request(osd_req, offset, 1, op,
				snapc, snap_id, mtime);

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	obj_request_done_init(obj_request);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->write_request = write_request;
	if (write_request)
		img_request->snapc = snapc;
	else
		img_request->snap_id = rbd_dev->spec->snap_id;
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request->write_request)
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}

static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	unsigned int bio_offset;
	u64 image_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p bio %p\n", __func__, img_request, bio_list);

	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
					      : CEPH_OSD_OP_READ;
	bio_offset = 0;
	image_offset = img_request->offset;
	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	rbd_assert(resid > 0);
	while (resid) {
		const char *object_name;
		unsigned int clone_size;
		struct ceph_osd_req_op op;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, image_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, image_offset);
		length = rbd_segment_length(rbd_dev, image_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		/*
		 * Build up the op to use in building the osd
		 * request.  Note that the contents of the op are
		 * copied by rbd_osd_req_create().
		 */
		osd_req_op_extent_init(&op, opcode, offset, length, 0, 0);
		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
						img_request->write_request,
						obj_request, &op);
		if (!obj_request->osd_req)
			goto out_partial;
		/* status and version are initially zero-filled */

		rbd_img_obj_request_add(img_request, obj_request);

		image_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->rq != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		unsigned int xferred;
		int result;

		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;

		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
		xferred = (unsigned int) obj_request->xferred;
		result = (int) obj_request->result;
		if (result)
			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
				img_request->write_request ? "write" : "read",
				result, xferred);

		more = blk_end_request(img_request->rq, result, xferred);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		obj_request->callback = rbd_img_obj_callback;
		ret = rbd_obj_request_submit(osdc, obj_request);
		if (ret)
			return ret;
		/*
		 * The image request has its own reference to each
		 * of its object requests, so we can safely drop the
		 * initial one here.
		 */
		rbd_obj_request_put(obj_request);
	}

	return 0;
}

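/*
 * Lifecycle sketch for a single block-layer request (illustrative;
 * the request-function wiring lives elsewhere in this file):
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *						write_request);
 *	rbd_img_request_fill_bio(img_request, rq->bio);
 *	rbd_img_request_submit(img_request);
 *
 * Completion then flows from rbd_osd_req_callback() through
 * rbd_img_obj_callback() into blk_end_request().
 */
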
static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
				u64 ver, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op op;
	struct ceph_osd_client *osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	osd_req_op_watch_init(&op, CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0);
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						obj_request, &op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	obj_request->callback = rbd_obj_request_put;
	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		rbd_warn(rbd_dev, "got notification but failed to "
			   "update snaps: %d\n", rc);

	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op op;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	osd_req_op_watch_init(&op, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie,
				rbd_dev->header.obj_version, start);
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
							obj_request, &op);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
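
/*
 * Usage sketch (illustrative; the real call sites are elsewhere in
 * this driver): one start call when an image is mapped, paired with
 * one teardown call when it is unmapped.
 *
 *	rbd_dev_header_watch_sync(rbd_dev, 1);	... begin watching
 *	rbd_dev_header_watch_sync(rbd_dev, 0);	... tear the watch down
 *
 * Between the two, header-object notifications arrive via
 * rbd_watch_cb(), which refreshes the header and acks the notify.
 */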

/*
 * Synchronous osd object method call
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     u64 *version)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc;
	struct ceph_osd_req_op op;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32) calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	osd_req_op_cls_init(&op, CEPH_OSD_OP_CALL, class_name, method_name,
					outbound, outbound_size);
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						obj_request, &op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;
	ret = 0;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
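
/*
 * For example, the format 2 size probe later in this file invokes the
 * "get_size" method of the "rbd" class on the image's header object,
 * passing a snapshot id as outbound data and receiving a packed
 * (order, size) struct as inbound data:
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				(char *) &snapid, sizeof (snapid),
 *				(char *) &size_buf, sizeof (size_buf), NULL);
 */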

static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (WARN_ON(offset && length > U64_MAX - offset + 1))
			goto end_request;	/* Shouldn't happen */

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill_bio(img_request, rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "obj_request %s result %d\n",
				write_request ? "write" : "read", result);
			__blk_end_request_all(rq, result);
		}
	}
}

/*
 * A queue callback.  Makes sure that we don't create a bio that
 * spans across multiple osd objects.  One exception would be with
 * single-page bios, which we handle later at bio_chain_clone_range().
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Convert the bio's partition-relative start sector to an
	 * offset relative to the enclosing device, then find how far
	 * into its rbd object that sector falls.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
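
/*
 * Worked example (assuming the default 4 MiB objects, obj_order 22):
 * sectors_per_obj = 1 << (22 - 9) = 8192.  For a bio starting at
 * device sector 8000 with no bytes in it yet, the space left in the
 * enclosing object is (8192 - 8000) << 9 = 98304 bytes, so a
 * page-sized bvec (at most 4096 bytes) is accepted in full.
 */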

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}

static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length,
				char *buf, u64 *version)
{
	struct ceph_osd_req_op op;
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	osd_req_op_extent_init(&op, CEPH_OSD_OP_READ, offset, length, 0, 0);
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						obj_request, &op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t) INT_MAX);
	ret = (int) size;
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
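	/*
	 * For example, a header for an image with two snapshots named
	 * "a" and "b" is followed by two __le64 snapshot ids and then
	 * the bytes 'a' '\0' 'b' '\0', so names_size would be 4.
	 * (Illustrative values, restating the layout described above.)
	 */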
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);
		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			ret = -ENXIO;
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}

/*
 * Reload the on-disk header.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	kfree(ondisk);

	return ret;
}

static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		rbd_remove_snap_dev(snap);
}

static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		return;

	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
	dout("setting size to %llu sectors", (unsigned long long) size);
	rbd_dev->mapping.size = (u64) size;
	set_capacity(rbd_dev->disk, size);
}

/*
 * Only read the first part of the ondisk header, without the snaps info.
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* Update image size, and check for resize of mapped image */
	rbd_dev->header.image_size = h.image_size;
	rbd_update_mapping_size(rbd_dev);

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = rbd_dev_snaps_update(rbd_dev);
	if (!ret)
		ret = rbd_dev_snaps_register(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_refresh(rbd_dev, hver);
	else
		ret = rbd_dev_v2_refresh(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
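
/*
 * Sizing note (illustrative, assuming the default 4 MiB object size):
 * segment_size above would be 4194304, so the queue advertises
 * max_hw_sectors = 4194304 / 512 = 8192 and 4 MiB minimum/optimal
 * I/O sizes, steering requests toward whole-object extents.
 */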

/*
  sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	return (ssize_t) (bufp - buf);
}
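
/*
 * Example output for a mapped clone (all values hypothetical):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1014b76b8b4567
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 4194304
 */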

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};

/*
  sysfs - snapshots
*/

static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static ssize_t rbd_snap_features_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) snap->features);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};

static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;
	kref_init(&spec->kref);

	rbd_spec_put(rbd_spec_get(spec));	/* TEMPORARY */

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}

static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
				struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	rbd_dev->flags = 0;
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_spec_put(rbd_dev->parent_spec);
	kfree(rbd_dev->header_name);
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}

static bool rbd_snap_registered(struct rbd_snap *snap)
{
	bool ret = snap->dev.type == &rbd_snap_device_type;
	bool reg = device_is_registered(&snap->dev);

	rbd_assert(!ret ^ reg);

	return ret;
}

static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	if (device_is_registered(&snap->dev))
		device_unregister(&snap->dev);
}

static int rbd_register_snap_dev(struct rbd_snap *snap,
				  struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
	dout("%s: registering device for snapshot %s\n", __func__, snap->name);

	ret = device_register(dev);

	return ret;
}

static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
						const char *snap_name,
						u64 snap_id, u64 snap_size,
						u64 snap_features)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(snap_name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->id = snap_id;
	snap->size = snap_size;
	snap->features = snap_features;

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}

static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	*snap_size = rbd_dev->header.snap_sizes[which];
	*snap_features = 0;	/* No features for v1 */

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return snap_name;
}

/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf), NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}

static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}

out:
	kfree(reply_buf);

	return ret;
}

static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_ALL)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}

static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	char *image_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				(char *) &snapid, sizeof (snapid),
				(char *) reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	ret = -ERANGE;
	p = reply_buf;
	end = (char *) reply_buf + size;
	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
	if (parent_spec->pool_id == CEPH_NOPOOL)
		goto out;	/* No parent?  No problem. */

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
		goto out;

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	rbd_dev->parent_overlap = overlap;
	rbd_dev->parent_spec = parent_spec;
	parent_spec = NULL;	/* rbd_dev now owns this */
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}

static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = (char *) image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				(char *) reply_buf, size, NULL);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = (char *) reply_buf + size;
	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}

/*
 * When a parent image gets probed, we only have the pool, image,
 * and snapshot ids but not the names of any of them.  This call
 * is made later to fill in those names.  It has to be done after
 * rbd_dev_snaps_update() has completed because some of the
 * information (in particular, snapshot name) is not available
 * until then.
 */
static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc;
	const char *name;
	void *reply_buf = NULL;
	int ret;

	if (rbd_dev->spec->pool_name)
		return 0;	/* Already have the names */

	/* Look up the pool name */

	osdc = &rbd_dev->rbd_client->client->osdc;
	name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
	if (!name) {
		rbd_warn(rbd_dev, "there is no pool with id %llu",
			rbd_dev->spec->pool_id);	/* Really a BUG() */
		return -EIO;
	}

	rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
	if (!rbd_dev->spec->pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	name = rbd_dev_image_name(rbd_dev);
	if (name)
		rbd_dev->spec->image_name = (char *) name;
	else
		rbd_warn(rbd_dev, "unable to get image name");

	/* Look up the snapshot name. */

	name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
	if (!name) {
		rbd_warn(rbd_dev, "no snapshot with id %llu",
			rbd_dev->spec->snap_id);	/* Really a BUG() */
		ret = -EIO;
		goto out_err;
	}
	rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
	if (!rbd_dev->spec->snap_name) {
		ret = -ENOMEM;
		goto out_err;
	}

	return 0;
out_err:
	kfree(reply_buf);
	kfree(rbd_dev->spec->pool_name);
	rbd_dev->spec->pool_name = NULL;

	return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext",
				NULL, 0,
				reply_buf, size, ver);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	ret = -ERANGE;
	p = reply_buf;
	end = (char *) reply_buf + size;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;

	size = sizeof (struct ceph_snap_context) +
				snap_count * sizeof (snapc->snaps[0]);
	snapc = kmalloc(size, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}

	atomic_set(&snapc->nref, 1);
	snapc->seq = seq;
	snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	rbd_dev->header.snapc = snapc;
	ret = 0;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long) seq, (unsigned int) snap_count);

out:
	kfree(reply_buf);

	return ret;
}
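
/*
 * For reference, the get_snapcontext reply decoded above is laid out
 * as (all little-endian):
 *
 *	__le64	seq;				maximum snapshot id
 *	__le32	snap_count;
 *	__le64	snaps[snap_count];		ids, highest first
 */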

static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				(char *) &snap_id, sizeof (snap_id),
				reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = (char *) reply_buf + size;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out;
	} else {
		dout("  snap_id 0x%016llx snap_name = %s\n",
			(unsigned long long) le64_to_cpu(snap_id), snap_name);
	}
	kfree(reply_buf);

	return snap_name;
out:
	kfree(reply_buf);

	return ERR_PTR(ret);
}

static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	u64 snap_id;
	u8 order;
	int ret;

	snap_id = rbd_dev->header.snapc->snaps[which];
	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
	if (ret)
		return ERR_PTR(ret);
	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
	if (ret)
		return ERR_PTR(ret);

	return rbd_dev_v2_snap_name(rbd_dev, which);
}

static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_info(rbd_dev, which,
					snap_size, snap_features);
	if (rbd_dev->image_format == 2)
		return rbd_dev_v2_snap_info(rbd_dev, which,
					snap_size, snap_features);
	return ERR_PTR(-EINVAL);
}

static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	__u8 obj_order;

	down_write(&rbd_dev->header_rwsem);

	/* Grab old order first, to see if it changes */

	obj_order = rbd_dev->header.obj_order;
	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		goto out;
	if (rbd_dev->header.obj_order != obj_order) {
		ret = -EIO;
		goto out;
	}
	rbd_update_mapping_size(rbd_dev);

	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
	dout("rbd_dev_v2_snap_context returned %d\n", ret);
	if (ret)
		goto out;
	ret = rbd_dev_snaps_update(rbd_dev);
	dout("rbd_dev_snaps_update returned %d\n", ret);
	if (ret)
		goto out;
	ret = rbd_dev_snaps_register(rbd_dev);
	dout("rbd_dev_snaps_register returned %d\n", ret);
out:
	up_write(&rbd_dev->header_rwsem);

	return ret;
}

/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	u32 index = 0;

	dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
	while (index < snap_count || links != head) {
		u64 snap_id;
		struct rbd_snap *snap;
		char *snap_name;
		u64 snap_size = 0;
		u64 snap_features = 0;

		snap_id = index < snap_count ? snapc->snaps[index]
					     : CEPH_NOSNAP;
		snap = links != head ? list_entry(links, struct rbd_snap, node)
				     : NULL;
		rbd_assert(!snap || snap->id != CEPH_NOSNAP);

		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;

			/*
			 * A previously-existing snapshot is not in
			 * the new snap context.
			 *
			 * If the now-missing snapshot is the one the
			 * image is mapped to, clear its exists flag
			 * so we can avoid sending any more requests
			 * to it.
			 */
			if (rbd_dev->spec->snap_id == snap->id)
				clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
			rbd_remove_snap_dev(snap);
			dout("%ssnap id %llu has been removed\n",
				rbd_dev->spec->snap_id == snap->id ?
							"mapped " : "",
				(unsigned long long) snap->id);

			/* Done with this list entry; advance */

			links = next;
			continue;
		}

		snap_name = rbd_dev_snap_info(rbd_dev, index,
					&snap_size, &snap_features);
		if (IS_ERR(snap_name))
			return PTR_ERR(snap_name);

		dout("entry %u: snap_id = %llu\n", (unsigned int) index,
			(unsigned long long) snap_id);
		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;

			/* We haven't seen this snapshot before */

			new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
					snap_id, snap_size, snap_features);
			if (IS_ERR(new_snap)) {
				int err = PTR_ERR(new_snap);

				dout("  failed to add dev, error %d\n", err);

				return err;
			}

			/* New goes before existing, or at end of list */

			dout("  added dev%s\n", snap ? "" : " at end");
			if (snap)
				list_add_tail(&new_snap->node, &snap->node);
			else
				list_add_tail(&new_snap->node, head);
		} else {
			/* Already have this one */

			dout("  already present\n");

			rbd_assert(snap->size == snap_size);
			rbd_assert(!strcmp(snap->name, snap_name));
			rbd_assert(snap->features == snap_features);

			/* Done with this list entry; advance */

			links = links->next;
		}

		/* Advance to the next entry in the snapshot context */

		index++;
	}
	dout("%s: done\n", __func__);

	return 0;
}
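
/*
 * A small worked example of the merge above (ids illustrative only):
 * if the current list holds ids {8, 5, 3} and the new snapshot
 * context holds {8, 4, 3}, the walk keeps 8, removes 5 (absent from
 * the new context), inserts 4 ahead of 3, and keeps 3, leaving
 * {8, 4, 3}.
 */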

/*
 * Scan the list of snapshots and register the devices for any that
 * have not already been registered.
 */
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	int ret = 0;

	dout("%s:\n", __func__);
	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
		return -EIO;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!rbd_snap_registered(snap)) {
			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
			if (ret < 0)
				break;
		}
	}
	dout("%s: returning %d\n", __func__, ret);

	return ret;
}

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	struct device *dev;
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	mutex_unlock(&ctl_mutex);

	return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}

static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout("  max dev id has been reset\n");
}
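
/*
 * Example (illustrative): with devices {1, 2, 3} mapped, putting id 3
 * rescans the list and resets the maximum to 2, so the next
 * rbd_dev_id_get() hands out 3 again.  Putting id 2 instead leaves
 * the maximum at 3, so the id of a still-mapped device is never
 * reused.
 */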

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}

/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}
3536
a725f65e 3537/*
859c31df
AE
3538 * Parse the options provided for an "rbd add" (i.e., rbd image
3539 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3540 * and the data written is passed here via a NUL-terminated buffer.
3541 * Returns 0 if successful or an error code otherwise.
d22f76e7 3542 *
859c31df
AE
3543 * The information extracted from these options is recorded in
3544 * the other parameters which return dynamically-allocated
3545 * structures:
3546 * ceph_opts
3547 * The address of a pointer that will refer to a ceph options
3548 * structure. Caller must release the returned pointer using
3549 * ceph_destroy_options() when it is no longer needed.
3550 * rbd_opts
3551 * Address of an rbd options pointer. Fully initialized by
3552 * this function; caller must release with kfree().
3553 * spec
3554 * Address of an rbd image specification pointer. Fully
3555 * initialized by this function based on parsed options.
3556 * Caller must release with rbd_spec_put().
3557 *
3558 * The options passed take this form:
3559 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3560 * where:
3561 * <mon_addrs>
3562 * A comma-separated list of one or more monitor addresses.
3563 * A monitor address is an ip address, optionally followed
3564 * by a port number (separated by a colon).
3565 * I.e.: ip1[:port1][,ip2[:port2]...]
3566 * <options>
3567 * A comma-separated list of ceph and/or rbd options.
3568 * <pool_name>
3569 * The name of the rados pool containing the rbd image.
3570 * <image_name>
3571 * The name of the image in that pool to map.
 3572 * <snap_name>
 3573 * The name of an optional snapshot. If provided, the mapping
 3574 * will present data from the image at the time that snapshot
 3575 * was created. The image head is used if no snapshot name is
 3576 * provided. Snapshot mappings are always read-only.
a725f65e 3577 */
859c31df 3578static int rbd_add_parse_args(const char *buf,
dc79b113 3579 struct ceph_options **ceph_opts,
859c31df
AE
3580 struct rbd_options **opts,
3581 struct rbd_spec **rbd_spec)
e28fff26 3582{
d22f76e7 3583 size_t len;
859c31df 3584 char *options;
0ddebc0c
AE
3585 const char *mon_addrs;
3586 size_t mon_addrs_size;
859c31df 3587 struct rbd_spec *spec = NULL;
4e9afeba 3588 struct rbd_options *rbd_opts = NULL;
859c31df 3589 struct ceph_options *copts;
dc79b113 3590 int ret;
e28fff26
AE
3591
3592 /* The first four tokens are required */
3593
7ef3214a 3594 len = next_token(&buf);
4fb5d671
AE
3595 if (!len) {
3596 rbd_warn(NULL, "no monitor address(es) provided");
3597 return -EINVAL;
3598 }
0ddebc0c 3599 mon_addrs = buf;
f28e565a 3600 mon_addrs_size = len + 1;
7ef3214a 3601 buf += len;
a725f65e 3602
dc79b113 3603 ret = -EINVAL;
f28e565a
AE
3604 options = dup_token(&buf, NULL);
3605 if (!options)
dc79b113 3606 return -ENOMEM;
4fb5d671
AE
3607 if (!*options) {
3608 rbd_warn(NULL, "no options provided");
3609 goto out_err;
3610 }
e28fff26 3611
859c31df
AE
3612 spec = rbd_spec_alloc();
3613 if (!spec)
f28e565a 3614 goto out_mem;
859c31df
AE
3615
3616 spec->pool_name = dup_token(&buf, NULL);
3617 if (!spec->pool_name)
3618 goto out_mem;
4fb5d671
AE
3619 if (!*spec->pool_name) {
3620 rbd_warn(NULL, "no pool name provided");
3621 goto out_err;
3622 }
e28fff26 3623
69e7a02f 3624 spec->image_name = dup_token(&buf, NULL);
859c31df 3625 if (!spec->image_name)
f28e565a 3626 goto out_mem;
4fb5d671
AE
3627 if (!*spec->image_name) {
3628 rbd_warn(NULL, "no image name provided");
3629 goto out_err;
3630 }
d4b125e9 3631
f28e565a
AE
3632 /*
 3633 * The snapshot name is optional; the default is to use "-"
3634 * (indicating the head/no snapshot).
3635 */
3feeb894 3636 len = next_token(&buf);
820a5f3e 3637 if (!len) {
3feeb894
AE
3638 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3639 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 3640 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 3641 ret = -ENAMETOOLONG;
f28e565a 3642 goto out_err;
849b4260 3643 }
4caf35f9 3644 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
859c31df 3645 if (!spec->snap_name)
f28e565a 3646 goto out_mem;
859c31df 3647 *(spec->snap_name + len) = '\0';
e5c35534 3648
0ddebc0c 3649 /* Initialize all rbd options to the defaults */
e28fff26 3650
4e9afeba
AE
3651 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3652 if (!rbd_opts)
3653 goto out_mem;
3654
3655 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 3656
859c31df 3657 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 3658 mon_addrs + mon_addrs_size - 1,
4e9afeba 3659 parse_rbd_opts_token, rbd_opts);
859c31df
AE
3660 if (IS_ERR(copts)) {
3661 ret = PTR_ERR(copts);
dc79b113
AE
3662 goto out_err;
3663 }
859c31df
AE
3664 kfree(options);
3665
3666 *ceph_opts = copts;
4e9afeba 3667 *opts = rbd_opts;
859c31df 3668 *rbd_spec = spec;
0ddebc0c 3669
dc79b113 3670 return 0;
f28e565a 3671out_mem:
dc79b113 3672 ret = -ENOMEM;
d22f76e7 3673out_err:
859c31df
AE
3674 kfree(rbd_opts);
3675 rbd_spec_put(spec);
f28e565a 3676 kfree(options);
d22f76e7 3677
dc79b113 3678 return ret;
a725f65e
AE
3679}
3680
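/*
 * For illustration, one token sequence matching the grammar parsed
 * above, as it would be written to the sysfs control file.  The
 * address, pool, image, and snapshot names are placeholders; the
 * option names shown are those accepted by ceph_parse_options() and
 * parse_rbd_opts_token():
 *
 *	# echo "1.2.3.4:6789 name=admin,read_only rbd myimage mysnap" \
 *	#		> /sys/bus/rbd/add
 *
 * Here "1.2.3.4:6789" is <mon_addrs>, "name=admin,read_only" is
 * <options>, "rbd" is <pool_name>, "myimage" is <image_name>, and
 * "mysnap" is the optional <snap_name>.
 */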
589d30e0
AE
3681/*
3682 * An rbd format 2 image has a unique identifier, distinct from the
3683 * name given to it by the user. Internally, that identifier is
3684 * what's used to specify the names of objects related to the image.
3685 *
3686 * A special "rbd id" object is used to map an rbd image name to its
3687 * id. If that object doesn't exist, then there is no v2 rbd image
3688 * with the supplied name.
3689 *
 3690 * This function will fill in the given rbd_dev's image_id field if
 3691 * the id can be determined, and in that case will return 0. If any
3692 * errors occur a negative errno will be returned and the rbd_dev's
3693 * image_id field will be unchanged (and should be NULL).
3694 */
3695static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3696{
3697 int ret;
3698 size_t size;
3699 char *object_name;
3700 void *response;
3701 void *p;
3702
2c0d0a10
AE
3703 /*
3704 * When probing a parent image, the image id is already
3705 * known (and the image name likely is not). There's no
3706 * need to fetch the image id again in this case.
3707 */
3708 if (rbd_dev->spec->image_id)
3709 return 0;
3710
589d30e0
AE
3711 /*
3712 * First, see if the format 2 image id file exists, and if
3713 * so, get the image's persistent id from it.
3714 */
69e7a02f 3715 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
3716 object_name = kmalloc(size, GFP_NOIO);
3717 if (!object_name)
3718 return -ENOMEM;
0d7dbfce 3719 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
3720 dout("rbd id object name is %s\n", object_name);
3721
3722 /* Response will be an encoded string, which includes a length */
3723
3724 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3725 response = kzalloc(size, GFP_NOIO);
3726 if (!response) {
3727 ret = -ENOMEM;
3728 goto out;
3729 }
3730
36be9a76 3731 ret = rbd_obj_method_sync(rbd_dev, object_name,
589d30e0
AE
3732 "rbd", "get_id",
3733 NULL, 0,
07b2391f 3734 response, RBD_IMAGE_ID_LEN_MAX, NULL);
36be9a76 3735 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
589d30e0
AE
3736 if (ret < 0)
3737 goto out;
3738
3739 p = response;
0d7dbfce 3740 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
589d30e0 3741 p + RBD_IMAGE_ID_LEN_MAX,
979ed480 3742 NULL, GFP_NOIO);
0d7dbfce
AE
3743 if (IS_ERR(rbd_dev->spec->image_id)) {
3744 ret = PTR_ERR(rbd_dev->spec->image_id);
3745 rbd_dev->spec->image_id = NULL;
589d30e0 3746 } else {
0d7dbfce 3747 dout("image_id is %s\n", rbd_dev->spec->image_id);
589d30e0
AE
3748 }
3749out:
3750 kfree(response);
3751 kfree(object_name);
3752
3753 return ret;
3754}
3755
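/*
 * A sketch of the wire format consumed above, not driver code: the
 * "get_id" reply is a ceph-encoded string, i.e. a little-endian
 * 32-bit length followed by that many bytes of name (no NUL).  A
 * hand-rolled decode of the same layout might look like this,
 * mirroring ceph_extract_encoded_string()'s ERR_PTR() convention:
 */
static char * __maybe_unused decode_encoded_string_sketch(void **p,
						void *end, gfp_t gfp)
{
	__le32 wire_len;
	u32 len;
	char *name;

	if (*p + sizeof (wire_len) > end)
		return ERR_PTR(-ERANGE);
	memcpy(&wire_len, *p, sizeof (wire_len));
	len = le32_to_cpu(wire_len);
	*p += sizeof (wire_len);
	if (len > (size_t) (end - *p))
		return ERR_PTR(-ERANGE);

	name = kmalloc(len + 1, gfp);
	if (!name)
		return ERR_PTR(-ENOMEM);
	memcpy(name, *p, len);
	name[len] = '\0';
	*p += len;

	return name;
}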
a30b71b9
AE
3756static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3757{
3758 int ret;
3759 size_t size;
3760
3761 /* Version 1 images have no id; empty string is used */
3762
0d7dbfce
AE
3763 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3764 if (!rbd_dev->spec->image_id)
a30b71b9 3765 return -ENOMEM;
a30b71b9
AE
3766
3767 /* Record the header object name for this rbd image. */
3768
69e7a02f 3769 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
a30b71b9
AE
3770 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3771 if (!rbd_dev->header_name) {
3772 ret = -ENOMEM;
3773 goto out_err;
3774 }
0d7dbfce
AE
3775 sprintf(rbd_dev->header_name, "%s%s",
3776 rbd_dev->spec->image_name, RBD_SUFFIX);
a30b71b9
AE
3777
3778 /* Populate rbd image metadata */
3779
3780 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3781 if (ret < 0)
3782 goto out_err;
86b00e0d
AE
3783
3784 /* Version 1 images have no parent (no layering) */
3785
3786 rbd_dev->parent_spec = NULL;
3787 rbd_dev->parent_overlap = 0;
3788
a30b71b9
AE
3789 rbd_dev->image_format = 1;
3790
3791 dout("discovered version 1 image, header name is %s\n",
3792 rbd_dev->header_name);
3793
3794 return 0;
3795
3796out_err:
3797 kfree(rbd_dev->header_name);
3798 rbd_dev->header_name = NULL;
0d7dbfce
AE
3799 kfree(rbd_dev->spec->image_id);
3800 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
3801
3802 return ret;
3803}
3804
3805static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3806{
3807 size_t size;
9d475de5 3808 int ret;
6e14b1a6 3809 u64 ver = 0;
a30b71b9
AE
3810
3811 /*
3812 * Image id was filled in by the caller. Record the header
3813 * object name for this rbd image.
3814 */
979ed480 3815 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
a30b71b9
AE
3816 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3817 if (!rbd_dev->header_name)
3818 return -ENOMEM;
3819 sprintf(rbd_dev->header_name, "%s%s",
0d7dbfce 3820 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
9d475de5
AE
3821
3822 /* Get the size and object order for the image */
3823
3824 ret = rbd_dev_v2_image_size(rbd_dev);
1e130199
AE
3825 if (ret < 0)
3826 goto out_err;
3827
3828 /* Get the object prefix (a.k.a. block_name) for the image */
3829
3830 ret = rbd_dev_v2_object_prefix(rbd_dev);
b1b5402a
AE
3831 if (ret < 0)
3832 goto out_err;
3833
d889140c 3834 /* Get and check the features for the image */
b1b5402a
AE
3835
3836 ret = rbd_dev_v2_features(rbd_dev);
9d475de5
AE
3837 if (ret < 0)
3838 goto out_err;
35d489f9 3839
86b00e0d
AE
3840 /* If the image supports layering, get the parent info */
3841
3842 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3843 ret = rbd_dev_v2_parent_info(rbd_dev);
3844 if (ret < 0)
3845 goto out_err;
3846 }
3847
6e14b1a6
AE
 3848 /* crypto and compression types aren't (yet) supported for v2 images */
3849
3850 rbd_dev->header.crypt_type = 0;
3851 rbd_dev->header.comp_type = 0;
35d489f9 3852
6e14b1a6
AE
3853 /* Get the snapshot context, plus the header version */
3854
3855 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
35d489f9
AE
3856 if (ret)
3857 goto out_err;
6e14b1a6
AE
3858 rbd_dev->header.obj_version = ver;
3859
a30b71b9
AE
3860 rbd_dev->image_format = 2;
3861
3862 dout("discovered version 2 image, header name is %s\n",
3863 rbd_dev->header_name);
3864
35152979 3865 return 0;
9d475de5 3866out_err:
86b00e0d
AE
3867 rbd_dev->parent_overlap = 0;
3868 rbd_spec_put(rbd_dev->parent_spec);
3869 rbd_dev->parent_spec = NULL;
9d475de5
AE
3870 kfree(rbd_dev->header_name);
3871 rbd_dev->header_name = NULL;
1e130199
AE
3872 kfree(rbd_dev->header.object_prefix);
3873 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
3874
3875 return ret;
a30b71b9
AE
3876}
3877
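/*
 * Concretely, the two probe paths above derive the header object name
 * differently.  Assuming the usual rbd_types.h values of RBD_SUFFIX
 * (".rbd") and RBD_HEADER_PREFIX ("rbd_header."), an image named
 * "foo" whose id is "101a6b" gets:
 *
 *	format 1:	"foo.rbd"		(name-based)
 *	format 2:	"rbd_header.101a6b"	(id-based)
 */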
83a06263
AE
3878static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3879{
3880 int ret;
3881
3882 /* no need to lock here, as rbd_dev is not registered yet */
3883 ret = rbd_dev_snaps_update(rbd_dev);
3884 if (ret)
3885 return ret;
3886
9e15b77d
AE
3887 ret = rbd_dev_probe_update_spec(rbd_dev);
3888 if (ret)
3889 goto err_out_snaps;
3890
83a06263
AE
3891 ret = rbd_dev_set_mapping(rbd_dev);
3892 if (ret)
3893 goto err_out_snaps;
3894
3895 /* generate unique id: find highest unique id, add one */
3896 rbd_dev_id_get(rbd_dev);
3897
3898 /* Fill in the device name, now that we have its id. */
3899 BUILD_BUG_ON(DEV_NAME_LEN
3900 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3901 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3902
3903 /* Get our block major device number. */
3904
3905 ret = register_blkdev(0, rbd_dev->name);
3906 if (ret < 0)
3907 goto err_out_id;
3908 rbd_dev->major = ret;
3909
3910 /* Set up the blkdev mapping. */
3911
3912 ret = rbd_init_disk(rbd_dev);
3913 if (ret)
3914 goto err_out_blkdev;
3915
3916 ret = rbd_bus_add_dev(rbd_dev);
3917 if (ret)
3918 goto err_out_disk;
3919
3920 /*
3921 * At this point cleanup in the event of an error is the job
3922 * of the sysfs code (initiated by rbd_bus_del_dev()).
3923 */
3924 down_write(&rbd_dev->header_rwsem);
3925 ret = rbd_dev_snaps_register(rbd_dev);
3926 up_write(&rbd_dev->header_rwsem);
3927 if (ret)
3928 goto err_out_bus;
3929
9969ebc5 3930 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
83a06263
AE
3931 if (ret)
3932 goto err_out_bus;
3933
3934 /* Everything's ready. Announce the disk to the world. */
3935
3936 add_disk(rbd_dev->disk);
3937
3938 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3939 (unsigned long long) rbd_dev->mapping.size);
3940
3941 return ret;
3942err_out_bus:
 3943 /* this will also clean up the rest of the rbd_dev state */
3944
3945 rbd_bus_del_dev(rbd_dev);
3946
3947 return ret;
3948err_out_disk:
3949 rbd_free_disk(rbd_dev);
3950err_out_blkdev:
3951 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3952err_out_id:
3953 rbd_dev_id_put(rbd_dev);
3954err_out_snaps:
3955 rbd_remove_all_snaps(rbd_dev);
3956
3957 return ret;
3958}
3959
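/*
 * A worked check of the BUILD_BUG_ON() bound above: an int has
 * 8 * sizeof (int) bits, each worth log10(2) ~= 0.301 decimal digits,
 * so printing an int takes at most about 2.41 * sizeof (int) digits;
 * (5 * sizeof (int)) / 2 covers that, and the "+ 1" leaves room for a
 * minus sign.  Since sizeof (RBD_DRV_NAME) also counts the trailing
 * NUL, "rbd%d" always fits: 4 + 11 <= 32 for 32-bit ints.
 */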
a30b71b9
AE
3960/*
3961 * Probe for the existence of the header object for the given rbd
3962 * device. For format 2 images this includes determining the image
3963 * id.
3964 */
3965static int rbd_dev_probe(struct rbd_device *rbd_dev)
3966{
3967 int ret;
3968
3969 /*
3970 * Get the id from the image id object. If it's not a
3971 * format 2 image, we'll get ENOENT back, and we'll assume
3972 * it's a format 1 image.
3973 */
3974 ret = rbd_dev_image_id(rbd_dev);
3975 if (ret)
3976 ret = rbd_dev_v1_probe(rbd_dev);
3977 else
3978 ret = rbd_dev_v2_probe(rbd_dev);
83a06263 3979 if (ret) {
a30b71b9
AE
3980 dout("probe failed, returning %d\n", ret);
3981
83a06263
AE
3982 return ret;
3983 }
3984
3985 ret = rbd_dev_probe_finish(rbd_dev);
3986 if (ret)
3987 rbd_header_free(&rbd_dev->header);
3988
a30b71b9
AE
3989 return ret;
3990}
3991
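/*
 * Note that, as written, any rbd_dev_image_id() failure -- not just
 * the ENOENT case described above -- causes fallback to the format 1
 * probe; a genuine error (-ENOMEM, say) only surfaces when the v1
 * probe subsequently fails as well.
 */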
59c2be1e
YS
3992static ssize_t rbd_add(struct bus_type *bus,
3993 const char *buf,
3994 size_t count)
602adf40 3995{
cb8627c7 3996 struct rbd_device *rbd_dev = NULL;
dc79b113 3997 struct ceph_options *ceph_opts = NULL;
4e9afeba 3998 struct rbd_options *rbd_opts = NULL;
859c31df 3999 struct rbd_spec *spec = NULL;
9d3997fd 4000 struct rbd_client *rbdc;
27cc2594
AE
4001 struct ceph_osd_client *osdc;
4002 int rc = -ENOMEM;
602adf40
YS
4003
4004 if (!try_module_get(THIS_MODULE))
4005 return -ENODEV;
4006
602adf40 4007 /* parse add command */
859c31df 4008 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 4009 if (rc < 0)
bd4ba655 4010 goto err_out_module;
78cea76e 4011
9d3997fd
AE
4012 rbdc = rbd_get_client(ceph_opts);
4013 if (IS_ERR(rbdc)) {
4014 rc = PTR_ERR(rbdc);
0ddebc0c 4015 goto err_out_args;
9d3997fd 4016 }
c53d5893 4017 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4018
602adf40 4019 /* pick the pool */
9d3997fd 4020 osdc = &rbdc->client->osdc;
859c31df 4021 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
4022 if (rc < 0)
4023 goto err_out_client;
859c31df
AE
4024 spec->pool_id = (u64) rc;
4025
0903e875
AE
 4026 /* The ceph file layout needs the pool id to fit in 32 bits */
4027
4028 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4029 rc = -EIO;
4030 goto err_out_client;
4031 }
4032
c53d5893 4033 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
4034 if (!rbd_dev)
4035 goto err_out_client;
c53d5893
AE
4036 rbdc = NULL; /* rbd_dev now owns this */
4037 spec = NULL; /* rbd_dev now owns this */
602adf40 4038
bd4ba655 4039 rbd_dev->mapping.read_only = rbd_opts->read_only;
c53d5893
AE
4040 kfree(rbd_opts);
4041 rbd_opts = NULL; /* done with this */
bd4ba655 4042
a30b71b9
AE
4043 rc = rbd_dev_probe(rbd_dev);
4044 if (rc < 0)
c53d5893 4045 goto err_out_rbd_dev;
05fd6f6f 4046
602adf40 4047 return count;
c53d5893
AE
4048err_out_rbd_dev:
4049 rbd_dev_destroy(rbd_dev);
bd4ba655 4050err_out_client:
9d3997fd 4051 rbd_put_client(rbdc);
0ddebc0c 4052err_out_args:
78cea76e
AE
4053 if (ceph_opts)
4054 ceph_destroy_options(ceph_opts);
4e9afeba 4055 kfree(rbd_opts);
859c31df 4056 rbd_spec_put(spec);
bd4ba655
AE
4057err_out_module:
4058 module_put(THIS_MODULE);
27cc2594 4059
602adf40 4060 dout("Error adding device %s\n", buf);
27cc2594
AE
4061
4062 return (ssize_t) rc;
602adf40
YS
4063}
4064
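/*
 * A note on the ownership handoffs above: rbd_add() passes ceph_opts
 * to the rbd_client, then rbdc and spec to the rbd_device, nulling
 * each local pointer as it goes.  The error-exit ladder thus frees
 * only what is still locally owned; once rbd_dev_create() succeeds,
 * rbd_dev_destroy() in err_out_rbd_dev is expected to drop the
 * client and spec references on our behalf.
 */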
de71a297 4065static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4066{
4067 struct list_head *tmp;
4068 struct rbd_device *rbd_dev;
4069
e124a82f 4070 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4071 list_for_each(tmp, &rbd_dev_list) {
4072 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4073 if (rbd_dev->dev_id == dev_id) {
e124a82f 4074 spin_unlock(&rbd_dev_list_lock);
602adf40 4075 return rbd_dev;
e124a82f 4076 }
602adf40 4077 }
e124a82f 4078 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4079 return NULL;
4080}
4081
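/*
 * A locking note on __rbd_get_dev(): rbd_dev_list_lock is dropped
 * before the device is returned, so the pointer presumably stays
 * valid only because the caller (rbd_remove() below) holds ctl_mutex,
 * which serializes removal.
 */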
dfc5606d 4082static void rbd_dev_release(struct device *dev)
602adf40 4083{
593a9e7b 4084 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4085
59c2be1e 4086 if (rbd_dev->watch_event)
9969ebc5 4087 rbd_dev_header_watch_sync(rbd_dev, 0);
602adf40
YS
4088
4089 /* clean up and free blkdev */
4090 rbd_free_disk(rbd_dev);
4091 unregister_blkdev(rbd_dev->major, rbd_dev->name);
32eec68d 4092
2ac4e75d
AE
4093 /* release allocated disk header fields */
4094 rbd_header_free(&rbd_dev->header);
4095
32eec68d 4096 /* done with the id, and with the rbd_dev */
e2839308 4097 rbd_dev_id_put(rbd_dev);
c53d5893
AE
4098 rbd_assert(rbd_dev->rbd_client != NULL);
4099 rbd_dev_destroy(rbd_dev);
602adf40
YS
4100
4101 /* release module ref */
4102 module_put(THIS_MODULE);
602adf40
YS
4103}
4104
dfc5606d
YS
4105static ssize_t rbd_remove(struct bus_type *bus,
4106 const char *buf,
4107 size_t count)
602adf40
YS
4108{
4109 struct rbd_device *rbd_dev = NULL;
4110 int target_id, rc;
4111 unsigned long ul;
4112 int ret = count;
4113
4114 rc = strict_strtoul(buf, 10, &ul);
4115 if (rc)
4116 return rc;
4117
4118 /* convert to int; abort if we lost anything in the conversion */
4119 target_id = (int) ul;
4120 if (target_id != ul)
4121 return -EINVAL;
4122
4123 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4124
4125 rbd_dev = __rbd_get_dev(target_id);
4126 if (!rbd_dev) {
4127 ret = -ENOENT;
4128 goto done;
42382b70
AE
4129 }
4130
a14ea269 4131 spin_lock_irq(&rbd_dev->lock);
b82d167b 4132 if (rbd_dev->open_count)
42382b70 4133 ret = -EBUSY;
b82d167b
AE
4134 else
4135 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
a14ea269 4136 spin_unlock_irq(&rbd_dev->lock);
b82d167b 4137 if (ret < 0)
42382b70 4138 goto done;
602adf40 4139
41f38c2b 4140 rbd_remove_all_snaps(rbd_dev);
dfc5606d 4141 rbd_bus_del_dev(rbd_dev);
602adf40
YS
4142
4143done:
4144 mutex_unlock(&ctl_mutex);
aafb230e 4145
602adf40
YS
4146 return ret;
4147}
4148
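/*
 * For illustration, removal mirrors rbd_add() and is driven from
 * userspace by writing the device id (the # in "rbd#") to sysfs:
 *
 *	# echo 1 > /sys/bus/rbd/remove
 *
 * -EBUSY is returned while the block device is still held open.
 */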
602adf40
YS
4149/*
4150 * create control files in sysfs
dfc5606d 4151 * /sys/bus/rbd/...
602adf40
YS
4152 */
4153static int rbd_sysfs_init(void)
4154{
dfc5606d 4155 int ret;
602adf40 4156
fed4c143 4157 ret = device_register(&rbd_root_dev);
21079786 4158 if (ret < 0)
dfc5606d 4159 return ret;
602adf40 4160
fed4c143
AE
4161 ret = bus_register(&rbd_bus_type);
4162 if (ret < 0)
4163 device_unregister(&rbd_root_dev);
602adf40 4164
602adf40
YS
4165 return ret;
4166}
4167
4168static void rbd_sysfs_cleanup(void)
4169{
dfc5606d 4170 bus_unregister(&rbd_bus_type);
fed4c143 4171 device_unregister(&rbd_root_dev);
602adf40
YS
4172}
4173
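/*
 * After rbd_sysfs_init() the control files live where the comment
 * above says, with rbd_bus_type supplying the bus-level attributes:
 *
 *	/sys/bus/rbd/add	(handled by rbd_add())
 *	/sys/bus/rbd/remove	(handled by rbd_remove())
 *
 * and mapped devices appearing under /sys/bus/rbd/devices/.
 */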
cc344fa1 4174static int __init rbd_init(void)
602adf40
YS
4175{
4176 int rc;
4177
1e32d34c
AE
4178 if (!libceph_compatible(NULL)) {
4179 rbd_warn(NULL, "libceph incompatibility (quitting)");
4180
4181 return -EINVAL;
4182 }
602adf40
YS
4183 rc = rbd_sysfs_init();
4184 if (rc)
4185 return rc;
f0f8cef5 4186 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
4187 return 0;
4188}
4189
cc344fa1 4190static void __exit rbd_exit(void)
602adf40
YS
4191{
4192 rbd_sysfs_cleanup();
4193}
4194
4195module_init(rbd_init);
4196module_exit(rbd_exit);
4197
4198MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4199MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4200MODULE_DESCRIPTION("rados block device");
4201
4202/* following authorship retained from original osdblk.c */
4203MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4204
4205MODULE_LICENSE("GPL");