rbd: don't hold ctl_mutex to get/put device
[linux-2.6-block.git] / drivers / block / rbd.c
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

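/*
 * Illustrative only, not part of the driver: the pair above implements
 * a counter that can never be revived once it has dropped to zero.  A
 * caller that treats zero as "object going away" might do:
 *
 *	if (atomic_inc_return_safe(&dev->ref) > 0) {
 *		...use the object...
 *		atomic_dec_return_safe(&dev->ref);
 *	}
 *
 * This is the pattern used later in this file for rbd_dev->parent_ref.
 */
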
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

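/*
 * Worked example for MAX_INT_FORMAT_WIDTH: 5/2 = 2.5 decimal digits per
 * byte over-approximates log10(2^8) ~= 2.41 digits per byte, and the +1
 * leaves room for a sign.  For a 4-byte int this gives
 * (5 * 4) / 2 + 1 = 11 characters -- exactly enough for "-2147483648".
 */
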
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

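/*
 * Illustrative only: a parent's identity is shared rather than copied,
 * so a child holds a kref on the same structure.  Conceptually
 * (rbd_spec_get() is not shown in this excerpt and is assumed;
 * rbd_spec_put() is declared below):
 *
 *	child_rbd_dev->parent_spec = rbd_spec_get(parent_rbd_dev->spec);
 *	...
 *	rbd_spec_put(child_rbd_dev->parent_spec);	// on teardown
 */
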
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

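/*
 * Illustrative only: typical use of the iterators above, as in
 * rbd_img_request_complete() further down, which aggregates the
 * per-object transfer counts:
 *
 *	struct rbd_obj_request *obj_request;
 *	u64 xferred = 0;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 */
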
struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);

	return 0;
}

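/*
 * Note on the locking above: per the commit this listing accompanies,
 * open and release no longer take ctl_mutex.  rbd_dev->lock covers the
 * open_count/REMOVING handshake, and get_device()/put_device() on the
 * embedded struct device keep the rbd_device alive while it is open.
 */
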
static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds ctl_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

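/*
 * Illustrative only: these tokens are parsed out of the options field
 * of the string written to the sysfs add interface, roughly (see
 * Documentation/ABI/testing/sysfs-bus-rbd for the exact format):
 *
 *	echo "1.2.3.4:6789 name=admin,read_only rbd myimage" \
 *		> /sys/bus/rbd/add
 */
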
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration, creating
 * one if it doesn't already exist.  Either way, ceph_opts is consumed
 * by this function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&ctl_mutex);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	down_write(&rbd_dev->header_rwsem);
	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	/* Make sure mapping size is consistent with header info */

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
		if (rbd_dev->mapping.size != header->image_size)
			rbd_dev->mapping.size = header->image_size;

	up_write(&rbd_dev->header_rwsem);

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

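/*
 * Illustrative only: with snapc->snaps = { 40, 30, 10 } (descending,
 * as the osd keeps it), rbd_dev_snap_index() returns 1 for snap_id 30
 * and BAD_SNAP_INDEX for an absent id such as 20.
 */
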
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return NULL;

	return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		/* The buffer came from the slab cache, not kmalloc() */
		rbd_segment_name_free(name);
		name = NULL;
	}

	return name;
}

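/*
 * Illustrative only (prefix values hypothetical): for a format 1 image
 * with object_prefix "rb.0.1234" the name of segment 1 is
 * "rb.0.1234.000000000001" (12 hex digits), while a format 2 image
 * with object_prefix "rbd_data.5678" yields
 * "rbd_data.5678.0000000000000001" (16 hex digits).
 */
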
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				flush_dcache_page(bv->bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	bio_for_each_segment(bv, bio_src, idx) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, so the
 * first ("doesn't exist") response may arrive *after* the second
 * ("does exist").  In that case we ignore the one that arrives later.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

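/*
 * Illustrative only: a sketch of the ordering the smp_mb() calls above
 * are meant to provide -- a reader that observes a flag must also
 * observe the data written before the flag was set:
 *
 *	writer					reader
 *	obj_request->result = ret;		if (obj_request_done_test(req))
 *	obj_request_done_set(req);			use(req->result);
 */
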
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

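/*
 * Illustrative only: for a 4096-byte read that transferred 1024 bytes
 * with no error, bytes 1024..4095 are zero-filled and xferred is
 * reported as 4096; an -ENOENT result (a hole) is zero-filled in full
 * and converted to success.
 */
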
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

9d4df01f 1701static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3
AE
1702{
1703 struct rbd_img_request *img_request = obj_request->img_request;
8c042b0d 1704 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1705 u64 snap_id;
430c28c3 1706
8c042b0d 1707 rbd_assert(osd_req != NULL);
430c28c3 1708
9d4df01f 1709 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
8c042b0d 1710 ceph_osdc_build_request(osd_req, obj_request->offset,
9d4df01f
AE
1711 NULL, snap_id, NULL);
1712}
1713
1714static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1715{
1716 struct rbd_img_request *img_request = obj_request->img_request;
1717 struct ceph_osd_request *osd_req = obj_request->osd_req;
1718 struct ceph_snap_context *snapc;
1719 struct timespec mtime = CURRENT_TIME;
1720
1721 rbd_assert(osd_req != NULL);
1722
1723 snapc = img_request ? img_request->snapc : NULL;
1724 ceph_osdc_build_request(osd_req, obj_request->offset,
1725 snapc, CEPH_NOSNAP, &mtime);
430c28c3
AE
1726}
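
/*
 * Note the asymmetry above: reads are pinned to a point in time by
 * the mapped snapshot id (CEPH_NOSNAP when the base image is mapped)
 * and carry no snapshot context, while writes carry the image's
 * current snapshot context and an mtime so the OSD can do whatever
 * copy-on-write is needed to preserve existing snapshots.
 */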

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
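
/*
 * A typical call sequence, sketched from the callers later in this
 * file (error handling abbreviated):
 *
 *	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
 *						obj_request);
 *	if (!obj_request->osd_req)
 *		goto out;			(treated as -ENOMEM)
 *	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
 *					offset, length, 0, 0);
 *	rbd_osd_req_format_read(obj_request);
 *	ret = rbd_obj_request_submit(osdc, obj_request);
 */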

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops:
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
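
/*
 * The two ops in a copyup request have a fixed layout: op 0 is the
 * "rbd" class "copyup" method call carrying the parent data, and
 * op 1 is the original write.  Both are filled in by
 * rbd_img_obj_parent_read_full_callback() below.
 */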

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow\n");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * We must get the reference before checking for the overlap to
 * coordinate properly with zeroing the parent overlap in
 * rbd_dev_v2_parent_info() when an image gets flattened.  We
 * drop it again if there is no overlap.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return false;

	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	if (counter > 0 && rbd_dev->parent_overlap)
		return true;

	/* Image was flattened, but parent is not yet torn down */

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow\n");

	return false;
}
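
/*
 * rbd_dev_parent_get() and rbd_dev_parent_put() pair around any use
 * of the parent image.  Because atomic_inc_return_safe() saturates
 * at zero, a get that races with the final put (and thus with
 * rbd_dev_unparent()) simply fails instead of reviving a torn-down
 * parent.  rbd_img_request_create() takes the reference when it
 * marks a request layered; rbd_img_request_destroy() drops it.
 */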

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length, false);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}

static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}

static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
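
/*
 * Completion is strictly in object order.  For example, if objects
 * 0, 2 and 3 of a four-object image request are done but object 1 is
 * not, the loop above stops at object 1 and next_completion stays
 * at 1; when object 1 finally completes, a single pass here ends
 * objects 1 through 3 and finishes the image request.
 */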

/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list;
	unsigned int bio_offset = 0;
	struct page **pages;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		/* object request has its own copy of the object name */
		rbd_segment_name_free(object_name);
		if (!obj_request)
			goto out_unwind;

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		/*
		 * set obj_request->img_request before formatting
		 * the osd_request so that it gets the right snapc
		 */
		rbd_img_obj_request_add(img_request, obj_request);
		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
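
/*
 * Worked example of the segmentation above (hypothetical numbers):
 * with the default object order of 22 (4 MiB objects), a 6 MiB image
 * request starting at image offset 2 MiB becomes two object
 * requests: 2 MiB at offset 2 MiB within the first object, then
 * 4 MiB at offset 0 within the next.  rbd_segment_length() is what
 * caps each piece at the object boundary.
 */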

static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);

	pages = obj_request->copyup_pages;
	rbd_assert(pages != NULL);
	obj_request->copyup_pages = NULL;
	page_count = obj_request->copyup_page_count;
	rbd_assert(page_count);
	obj_request->copyup_page_count = 0;
	ceph_release_page_vector(pages, page_count);

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}

static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;
	int img_result;
	u64 parent_length;
	u64 offset;
	u64 length;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;
	page_count = img_request->copyup_page_count;
	rbd_assert(page_count);
	img_request->copyup_page_count = 0;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(obj_request_type_valid(orig_request->type));
	img_result = img_request->result;
	parent_length = img_request->length;
	rbd_assert(parent_length == img_request->xferred);
	rbd_img_request_put(img_request);

	rbd_assert(orig_request->img_request);
	rbd_dev = orig_request->img_request->rbd_dev;
	rbd_assert(rbd_dev);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		ceph_release_page_vector(pages, page_count);
		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, orig_request);
		if (!img_result)
			return;
	}

	if (img_result)
		goto out_err;

	/*
	 * The original osd request is of no use to us any more.
	 * We need a new one that can hold the two ops in a copyup
	 * request.  Allocate the new copyup osd request for the
	 * original request, and release the old one.
	 */
	img_result = -ENOMEM;
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	rbd_osd_req_destroy(orig_request->osd_req);
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;
	orig_request->copyup_page_count = page_count;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
						false, false);

	/* Then the original write request op */

	offset = orig_request->offset;
	length = orig_request->length;
	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					offset, length, 0, 0);
	if (orig_request->type == OBJ_REQUEST_BIO)
		osd_req_op_extent_osd_data_bio(osd_req, 1,
					orig_request->bio_list, length);
	else
		osd_req_op_extent_osd_data_pages(osd_req, 1,
					orig_request->pages, length,
					offset & ~PAGE_MASK, false, false);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	orig_request->callback = rbd_img_obj_copyup_callback;
	osdc = &rbd_dev->rbd_client->client->osdc;
	img_result = rbd_obj_request_submit(osdc, orig_request);
	if (!img_result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = img_result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}

/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request_type_valid(obj_request->type));

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_parent_request_create(obj_request,
						img_offset, length);
	if (!parent_request)
		goto out_err;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	parent_request->copyup_pages = pages;
	parent_request->copyup_page_count = page_count;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	parent_request->copyup_pages = NULL;
	parent_request->copyup_page_count = 0;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}
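
/*
 * Example of the range computation above (hypothetical numbers):
 * with 4 MiB objects, a write of 8 KiB at image offset
 * 4 MiB + 16 KiB has obj_request->offset == 16 KiB, so img_offset
 * works out to exactly 4 MiB and length to 4 MiB: the read covers
 * the full parent range backing the target object, clipped at the
 * parent overlap if that ends sooner.
 */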

static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	struct rbd_device *rbd_dev;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original write request.
	 */
	rbd_dev = orig_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		result = rbd_obj_request_submit(osdc, orig_request);
		if (!result)
			return;
	}

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else if (result) {
		orig_request->result = result;
		goto out;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
	if (orig_request->result)
		rbd_obj_request_complete(orig_request);
}

static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *stat_request;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
	page_count = (u32)calc_pages_for(0, size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
							OBJ_REQUEST_PAGES);
	if (!stat_request)
		goto out;

	rbd_obj_request_get(obj_request);
	stat_request->obj_request = obj_request;
	stat_request->pages = pages;
	stat_request->page_count = page_count;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						stat_request);
	if (!stat_request->osd_req)
		goto out;
	stat_request->callback = rbd_img_obj_exists_callback;

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
					false, false);
	rbd_osd_req_format_read(stat_request);

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, stat_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	bool known;

	rbd_assert(obj_request_img_data_test(obj_request));

	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_dev = img_request->rbd_dev;

	/*
	 * Only writes to layered images need special handling.
	 * Reads and non-layered writes are simple object requests.
	 * Layered writes that start beyond the end of the overlap
	 * with the parent have no parent data, so they too are
	 * simple object requests.  Finally, if the target object is
	 * known to already exist, its parent data has already been
	 * copied, so a write to the object can also be handled as a
	 * simple object request.
	 */
	if (!img_request_write_test(img_request) ||
		!img_request_layered_test(img_request) ||
		rbd_dev->parent_overlap <= obj_request->img_offset ||
		((known = obj_request_known_test(obj_request)) &&
			obj_request_exists_test(obj_request))) {

		struct rbd_device *rbd_dev;
		struct ceph_osd_client *osdc;

		rbd_dev = obj_request->img_request->rbd_dev;
		osdc = &rbd_dev->rbd_client->client->osdc;

		return rbd_obj_request_submit(osdc, obj_request);
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (known)
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}
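
/*
 * Summary of the dispatch above: reads, writes to non-layered
 * images, layered writes beyond the parent overlap, and writes to
 * objects known to exist all go straight to the OSD.  A layered
 * write to an object known not to exist triggers a parent read
 * followed by a copyup.  If existence is unknown, a STAT is issued
 * first and this function is re-entered from
 * rbd_img_obj_exists_callback() once the answer is recorded.
 */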

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			return ret;
	}

	return 0;
}

static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;
	u64 img_xferred;
	int img_result;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request and release it */

	obj_request = img_request->obj_request;
	img_xferred = img_request->xferred;
	img_result = img_request->result;
	rbd_img_request_put(img_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
	 */
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, obj_request);
		if (!img_result)
			return;
	}

	obj_request->result = img_result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_xferred, xferred);
	} else {
		obj_request->xferred = img_xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}

static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request_type_valid(obj_request->type));

	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_parent_request_create(obj_request,
						obj_request->img_offset,
						obj_request->length);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	if (obj_request->type == OBJ_REQUEST_BIO)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						obj_request->bio_list);
	else
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
						obj_request->pages);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}

static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;
	obj_request->callback = rbd_obj_request_put;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, 0, 0);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	int ret;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long)notify_id,
		(unsigned int)opcode);
	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);

	rbd_obj_notify_ack(rbd_dev, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
	rbd_osd_req_format_write(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
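
/*
 * This is used in matched pairs: a "start" call once the header
 * object name is known, so notifies arrive via rbd_watch_cb(), and
 * a "stop" call before the device is torn down, which unregisters
 * the lingering request and drops its reference.
 */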

/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const void *outbound,
			     size_t outbound_size,
			     void *inbound,
			     size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32)calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred < (u64)INT_MAX);
	ret = (int)obj_request->xferred;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
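
/*
 * Example invocation (modeled on the image-id probe elsewhere in
 * this driver; exact arguments vary by caller):
 *
 *	ret = rbd_obj_method_sync(rbd_dev, object_name,
 *				"rbd", "get_id", NULL, 0,
 *				response, RBD_IMAGE_ID_LEN_MAX);
 *
 * A negative return is an error; otherwise ret is the number of
 * bytes of method output copied into the inbound buffer.
 */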

static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (offset && length > U64_MAX - offset + 1) {
			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
				offset, length);
			goto end_request;	/* Shouldn't happen */
		}

		result = -EIO;
		if (offset + length > rbd_dev->mapping.size) {
			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
				offset, length, rbd_dev->mapping.size);
			goto end_request;
		}

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}

/*
 * A queue callback.  Makes sure that we don't create a bio that
 * spans across multiple osd objects.  One exception would be with
 * single-page bios, which we handle later in bio_chain_clone_range().
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is, with the offset taken relative to
	 * the enclosing (whole) device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
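
/*
 * Worked example (hypothetical numbers, 4 KiB pages, 4 MiB objects):
 * if a bio starts 16 KiB before an object boundary and already holds
 * 12 KiB, 4 KiB remain, so one more 4 KiB page is accepted in full.
 * Once the bio holds 16 KiB, zero bytes remain and further pages are
 * refused, forcing the next bio to begin at the object boundary.  An
 * empty bio is always granted one page, as the block layer requires.
 */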

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_dev->disk = NULL;
	if (disk->flags & GENHD_FL_UP) {
		del_gendisk(disk);
		if (disk->queue)
			blk_cleanup_queue(disk->queue);
	}
	put_disk(disk);
}

static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length, void *buf)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
					offset, length, 0, 0);
	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
					obj_request->pages,
					obj_request->length,
					obj_request->offset & ~PAGE_MASK,
					false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t)INT_MAX);
	ret = (int)size;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

/*
 * Read the complete header for the given rbd device.  On successful
 * return, the rbd_dev->header field will contain up-to-date
 * information about the image.
 */
static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return -ENOMEM;

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size, ondisk);
		if (ret < 0)
			goto out;
		if ((size_t)ret < size) {
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			ret = -ENXIO;
			goto out;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	ret = rbd_header_from_disk(rbd_dev, ondisk);
out:
	kfree(ondisk);

	return ret;
}

/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	mapping_size = rbd_dev->mapping.size;
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_header_info(rbd_dev);
	else
		ret = rbd_dev_v2_header_info(rbd_dev);

	/* If it's a mapped snapshot, validate its EXISTS flag */

	rbd_exists_validate(rbd_dev);
	mutex_unlock(&ctl_mutex);
	if (mapping_size != rbd_dev->mapping.size) {
		sector_t size;

		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}

	return ret;
}
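
/*
 * A refresh can change the image size (e.g. after an "rbd resize").
 * When the mapped size differs from what the block layer last saw,
 * the capacity is updated and revalidate_disk() is called so the new
 * size becomes visible without remapping the device.
 */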

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
3404
dfc5606d
YS
3405/*
3406 sysfs
3407*/
3408
593a9e7b
AE
3409static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3410{
3411 return container_of(dev, struct rbd_device, dev);
3412}
3413
dfc5606d
YS
3414static ssize_t rbd_size_show(struct device *dev,
3415 struct device_attribute *attr, char *buf)
3416{
593a9e7b 3417 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3418
fc71d833
AE
3419 return sprintf(buf, "%llu\n",
3420 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3421}
3422
34b13184
AE
3423/*
3424 * Note this shows the features for whatever's mapped, which is not
3425 * necessarily the base image.
3426 */
3427static ssize_t rbd_features_show(struct device *dev,
3428 struct device_attribute *attr, char *buf)
3429{
3430 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3431
3432 return sprintf(buf, "0x%016llx\n",
fc71d833 3433 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3434}
3435
dfc5606d
YS
3436static ssize_t rbd_major_show(struct device *dev,
3437 struct device_attribute *attr, char *buf)
3438{
593a9e7b 3439 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3440
fc71d833
AE
3441 if (rbd_dev->major)
3442 return sprintf(buf, "%d\n", rbd_dev->major);
3443
3444 return sprintf(buf, "(none)\n");
3445
dfc5606d
YS
3446}
3447
3448static ssize_t rbd_client_id_show(struct device *dev,
3449 struct device_attribute *attr, char *buf)
602adf40 3450{
593a9e7b 3451 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3452
1dbb4399
AE
3453 return sprintf(buf, "client%lld\n",
3454 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3455}
3456
dfc5606d
YS
3457static ssize_t rbd_pool_show(struct device *dev,
3458 struct device_attribute *attr, char *buf)
602adf40 3459{
593a9e7b 3460 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3461
0d7dbfce 3462 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3463}
3464
9bb2f334
AE
3465static ssize_t rbd_pool_id_show(struct device *dev,
3466 struct device_attribute *attr, char *buf)
3467{
3468 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3469
0d7dbfce 3470 return sprintf(buf, "%llu\n",
fc71d833 3471 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3472}
3473
dfc5606d
YS
3474static ssize_t rbd_name_show(struct device *dev,
3475 struct device_attribute *attr, char *buf)
3476{
593a9e7b 3477 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3478
a92ffdf8
AE
3479 if (rbd_dev->spec->image_name)
3480 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3481
3482 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3483}
3484
589d30e0
AE
3485static ssize_t rbd_image_id_show(struct device *dev,
3486 struct device_attribute *attr, char *buf)
3487{
3488 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3489
0d7dbfce 3490 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3491}
3492
34b13184
AE
3493/*
3494 * Shows the name of the currently-mapped snapshot (or
3495 * RBD_SNAP_HEAD_NAME for the base image).
3496 */
dfc5606d
YS
3497static ssize_t rbd_snap_show(struct device *dev,
3498 struct device_attribute *attr,
3499 char *buf)
3500{
593a9e7b 3501 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3502
0d7dbfce 3503 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3504}
3505
86b00e0d
AE
3506/*
3507 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3508 * for the parent image. If there is no parent, simply shows
3509 * "(no parent image)".
3510 */
3511static ssize_t rbd_parent_show(struct device *dev,
3512 struct device_attribute *attr,
3513 char *buf)
3514{
3515 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3516 struct rbd_spec *spec = rbd_dev->parent_spec;
3517 int count;
3518 char *bufp = buf;
3519
3520 if (!spec)
3521 return sprintf(buf, "(no parent image)\n");
3522
3523 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3524 (unsigned long long) spec->pool_id, spec->pool_name);
3525 if (count < 0)
3526 return count;
3527 bufp += count;
3528
3529 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3530 spec->image_name ? spec->image_name : "(unknown)");
3531 if (count < 0)
3532 return count;
3533 bufp += count;
3534
3535 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3536 (unsigned long long) spec->snap_id, spec->snap_name);
3537 if (count < 0)
3538 return count;
3539 bufp += count;
3540
3541 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3542 if (count < 0)
3543 return count;
3544 bufp += count;
3545
3546 return (ssize_t) (bufp - buf);
3547}
3548
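/*
 * Illustrative output of the "parent" attribute for a mapped clone
 * (all values hypothetical), mirroring the sprintf() calls above:
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1028aad2ae8944a
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */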
dfc5606d
YS
3549static ssize_t rbd_image_refresh(struct device *dev,
3550 struct device_attribute *attr,
3551 const char *buf,
3552 size_t size)
3553{
593a9e7b 3554 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3555 int ret;
602adf40 3556
cc4a38bd 3557 ret = rbd_dev_refresh(rbd_dev);
e627db08
AE
3558 if (ret)
3559 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
b813623a
AE
3560
3561 return ret < 0 ? ret : size;
dfc5606d 3562}
602adf40 3563
dfc5606d 3564static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3565static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d
YS
3566static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3567static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3568static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3569static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3570static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3571static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3572static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3573static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3574static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3575
3576static struct attribute *rbd_attrs[] = {
3577 &dev_attr_size.attr,
34b13184 3578 &dev_attr_features.attr,
dfc5606d
YS
3579 &dev_attr_major.attr,
3580 &dev_attr_client_id.attr,
3581 &dev_attr_pool.attr,
9bb2f334 3582 &dev_attr_pool_id.attr,
dfc5606d 3583 &dev_attr_name.attr,
589d30e0 3584 &dev_attr_image_id.attr,
dfc5606d 3585 &dev_attr_current_snap.attr,
86b00e0d 3586 &dev_attr_parent.attr,
dfc5606d 3587 &dev_attr_refresh.attr,
dfc5606d
YS
3588 NULL
3589};
3590
3591static struct attribute_group rbd_attr_group = {
3592 .attrs = rbd_attrs,
3593};
3594
3595static const struct attribute_group *rbd_attr_groups[] = {
3596 &rbd_attr_group,
3597 NULL
3598};
3599
3600static void rbd_sysfs_dev_release(struct device *dev)
3601{
3602}
3603
3604static struct device_type rbd_device_type = {
3605 .name = "rbd",
3606 .groups = rbd_attr_groups,
3607 .release = rbd_sysfs_dev_release,
3608};
3609
8b8fb99c
AE
3610static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3611{
3612 kref_get(&spec->kref);
3613
3614 return spec;
3615}
3616
3617static void rbd_spec_free(struct kref *kref);
3618static void rbd_spec_put(struct rbd_spec *spec)
3619{
3620 if (spec)
3621 kref_put(&spec->kref, rbd_spec_free);
3622}
3623
3624static struct rbd_spec *rbd_spec_alloc(void)
3625{
3626 struct rbd_spec *spec;
3627
3628 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3629 if (!spec)
3630 return NULL;
3631 kref_init(&spec->kref);
3632
8b8fb99c
AE
3633 return spec;
3634}
3635
3636static void rbd_spec_free(struct kref *kref)
3637{
3638 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3639
3640 kfree(spec->pool_name);
3641 kfree(spec->image_id);
3642 kfree(spec->image_name);
3643 kfree(spec->snap_name);
3644 kfree(spec);
3645}
3646
cc344fa1 3647static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3648 struct rbd_spec *spec)
3649{
3650 struct rbd_device *rbd_dev;
3651
3652 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3653 if (!rbd_dev)
3654 return NULL;
3655
3656 spin_lock_init(&rbd_dev->lock);
6d292906 3657 rbd_dev->flags = 0;
a2acd00e 3658 atomic_set(&rbd_dev->parent_ref, 0);
c53d5893 3659 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3660 init_rwsem(&rbd_dev->header_rwsem);
3661
3662 rbd_dev->spec = spec;
3663 rbd_dev->rbd_client = rbdc;
3664
0903e875
AE
3665 /* Initialize the layout used for all rbd requests */
3666
3667 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3668 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3669 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3670 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3671
c53d5893
AE
3672 return rbd_dev;
3673}
3674
3675static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3676{
c53d5893
AE
3677 rbd_put_client(rbd_dev->rbd_client);
3678 rbd_spec_put(rbd_dev->spec);
3679 kfree(rbd_dev);
3680}
3681
9d475de5
AE
3682/*
3683 * Get the size and object order for an image snapshot, or if
3684 * snap_id is CEPH_NOSNAP, gets this information for the base
3685 * image.
3686 */
3687static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3688 u8 *order, u64 *snap_size)
3689{
3690 __le64 snapid = cpu_to_le64(snap_id);
3691 int ret;
3692 struct {
3693 u8 order;
3694 __le64 size;
3695 } __attribute__ ((packed)) size_buf = { 0 };
3696
36be9a76 3697 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3698 "rbd", "get_size",
4157976b 3699 &snapid, sizeof (snapid),
e2a58ee5 3700 &size_buf, sizeof (size_buf));
36be9a76 3701 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3702 if (ret < 0)
3703 return ret;
57385b51
AE
3704 if (ret < sizeof (size_buf))
3705 return -ERANGE;
9d475de5 3706
c86f86e9
AE
3707 if (order)
3708 *order = size_buf.order;
9d475de5
AE
3709 *snap_size = le64_to_cpu(size_buf.size);
3710
3711 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
57385b51
AE
3712 (unsigned long long)snap_id, (unsigned int)*order,
3713 (unsigned long long)*snap_size);
9d475de5
AE
3714
3715 return 0;
3716}
3717
3718static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3719{
3720 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3721 &rbd_dev->header.obj_order,
3722 &rbd_dev->header.image_size);
3723}
3724
1e130199
AE
3725static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3726{
3727 void *reply_buf;
3728 int ret;
3729 void *p;
3730
3731 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3732 if (!reply_buf)
3733 return -ENOMEM;
3734
36be9a76 3735 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3736 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3737 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3738 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3739 if (ret < 0)
3740 goto out;
3741
3742 p = reply_buf;
3743 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3744 p + ret, NULL, GFP_NOIO);
3745 ret = 0;
1e130199
AE
3746
3747 if (IS_ERR(rbd_dev->header.object_prefix)) {
3748 ret = PTR_ERR(rbd_dev->header.object_prefix);
3749 rbd_dev->header.object_prefix = NULL;
3750 } else {
3751 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3752 }
1e130199
AE
3753out:
3754 kfree(reply_buf);
3755
3756 return ret;
3757}
3758
b1b5402a
AE
3759static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3760 u64 *snap_features)
3761{
3762 __le64 snapid = cpu_to_le64(snap_id);
3763 struct {
3764 __le64 features;
3765 __le64 incompat;
4157976b 3766 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3767 u64 incompat;
b1b5402a
AE
3768 int ret;
3769
36be9a76 3770 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3771 "rbd", "get_features",
4157976b 3772 &snapid, sizeof (snapid),
e2a58ee5 3773 &features_buf, sizeof (features_buf));
36be9a76 3774 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3775 if (ret < 0)
3776 return ret;
57385b51
AE
3777 if (ret < sizeof (features_buf))
3778 return -ERANGE;
d889140c
AE
3779
3780 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3781 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3782 return -ENXIO;
d889140c 3783
b1b5402a
AE
3784 *snap_features = le64_to_cpu(features_buf.features);
3785
3786 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3787 (unsigned long long)snap_id,
3788 (unsigned long long)*snap_features,
3789 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3790
3791 return 0;
3792}
3793
3794static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3795{
3796 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3797 &rbd_dev->header.features);
3798}
3799
86b00e0d
AE
3800static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3801{
3802 struct rbd_spec *parent_spec;
3803 size_t size;
3804 void *reply_buf = NULL;
3805 __le64 snapid;
3806 void *p;
3807 void *end;
642a2537 3808 u64 pool_id;
86b00e0d 3809 char *image_id;
3b5cf2a2 3810 u64 snap_id;
86b00e0d 3811 u64 overlap;
86b00e0d
AE
3812 int ret;
3813
3814 parent_spec = rbd_spec_alloc();
3815 if (!parent_spec)
3816 return -ENOMEM;
3817
3818 size = sizeof (__le64) + /* pool_id */
3819 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3820 sizeof (__le64) + /* snap_id */
3821 sizeof (__le64); /* overlap */
3822 reply_buf = kmalloc(size, GFP_KERNEL);
3823 if (!reply_buf) {
3824 ret = -ENOMEM;
3825 goto out_err;
3826 }
3827
3828 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3829 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3830 "rbd", "get_parent",
4157976b 3831 &snapid, sizeof (snapid),
e2a58ee5 3832 reply_buf, size);
36be9a76 3833 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3834 if (ret < 0)
3835 goto out_err;
3836
86b00e0d 3837 p = reply_buf;
57385b51
AE
3838 end = reply_buf + ret;
3839 ret = -ERANGE;
642a2537 3840 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad
AE
3841 if (pool_id == CEPH_NOPOOL) {
3842 /*
3843 * Either the parent never existed, or we have a
3844 * record of it but the image got flattened so it no
3845 * longer has a parent. When the parent of a
3846 * layered image disappears we immediately set the
3847 * overlap to 0. The effect of this is that all new
3848 * requests will be treated as if the image had no
3849 * parent.
3850 */
3851 if (rbd_dev->parent_overlap) {
3852 rbd_dev->parent_overlap = 0;
3853 smp_mb();
3854 rbd_dev_parent_put(rbd_dev);
3855 pr_info("%s: clone image has been flattened\n",
3856 rbd_dev->disk->disk_name);
3857 }
3858
86b00e0d 3859 goto out; /* No parent? No problem. */
392a9dad 3860 }
86b00e0d 3861
0903e875
AE
3862 /* The ceph file layout needs to fit pool id in 32 bits */
3863
3864 ret = -EIO;
642a2537 3865 if (pool_id > (u64)U32_MAX) {
c0cd10db 3866 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
642a2537 3867 (unsigned long long)pool_id, U32_MAX);
57385b51 3868 goto out_err;
c0cd10db 3869 }
0903e875 3870
979ed480 3871 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3872 if (IS_ERR(image_id)) {
3873 ret = PTR_ERR(image_id);
3874 goto out_err;
3875 }
3b5cf2a2 3876 ceph_decode_64_safe(&p, end, snap_id, out_err);
86b00e0d
AE
3877 ceph_decode_64_safe(&p, end, overlap, out_err);
3878
3b5cf2a2
AE
3879 /*
3880 * The parent won't change (except when the clone is
3881 * flattened, which is handled above). So we only need to
3882 * record the parent spec if we have not already done so.
3883 */
3884 if (!rbd_dev->parent_spec) {
3885 parent_spec->pool_id = pool_id;
3886 parent_spec->image_id = image_id;
3887 parent_spec->snap_id = snap_id;
70cf49cf
AE
3888 rbd_dev->parent_spec = parent_spec;
3889 parent_spec = NULL; /* rbd_dev now owns this */
3b5cf2a2
AE
3890 }
3891
3892 /*
3893 * We always update the parent overlap. If it's zero we
3894 * treat it specially.
3895 */
3896 rbd_dev->parent_overlap = overlap;
3897 smp_mb();
3898 if (!overlap) {
3899
3900 /* A null parent_spec indicates it's the initial probe */
3901
3902 if (parent_spec) {
3903 /*
3904 * The overlap has become zero, so the clone
3905 * must have been resized down to 0 at some
3906 * point. Treat this the same as a flatten.
3907 */
3908 rbd_dev_parent_put(rbd_dev);
3909 pr_info("%s: clone image now standalone\n",
3910 rbd_dev->disk->disk_name);
3911 } else {
3912 /*
3913 * For the initial probe, if we find the
3914 * overlap is zero we just pretend there was
3915 * no parent image.
3916 */
3917 rbd_warn(rbd_dev, "ignoring parent of "
3918 "clone with overlap 0\n");
3919 }
70cf49cf 3920 }
86b00e0d
AE
3921out:
3922 ret = 0;
3923out_err:
3924 kfree(reply_buf);
3925 rbd_spec_put(parent_spec);
3926
3927 return ret;
3928}
3929
cc070d59
AE
3930static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3931{
3932 struct {
3933 __le64 stripe_unit;
3934 __le64 stripe_count;
3935 } __attribute__ ((packed)) striping_info_buf = { 0 };
3936 size_t size = sizeof (striping_info_buf);
3937 void *p;
3938 u64 obj_size;
3939 u64 stripe_unit;
3940 u64 stripe_count;
3941 int ret;
3942
3943 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3944 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3945 (char *)&striping_info_buf, size);
cc070d59
AE
3946 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3947 if (ret < 0)
3948 return ret;
3949 if (ret < size)
3950 return -ERANGE;
3951
3952 /*
3953 * We don't actually support the "fancy striping" feature
3954 * (STRIPINGV2) yet, but if the striping sizes are the
3955 * defaults the behavior is the same as before. So find
3956 * out, and only fail if the image has non-default values.
3957 */
3958 ret = -EINVAL;
3959 obj_size = (u64)1 << rbd_dev->header.obj_order;
3960 p = &striping_info_buf;
3961 stripe_unit = ceph_decode_64(&p);
3962 if (stripe_unit != obj_size) {
3963 rbd_warn(rbd_dev, "unsupported stripe unit "
3964 "(got %llu want %llu)",
3965 stripe_unit, obj_size);
3966 return -EINVAL;
3967 }
3968 stripe_count = ceph_decode_64(&p);
3969 if (stripe_count != 1) {
3970 rbd_warn(rbd_dev, "unsupported stripe count "
3971 "(got %llu want 1)", stripe_count);
3972 return -EINVAL;
3973 }
500d0c0f
AE
3974 rbd_dev->header.stripe_unit = stripe_unit;
3975 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
3976
3977 return 0;
3978}
3979
9e15b77d
AE
3980static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3981{
3982 size_t image_id_size;
3983 char *image_id;
3984 void *p;
3985 void *end;
3986 size_t size;
3987 void *reply_buf = NULL;
3988 size_t len = 0;
3989 char *image_name = NULL;
3990 int ret;
3991
3992 rbd_assert(!rbd_dev->spec->image_name);
3993
69e7a02f
AE
3994 len = strlen(rbd_dev->spec->image_id);
3995 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
3996 image_id = kmalloc(image_id_size, GFP_KERNEL);
3997 if (!image_id)
3998 return NULL;
3999
4000 p = image_id;
4157976b 4001 end = image_id + image_id_size;
57385b51 4002 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
4003
4004 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4005 reply_buf = kmalloc(size, GFP_KERNEL);
4006 if (!reply_buf)
4007 goto out;
4008
36be9a76 4009 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
4010 "rbd", "dir_get_name",
4011 image_id, image_id_size,
e2a58ee5 4012 reply_buf, size);
9e15b77d
AE
4013 if (ret < 0)
4014 goto out;
4015 p = reply_buf;
f40eb349
AE
4016 end = reply_buf + ret;
4017
9e15b77d
AE
4018 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4019 if (IS_ERR(image_name))
4020 image_name = NULL;
4021 else
4022 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4023out:
4024 kfree(reply_buf);
4025 kfree(image_id);
4026
4027 return image_name;
4028}
4029
2ad3d716
AE
4030static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4031{
4032 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4033 const char *snap_name;
4034 u32 which = 0;
4035
4036 /* Skip over names until we find the one we are looking for */
4037
4038 snap_name = rbd_dev->header.snap_names;
4039 while (which < snapc->num_snaps) {
4040 if (!strcmp(name, snap_name))
4041 return snapc->snaps[which];
4042 snap_name += strlen(snap_name) + 1;
4043 which++;
4044 }
4045 return CEPH_NOSNAP;
4046}
4047
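/*
 * Sketch of the format 1 layout walked above: header.snap_names is
 * one buffer of NUL-terminated names packed back to back, e.g.
 * "snap1\0snap2\0snap3\0", and snapc->snaps[] holds the matching ids
 * in the same order, so advancing by strlen(snap_name) + 1 pairs
 * name "which" with id snapc->snaps[which].
 */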
4048static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4049{
4050 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4051 u32 which;
4052 bool found = false;
4053 u64 snap_id;
4054
4055 for (which = 0; !found && which < snapc->num_snaps; which++) {
4056 const char *snap_name;
4057
4058 snap_id = snapc->snaps[which];
4059 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4060 if (IS_ERR(snap_name))
4061 break;
4062 found = !strcmp(name, snap_name);
4063 kfree(snap_name);
4064 }
4065 return found ? snap_id : CEPH_NOSNAP;
4066}
4067
4068/*
4069 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4070 * no snapshot by that name is found, or if an error occurs.
4071 */
4072static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4073{
4074 if (rbd_dev->image_format == 1)
4075 return rbd_v1_snap_id_by_name(rbd_dev, name);
4076
4077 return rbd_v2_snap_id_by_name(rbd_dev, name);
4078}
4079
9e15b77d 4080/*
2e9f7f1c
AE
4081 * When an rbd image has a parent image, it is identified by the
4082 * pool, image, and snapshot ids (not names). This function fills
4083 * in the names for those ids. (It's OK if we can't figure out the
4084 * name for an image id, but the pool and snapshot ids should always
4085 * exist and have names.) All names in an rbd spec are dynamically
4086 * allocated.
e1d4213f
AE
4087 *
4088 * When an image being mapped (not a parent) is probed, we have the
4089 * pool name and pool id, image name and image id, and the snapshot
4090 * name. The only thing we're missing is the snapshot id.
9e15b77d 4091 */
2e9f7f1c 4092static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 4093{
2e9f7f1c
AE
4094 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4095 struct rbd_spec *spec = rbd_dev->spec;
4096 const char *pool_name;
4097 const char *image_name;
4098 const char *snap_name;
9e15b77d
AE
4099 int ret;
4100
e1d4213f
AE
4101 /*
4102 * An image being mapped will have the pool name (etc.), but
4103 * we need to look up the snapshot id.
4104 */
2e9f7f1c
AE
4105 if (spec->pool_name) {
4106 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 4107 u64 snap_id;
e1d4213f 4108
2ad3d716
AE
4109 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4110 if (snap_id == CEPH_NOSNAP)
e1d4213f 4111 return -ENOENT;
2ad3d716 4112 spec->snap_id = snap_id;
e1d4213f 4113 } else {
2e9f7f1c 4114 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
4115 }
4116
4117 return 0;
4118 }
9e15b77d 4119
2e9f7f1c 4120 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4121
2e9f7f1c
AE
4122 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4123 if (!pool_name) {
4124 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
4125 return -EIO;
4126 }
2e9f7f1c
AE
4127 pool_name = kstrdup(pool_name, GFP_KERNEL);
4128 if (!pool_name)
9e15b77d
AE
4129 return -ENOMEM;
4130
4131 /* Fetch the image name; tolerate failure here */
4132
2e9f7f1c
AE
4133 image_name = rbd_dev_image_name(rbd_dev);
4134 if (!image_name)
06ecc6cb 4135 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4136
2e9f7f1c 4137 /* Look up the snapshot name, and make a copy */
9e15b77d 4138
2e9f7f1c 4139 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
2e9f7f1c
AE
4140 if (!snap_name) {
4141 ret = -ENOMEM;
9e15b77d 4142 goto out_err;
2e9f7f1c
AE
4143 }
4144
4145 spec->pool_name = pool_name;
4146 spec->image_name = image_name;
4147 spec->snap_name = snap_name;
9e15b77d
AE
4148
4149 return 0;
4150out_err:
2e9f7f1c
AE
4151 kfree(image_name);
4152 kfree(pool_name);
9e15b77d
AE
4153
4154 return ret;
4155}
4156
cc4a38bd 4157static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
4158{
4159 size_t size;
4160 int ret;
4161 void *reply_buf;
4162 void *p;
4163 void *end;
4164 u64 seq;
4165 u32 snap_count;
4166 struct ceph_snap_context *snapc;
4167 u32 i;
4168
4169 /*
4170 * We'll need room for the seq value (maximum snapshot id),
4171 * snapshot count, and array of that many snapshot ids.
4172 * For now we have a fixed upper limit on the number we're
4173 * prepared to receive.
4174 */
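	/*
	 * Worked example: with RBD_MAX_SNAP_COUNT == 510 this is
	 * 8 + 4 + 510 * 8 == 4092 bytes, which is how the maximum
	 * snapshot context stays within a single 4KB buffer.
	 */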
4175 size = sizeof (__le64) + sizeof (__le32) +
4176 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4177 reply_buf = kzalloc(size, GFP_KERNEL);
4178 if (!reply_buf)
4179 return -ENOMEM;
4180
36be9a76 4181 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 4182 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 4183 reply_buf, size);
36be9a76 4184 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
4185 if (ret < 0)
4186 goto out;
4187
35d489f9 4188 p = reply_buf;
57385b51
AE
4189 end = reply_buf + ret;
4190 ret = -ERANGE;
35d489f9
AE
4191 ceph_decode_64_safe(&p, end, seq, out);
4192 ceph_decode_32_safe(&p, end, snap_count, out);
4193
4194 /*
4195 * Make sure the reported number of snapshot ids wouldn't go
4196 * beyond the end of our buffer. But before checking that,
4197 * make sure the computed size of the snapshot context we
4198 * allocate is representable in a size_t.
4199 */
4200 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4201 / sizeof (u64)) {
4202 ret = -EINVAL;
4203 goto out;
4204 }
4205 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4206 goto out;
468521c1 4207 ret = 0;
35d489f9 4208
812164f8 4209 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
4210 if (!snapc) {
4211 ret = -ENOMEM;
4212 goto out;
4213 }
35d489f9 4214 snapc->seq = seq;
35d489f9
AE
4215 for (i = 0; i < snap_count; i++)
4216 snapc->snaps[i] = ceph_decode_64(&p);
4217
49ece554 4218 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
4219 rbd_dev->header.snapc = snapc;
4220
4221 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4222 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
4223out:
4224 kfree(reply_buf);
4225
57385b51 4226 return ret;
35d489f9
AE
4227}
4228
54cac61f
AE
4229static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4230 u64 snap_id)
b8b1e2db
AE
4231{
4232 size_t size;
4233 void *reply_buf;
54cac61f 4234 __le64 snapid;
b8b1e2db
AE
4235 int ret;
4236 void *p;
4237 void *end;
b8b1e2db
AE
4238 char *snap_name;
4239
4240 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4241 reply_buf = kmalloc(size, GFP_KERNEL);
4242 if (!reply_buf)
4243 return ERR_PTR(-ENOMEM);
4244
54cac61f 4245 snapid = cpu_to_le64(snap_id);
36be9a76 4246 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 4247 "rbd", "get_snapshot_name",
54cac61f 4248 &snapid, sizeof (snapid),
e2a58ee5 4249 reply_buf, size);
36be9a76 4250 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
4251 if (ret < 0) {
4252 snap_name = ERR_PTR(ret);
b8b1e2db 4253 goto out;
f40eb349 4254 }
b8b1e2db
AE
4255
4256 p = reply_buf;
f40eb349 4257 end = reply_buf + ret;
e5c35534 4258 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4259 if (IS_ERR(snap_name))
b8b1e2db 4260 goto out;
b8b1e2db 4261
f40eb349 4262 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4263 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4264out:
4265 kfree(reply_buf);
4266
f40eb349 4267 return snap_name;
b8b1e2db
AE
4268}
4269
2df3fac7 4270static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4271{
2df3fac7 4272 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4273 int ret;
117973fb
AE
4274
4275 down_write(&rbd_dev->header_rwsem);
4276
1617e40c
JD
4277 ret = rbd_dev_v2_image_size(rbd_dev);
4278 if (ret)
4279 goto out;
4280
2df3fac7
AE
4281 if (first_time) {
4282 ret = rbd_dev_v2_header_onetime(rbd_dev);
4283 if (ret)
4284 goto out;
4285 }
4286
642a2537
AE
4287 /*
4288 * If the image supports layering, get the parent info. We
4289 * need to probe the first time regardless. Thereafter we
4290 * only need to do so if there's a parent, to see if it has
4291 * disappeared due to the mapped image getting flattened.
4292 */
4293 if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4294 (first_time || rbd_dev->parent_spec)) {
4295 bool warn;
4296
4297 ret = rbd_dev_v2_parent_info(rbd_dev);
4298 if (ret)
4299 goto out;
4300
4301 /*
4302 * Print a warning if this is the initial probe and
4303 * the image has a parent. Don't print it if the
4304 * image now being probed is itself a parent. We
4305 * can tell at this point because we won't know its
4306 * pool name yet (just its pool id).
4307 */
4308 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4309 if (first_time && warn)
4310 rbd_warn(rbd_dev, "WARNING: kernel layering "
4311 "is EXPERIMENTAL!");
4312 }
4313
29334ba4
AE
4314 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4315 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4316 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4317
cc4a38bd 4318 ret = rbd_dev_v2_snap_context(rbd_dev);
117973fb 4319 dout("rbd_dev_v2_snap_context returned %d\n", ret);
117973fb
AE
4320out:
4321 up_write(&rbd_dev->header_rwsem);
4322
4323 return ret;
4324}
4325
dfc5606d
YS
4326static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4327{
dfc5606d 4328 struct device *dev;
cd789ab9 4329 int ret;
dfc5606d 4330
cd789ab9 4331 dev = &rbd_dev->dev;
dfc5606d
YS
4332 dev->bus = &rbd_bus_type;
4333 dev->type = &rbd_device_type;
4334 dev->parent = &rbd_root_dev;
200a6a8b 4335 dev->release = rbd_dev_device_release;
de71a297 4336 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4337 ret = device_register(dev);
dfc5606d 4338
dfc5606d 4339 return ret;
602adf40
YS
4340}
4341
dfc5606d
YS
4342static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4343{
4344 device_unregister(&rbd_dev->dev);
4345}
4346
e2839308 4347static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
4348
4349/*
499afd5b
AE
4350 * Get a unique rbd identifier for the given new rbd_dev, and add
4351 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 4352 */
e2839308 4353static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4354{
e2839308 4355 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
4356
4357 spin_lock(&rbd_dev_list_lock);
4358 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4359 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
4360 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4361 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 4362}
b7f23c36 4363
1ddbe94e 4364/*
499afd5b
AE
4365 * Remove an rbd_dev from the global list, and record that its
4366 * identifier is no longer in use.
1ddbe94e 4367 */
e2839308 4368static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4369{
d184f6bf 4370 struct list_head *tmp;
de71a297 4371 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
4372 int max_id;
4373
aafb230e 4374 rbd_assert(rbd_id > 0);
499afd5b 4375
e2839308
AE
4376 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4377 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
4378 spin_lock(&rbd_dev_list_lock);
4379 list_del_init(&rbd_dev->node);
d184f6bf
AE
4380
4381 /*
4382 * If the id being "put" is not the current maximum, there
4383 * is nothing special we need to do.
4384 */
e2839308 4385 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
4386 spin_unlock(&rbd_dev_list_lock);
4387 return;
4388 }
4389
4390 /*
4391 * We need to update the current maximum id. Search the
4392 * list to find out what it is. We're more likely to find
4393 * the maximum at the end, so search the list backward.
4394 */
4395 max_id = 0;
4396 list_for_each_prev(tmp, &rbd_dev_list) {
4397 struct rbd_device *rbd_dev;
4398
4399 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
4400 if (rbd_dev->dev_id > max_id)
4401 max_id = rbd_dev->dev_id;
d184f6bf 4402 }
499afd5b 4403 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4404
1ddbe94e 4405 /*
e2839308 4406 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
4407 * which case it now accurately reflects the new maximum.
4408 * Be careful not to overwrite the maximum value in that
4409 * case.
1ddbe94e 4410 */
e2839308
AE
4411 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4412 dout(" max dev id has been reset\n");
b7f23c36
AE
4413}
4414
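/*
 * Worked example of the id bookkeeping above: with devices 1, 2 and
 * 3 mapped (rbd_dev_id_max == 3), putting id 2 removes it from the
 * list and changes nothing else; putting id 3 rescans the list,
 * resets the maximum to 2, and the next rbd_dev_id_get() hands out
 * 3 again.
 */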
e28fff26
AE
4415/*
4416 * Skips over white space at *buf, and updates *buf to point to the
4417 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4418 * the token (string of non-white space characters) found. Note
4419 * that *buf must be terminated with '\0'.
e28fff26
AE
4420 */
4421static inline size_t next_token(const char **buf)
4422{
4423 /*
4424 * These are the characters that produce nonzero for
4425 * isspace() in the "C" and "POSIX" locales.
4426 */
4427 const char *spaces = " \f\n\r\t\v";
4428
4429 *buf += strspn(*buf, spaces); /* Find start of token */
4430
4431 return strcspn(*buf, spaces); /* Return token length */
4432}
4433
4434/*
4435 * Finds the next token in *buf, and if the provided token buffer is
4436 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4437 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4438 * must be terminated with '\0' on entry.
e28fff26
AE
4439 *
4440 * Returns the length of the token found (not including the '\0').
4441 * Return value will be 0 if no token is found, and it will be >=
4442 * token_size if the token would not fit.
4443 *
593a9e7b 4444 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4445 * found token. Note that this occurs even if the token buffer is
4446 * too small to hold it.
4447 */
4448static inline size_t copy_token(const char **buf,
4449 char *token,
4450 size_t token_size)
4451{
4452 size_t len;
4453
4454 len = next_token(buf);
4455 if (len < token_size) {
4456 memcpy(token, *buf, len);
4457 *(token + len) = '\0';
4458 }
4459 *buf += len;
4460
4461 return len;
4462}
4463
ea3352f4
AE
4464/*
4465 * Finds the next token in *buf, dynamically allocates a buffer big
4466 * enough to hold a copy of it, and copies the token into the new
4467 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4468 * that a duplicate buffer is created even for a zero-length token.
4469 *
4470 * Returns a pointer to the newly-allocated duplicate, or a null
4471 * pointer if memory for the duplicate was not available. If
4472 * the lenp argument is a non-null pointer, the length of the token
4473 * (not including the '\0') is returned in *lenp.
4474 *
4475 * If successful, the *buf pointer will be updated to point beyond
4476 * the end of the found token.
4477 *
4478 * Note: uses GFP_KERNEL for allocation.
4479 */
4480static inline char *dup_token(const char **buf, size_t *lenp)
4481{
4482 char *dup;
4483 size_t len;
4484
4485 len = next_token(buf);
4caf35f9 4486 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4487 if (!dup)
4488 return NULL;
ea3352f4
AE
4489 *(dup + len) = '\0';
4490 *buf += len;
4491
4492 if (lenp)
4493 *lenp = len;
4494
4495 return dup;
4496}
4497
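/*
 * Illustrative use of the helpers above, on a hypothetical buffer:
 * with buf pointing at "  mypool myimage", next_token(&buf) skips
 * the spaces and returns 6; dup_token(&buf, &len) then returns a
 * newly allocated "mypool" and leaves buf pointing at " myimage".
 */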
a725f65e 4498/*
859c31df
AE
4499 * Parse the options provided for an "rbd add" (i.e., rbd image
4500 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4501 * and the data written is passed here via a NUL-terminated buffer.
4502 * Returns 0 if successful or an error code otherwise.
d22f76e7 4503 *
859c31df
AE
4504 * The information extracted from these options is recorded in
4505 * the other parameters which return dynamically-allocated
4506 * structures:
4507 * ceph_opts
4508 * The address of a pointer that will refer to a ceph options
4509 * structure. Caller must release the returned pointer using
4510 * ceph_destroy_options() when it is no longer needed.
4511 * rbd_opts
4512 * Address of an rbd options pointer. Fully initialized by
4513 * this function; caller must release with kfree().
4514 * spec
4515 * Address of an rbd image specification pointer. Fully
4516 * initialized by this function based on parsed options.
4517 * Caller must release with rbd_spec_put().
4518 *
4519 * The options passed take this form:
4520 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4521 * where:
4522 * <mon_addrs>
4523 * A comma-separated list of one or more monitor addresses.
4524 * A monitor address is an ip address, optionally followed
4525 * by a port number (separated by a colon).
4526 * I.e.: ip1[:port1][,ip2[:port2]...]
4527 * <options>
4528 * A comma-separated list of ceph and/or rbd options.
4529 * <pool_name>
4530 * The name of the rados pool containing the rbd image.
4531 * <image_name>
4532 * The name of the image in that pool to map.
4533 * <snap_name>
4534 * An optional snapshot name. If provided, the mapping will
4535 * present data from the image at the time that snapshot was
4536 * created. The image head is used if no snapshot name is
4537 * provided. Snapshot mappings are always read-only.
a725f65e 4538 */
859c31df 4539static int rbd_add_parse_args(const char *buf,
dc79b113 4540 struct ceph_options **ceph_opts,
859c31df
AE
4541 struct rbd_options **opts,
4542 struct rbd_spec **rbd_spec)
e28fff26 4543{
d22f76e7 4544 size_t len;
859c31df 4545 char *options;
0ddebc0c 4546 const char *mon_addrs;
ecb4dc22 4547 char *snap_name;
0ddebc0c 4548 size_t mon_addrs_size;
859c31df 4549 struct rbd_spec *spec = NULL;
4e9afeba 4550 struct rbd_options *rbd_opts = NULL;
859c31df 4551 struct ceph_options *copts;
dc79b113 4552 int ret;
e28fff26
AE
4553
4554 /* The first four tokens are required */
4555
7ef3214a 4556 len = next_token(&buf);
4fb5d671
AE
4557 if (!len) {
4558 rbd_warn(NULL, "no monitor address(es) provided");
4559 return -EINVAL;
4560 }
0ddebc0c 4561 mon_addrs = buf;
f28e565a 4562 mon_addrs_size = len + 1;
7ef3214a 4563 buf += len;
a725f65e 4564
dc79b113 4565 ret = -EINVAL;
f28e565a
AE
4566 options = dup_token(&buf, NULL);
4567 if (!options)
dc79b113 4568 return -ENOMEM;
4fb5d671
AE
4569 if (!*options) {
4570 rbd_warn(NULL, "no options provided");
4571 goto out_err;
4572 }
e28fff26 4573
859c31df
AE
4574 spec = rbd_spec_alloc();
4575 if (!spec)
f28e565a 4576 goto out_mem;
859c31df
AE
4577
4578 spec->pool_name = dup_token(&buf, NULL);
4579 if (!spec->pool_name)
4580 goto out_mem;
4fb5d671
AE
4581 if (!*spec->pool_name) {
4582 rbd_warn(NULL, "no pool name provided");
4583 goto out_err;
4584 }
e28fff26 4585
69e7a02f 4586 spec->image_name = dup_token(&buf, NULL);
859c31df 4587 if (!spec->image_name)
f28e565a 4588 goto out_mem;
4fb5d671
AE
4589 if (!*spec->image_name) {
4590 rbd_warn(NULL, "no image name provided");
4591 goto out_err;
4592 }
d4b125e9 4593
f28e565a
AE
4594 /*
4595 * Snapshot name is optional; default is to use "-"
4596 * (indicating the head/no snapshot).
4597 */
3feeb894 4598 len = next_token(&buf);
820a5f3e 4599 if (!len) {
3feeb894
AE
4600 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4601 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4602 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4603 ret = -ENAMETOOLONG;
f28e565a 4604 goto out_err;
849b4260 4605 }
ecb4dc22
AE
4606 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4607 if (!snap_name)
f28e565a 4608 goto out_mem;
ecb4dc22
AE
4609 *(snap_name + len) = '\0';
4610 spec->snap_name = snap_name;
e5c35534 4611
0ddebc0c 4612 /* Initialize all rbd options to the defaults */
e28fff26 4613
4e9afeba
AE
4614 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4615 if (!rbd_opts)
4616 goto out_mem;
4617
4618 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4619
859c31df 4620 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4621 mon_addrs + mon_addrs_size - 1,
4e9afeba 4622 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4623 if (IS_ERR(copts)) {
4624 ret = PTR_ERR(copts);
dc79b113
AE
4625 goto out_err;
4626 }
859c31df
AE
4627 kfree(options);
4628
4629 *ceph_opts = copts;
4e9afeba 4630 *opts = rbd_opts;
859c31df 4631 *rbd_spec = spec;
0ddebc0c 4632
dc79b113 4633 return 0;
f28e565a 4634out_mem:
dc79b113 4635 ret = -ENOMEM;
d22f76e7 4636out_err:
859c31df
AE
4637 kfree(rbd_opts);
4638 rbd_spec_put(spec);
f28e565a 4639 kfree(options);
d22f76e7 4640
dc79b113 4641 return ret;
a725f65e
AE
4642}
4643
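/*
 * Example input for the parser above, as written to
 * /sys/bus/rbd/add (monitor address, credentials, and names are all
 * hypothetical):
 *
 *	1.2.3.4:6789 name=admin,secret=AQB... mypool myimage mysnap
 *
 * "1.2.3.4:6789" is <mon_addrs>, "name=admin,secret=AQB..." is the
 * <options> list, and the trailing snapshot name is optional.
 */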
589d30e0
AE
4644/*
4645 * An rbd format 2 image has a unique identifier, distinct from the
4646 * name given to it by the user. Internally, that identifier is
4647 * what's used to specify the names of objects related to the image.
4648 *
4649 * A special "rbd id" object is used to map an rbd image name to its
4650 * id. If that object doesn't exist, then there is no v2 rbd image
4651 * with the supplied name.
4652 *
4653 * This function will record the given rbd_dev's image_id field if
4654 * it can be determined, and in that case will return 0. If any
4655 * errors occur a negative errno will be returned and the rbd_dev's
4656 * image_id field will be unchanged (and should be NULL).
4657 */
4658static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4659{
4660 int ret;
4661 size_t size;
4662 char *object_name;
4663 void *response;
c0fba368 4664 char *image_id;
2f82ee54 4665
2c0d0a10
AE
4666 /*
4667 * When probing a parent image, the image id is already
4668 * known (and the image name likely is not). There's no
c0fba368
AE
4669 * need to fetch the image id again in this case. We
4670 * do still need to set the image format though.
2c0d0a10 4671 */
c0fba368
AE
4672 if (rbd_dev->spec->image_id) {
4673 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4674
2c0d0a10 4675 return 0;
c0fba368 4676 }
2c0d0a10 4677
589d30e0
AE
4678 /*
4679 * First, see if the format 2 image id file exists, and if
4680 * so, get the image's persistent id from it.
4681 */
69e7a02f 4682 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4683 object_name = kmalloc(size, GFP_NOIO);
4684 if (!object_name)
4685 return -ENOMEM;
0d7dbfce 4686 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4687 dout("rbd id object name is %s\n", object_name);
4688
4689 /* Response will be an encoded string, which includes a length */
4690
4691 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4692 response = kzalloc(size, GFP_NOIO);
4693 if (!response) {
4694 ret = -ENOMEM;
4695 goto out;
4696 }
4697
c0fba368
AE
4698 /* If it doesn't exist we'll assume it's a format 1 image */
4699
36be9a76 4700 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4701 "rbd", "get_id", NULL, 0,
e2a58ee5 4702 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4703 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4704 if (ret == -ENOENT) {
4705 image_id = kstrdup("", GFP_KERNEL);
4706 ret = image_id ? 0 : -ENOMEM;
4707 if (!ret)
4708 rbd_dev->image_format = 1;
4709 } else if (ret > sizeof (__le32)) {
4710 void *p = response;
4711
4712 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4713 NULL, GFP_NOIO);
c0fba368
AE
4714 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4715 if (!ret)
4716 rbd_dev->image_format = 2;
589d30e0 4717 } else {
c0fba368
AE
4718 ret = -EINVAL;
4719 }
4720
4721 if (!ret) {
4722 rbd_dev->spec->image_id = image_id;
4723 dout("image_id is %s\n", image_id);
589d30e0
AE
4724 }
4725out:
4726 kfree(response);
4727 kfree(object_name);
4728
4729 return ret;
4730}
4731
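/*
 * Sketch of the id lookup above, with hypothetical values: for an
 * image named "myimage" the id object is RBD_ID_PREFIX "myimage"
 * (e.g. "rbd_id.myimage", assuming the usual prefix), and a
 * successful "get_id" call yields an encoded string such as
 * "1028aad2ae8944a", which becomes rbd_dev->spec->image_id.
 */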
3abef3b3
AE
4732/*
4733 * Undo whatever state changes are made by v1 or v2 header info
4734 * call.
4735 */
6fd48b3b
AE
4736static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4737{
4738 struct rbd_image_header *header;
4739
392a9dad
AE
4740 /* Drop parent reference unless it's already been done (or none) */
4741
4742 if (rbd_dev->parent_overlap)
4743 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
4744
4745 /* Free dynamic fields from the header, then zero it out */
4746
4747 header = &rbd_dev->header;
812164f8 4748 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4749 kfree(header->snap_sizes);
4750 kfree(header->snap_names);
4751 kfree(header->object_prefix);
4752 memset(header, 0, sizeof (*header));
4753}
4754
2df3fac7 4755static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
4756{
4757 int ret;
a30b71b9 4758
1e130199 4759 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4760 if (ret)
b1b5402a
AE
4761 goto out_err;
4762
2df3fac7
AE
4763 /*
4764 * Get and check the features for the image. Currently the
4765 * features are assumed to never change.
4766 */
b1b5402a 4767 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4768 if (ret)
9d475de5 4769 goto out_err;
35d489f9 4770
cc070d59
AE
4771 /* If the image supports fancy striping, get its parameters */
4772
4773 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4774 ret = rbd_dev_v2_striping_info(rbd_dev);
4775 if (ret < 0)
4776 goto out_err;
4777 }
2df3fac7 4778 /* No support for crypto and compression type format 2 images */
a30b71b9 4779
35152979 4780 return 0;
9d475de5 4781out_err:
642a2537 4782 rbd_dev->header.features = 0;
1e130199
AE
4783 kfree(rbd_dev->header.object_prefix);
4784 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4785
4786 return ret;
a30b71b9
AE
4787}
4788
124afba2 4789static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4790{
2f82ee54 4791 struct rbd_device *parent = NULL;
124afba2
AE
4792 struct rbd_spec *parent_spec;
4793 struct rbd_client *rbdc;
4794 int ret;
4795
4796 if (!rbd_dev->parent_spec)
4797 return 0;
4798 /*
4799 * We need to pass a reference to the client and the parent
4800 * spec when creating the parent rbd_dev. Images related by
4801 * parent/child relationships always share both.
4802 */
4803 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4804 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4805
4806 ret = -ENOMEM;
4807 parent = rbd_dev_create(rbdc, parent_spec);
4808 if (!parent)
4809 goto out_err;
4810
1f3ef788 4811 ret = rbd_dev_image_probe(parent, false);
124afba2
AE
4812 if (ret < 0)
4813 goto out_err;
4814 rbd_dev->parent = parent;
a2acd00e 4815 atomic_set(&rbd_dev->parent_ref, 1);
124afba2
AE
4816
4817 return 0;
4818out_err:
4819 if (parent) {
fb65d228 4820 rbd_dev_unparent(rbd_dev);
124afba2
AE
4821 kfree(rbd_dev->header_name);
4822 rbd_dev_destroy(parent);
4823 } else {
4824 rbd_put_client(rbdc);
4825 rbd_spec_put(parent_spec);
4826 }
4827
4828 return ret;
4829}
4830
200a6a8b 4831static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4832{
83a06263 4833 int ret;
d1cf5788 4834
83a06263
AE
4835 /* generate unique id: find highest unique id, add one */
4836 rbd_dev_id_get(rbd_dev);
4837
4838 /* Fill in the device name, now that we have its id. */
4839 BUILD_BUG_ON(DEV_NAME_LEN
4840 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4841 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4842
4843 /* Get our block major device number. */
4844
4845 ret = register_blkdev(0, rbd_dev->name);
4846 if (ret < 0)
4847 goto err_out_id;
4848 rbd_dev->major = ret;
4849
4850 /* Set up the blkdev mapping. */
4851
4852 ret = rbd_init_disk(rbd_dev);
4853 if (ret)
4854 goto err_out_blkdev;
4855
f35a4dee 4856 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
4857 if (ret)
4858 goto err_out_disk;
f35a4dee
AE
4859 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4860
4861 ret = rbd_bus_add_dev(rbd_dev);
4862 if (ret)
4863 goto err_out_mapping;
83a06263 4864
83a06263
AE
4865 /* Everything's ready. Announce the disk to the world. */
4866
129b79d4 4867 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4868 add_disk(rbd_dev->disk);
4869
4870 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4871 (unsigned long long) rbd_dev->mapping.size);
4872
4873 return ret;
2f82ee54 4874
f35a4dee
AE
4875err_out_mapping:
4876 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4877err_out_disk:
4878 rbd_free_disk(rbd_dev);
4879err_out_blkdev:
4880 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4881err_out_id:
4882 rbd_dev_id_put(rbd_dev);
83a06263
AE
4884
4885 return ret;
4886}
4887
332bb12d
AE
4888static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4889{
4890 struct rbd_spec *spec = rbd_dev->spec;
4891 size_t size;
4892
4893 /* Record the header object name for this rbd image. */
4894
4895 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4896
4897 if (rbd_dev->image_format == 1)
4898 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4899 else
4900 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4901
4902 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4903 if (!rbd_dev->header_name)
4904 return -ENOMEM;
4905
4906 if (rbd_dev->image_format == 1)
4907 sprintf(rbd_dev->header_name, "%s%s",
4908 spec->image_name, RBD_SUFFIX);
4909 else
4910 sprintf(rbd_dev->header_name, "%s%s",
4911 RBD_HEADER_PREFIX, spec->image_id);
4912 return 0;
4913}
4914
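/*
 * Illustrative results, assuming the conventional values of
 * RBD_SUFFIX and RBD_HEADER_PREFIX from rbd_types.h: a format 1
 * image named "myimage" gets header object "myimage.rbd", while a
 * format 2 image with id "1028aad2ae8944a" gets
 * "rbd_header.1028aad2ae8944a".
 */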
200a6a8b
AE
4915static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4916{
6fd48b3b 4917 rbd_dev_unprobe(rbd_dev);
200a6a8b 4918 kfree(rbd_dev->header_name);
6fd48b3b
AE
4919 rbd_dev->header_name = NULL;
4920 rbd_dev->image_format = 0;
4921 kfree(rbd_dev->spec->image_id);
4922 rbd_dev->spec->image_id = NULL;
4923
200a6a8b
AE
4924 rbd_dev_destroy(rbd_dev);
4925}
4926
a30b71b9
AE
4927/*
4928 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
4929 * device. If this image is the one being mapped (i.e., not a
4930 * parent), initiate a watch on its header object before using that
4931 * object to get detailed information about the rbd image.
a30b71b9 4932 */
1f3ef788 4933static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
a30b71b9
AE
4934{
4935 int ret;
b644de2b 4936 int tmp;
a30b71b9
AE
4937
4938 /*
3abef3b3
AE
4939 * Get the id from the image id object. Unless there's an
4940 * error, rbd_dev->spec->image_id will be filled in with
4941 * a dynamically-allocated string, and rbd_dev->image_format
4942 * will be set to either 1 or 2.
a30b71b9
AE
4943 */
4944 ret = rbd_dev_image_id(rbd_dev);
4945 if (ret)
c0fba368
AE
4946 return ret;
4947 rbd_assert(rbd_dev->spec->image_id);
4948 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4949
332bb12d
AE
4950 ret = rbd_dev_header_name(rbd_dev);
4951 if (ret)
4952 goto err_out_format;
4953
1f3ef788
AE
4954 if (mapping) {
4955 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4956 if (ret)
4957 goto out_header_name;
4958 }
b644de2b 4959
c0fba368 4960 if (rbd_dev->image_format == 1)
99a41ebc 4961 ret = rbd_dev_v1_header_info(rbd_dev);
a30b71b9 4962 else
2df3fac7 4963 ret = rbd_dev_v2_header_info(rbd_dev);
5655c4d9 4964 if (ret)
b644de2b 4965 goto err_out_watch;
83a06263 4966
9bb81c9b
AE
4967 ret = rbd_dev_spec_update(rbd_dev);
4968 if (ret)
33dca39f 4969 goto err_out_probe;
9bb81c9b
AE
4970
4971 ret = rbd_dev_probe_parent(rbd_dev);
30d60ba2
AE
4972 if (ret)
4973 goto err_out_probe;
4974
4975 dout("discovered format %u image, header name is %s\n",
4976 rbd_dev->image_format, rbd_dev->header_name);
83a06263 4977
30d60ba2 4978 return 0;
6fd48b3b
AE
4979err_out_probe:
4980 rbd_dev_unprobe(rbd_dev);
b644de2b 4981err_out_watch:
1f3ef788
AE
4982 if (mapping) {
4983 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4984 if (tmp)
4985 rbd_warn(rbd_dev, "unable to tear down "
4986 "watch request (%d)\n", tmp);
4987 }
332bb12d
AE
4988out_header_name:
4989 kfree(rbd_dev->header_name);
4990 rbd_dev->header_name = NULL;
4991err_out_format:
4992 rbd_dev->image_format = 0;
5655c4d9
AE
4993 kfree(rbd_dev->spec->image_id);
4994 rbd_dev->spec->image_id = NULL;
4995
4996 dout("probe failed, returning %d\n", ret);
4997
a30b71b9
AE
4998 return ret;
4999}
5000
59c2be1e
YS
5001static ssize_t rbd_add(struct bus_type *bus,
5002 const char *buf,
5003 size_t count)
602adf40 5004{
cb8627c7 5005 struct rbd_device *rbd_dev = NULL;
dc79b113 5006 struct ceph_options *ceph_opts = NULL;
4e9afeba 5007 struct rbd_options *rbd_opts = NULL;
859c31df 5008 struct rbd_spec *spec = NULL;
9d3997fd 5009 struct rbd_client *rbdc;
27cc2594 5010 struct ceph_osd_client *osdc;
51344a38 5011 bool read_only;
27cc2594 5012 int rc = -ENOMEM;
602adf40
YS
5013
5014 if (!try_module_get(THIS_MODULE))
5015 return -ENODEV;
5016
602adf40 5017 /* parse add command */
859c31df 5018 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 5019 if (rc < 0)
bd4ba655 5020 goto err_out_module;
51344a38
AE
5021 read_only = rbd_opts->read_only;
5022 kfree(rbd_opts);
5023 rbd_opts = NULL; /* done with this */
78cea76e 5024
9d3997fd
AE
5025 rbdc = rbd_get_client(ceph_opts);
5026 if (IS_ERR(rbdc)) {
5027 rc = PTR_ERR(rbdc);
0ddebc0c 5028 goto err_out_args;
9d3997fd 5029 }
602adf40 5030
602adf40 5031 /* pick the pool */
9d3997fd 5032 osdc = &rbdc->client->osdc;
859c31df 5033 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
5034 if (rc < 0)
5035 goto err_out_client;
c0cd10db 5036 spec->pool_id = (u64)rc;
859c31df 5037
0903e875
AE
5038 /* The ceph file layout needs to fit pool id in 32 bits */
5039
c0cd10db
AE
5040 if (spec->pool_id > (u64)U32_MAX) {
5041 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5042 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
5043 rc = -EIO;
5044 goto err_out_client;
5045 }
5046
c53d5893 5047 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
5048 if (!rbd_dev)
5049 goto err_out_client;
c53d5893
AE
5050 rbdc = NULL; /* rbd_dev now owns this */
5051 spec = NULL; /* rbd_dev now owns this */
602adf40 5052
1f3ef788 5053 rc = rbd_dev_image_probe(rbd_dev, true);
a30b71b9 5054 if (rc < 0)
c53d5893 5055 goto err_out_rbd_dev;
05fd6f6f 5056
7ce4eef7
AE
5057 /* If we are mapping a snapshot it must be marked read-only */
5058
5059 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5060 read_only = true;
5061 rbd_dev->mapping.read_only = read_only;
5062
b536f69a 5063 rc = rbd_dev_device_setup(rbd_dev);
3abef3b3
AE
5064 if (rc) {
5065 rbd_dev_image_release(rbd_dev);
5066 goto err_out_module;
5067 }
5068
5069 return count;
b536f69a 5070
c53d5893
AE
5071err_out_rbd_dev:
5072 rbd_dev_destroy(rbd_dev);
bd4ba655 5073err_out_client:
9d3997fd 5074 rbd_put_client(rbdc);
0ddebc0c 5075err_out_args:
859c31df 5076 rbd_spec_put(spec);
bd4ba655
AE
5077err_out_module:
5078 module_put(THIS_MODULE);
27cc2594 5079
602adf40 5080 dout("Error adding device %s\n", buf);
27cc2594 5081
c0cd10db 5082 return (ssize_t)rc;
602adf40
YS
5083}
5084
200a6a8b 5085static void rbd_dev_device_release(struct device *dev)
602adf40 5086{
593a9e7b 5087 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 5088
602adf40 5089 rbd_free_disk(rbd_dev);
200a6a8b 5090 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6d80b130 5091 rbd_dev_mapping_clear(rbd_dev);
602adf40 5092 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 5093 rbd_dev->major = 0;
e2839308 5094 rbd_dev_id_put(rbd_dev);
602adf40
YS
5096}
5097
05a46afd
AE
5098static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5099{
ad945fc1 5100 while (rbd_dev->parent) {
05a46afd
AE
5101 struct rbd_device *first = rbd_dev;
5102 struct rbd_device *second = first->parent;
5103 struct rbd_device *third;
5104
5105 /*
5106 * Follow to the parent with no grandparent and
5107 * remove it.
5108 */
5109 while (second && (third = second->parent)) {
5110 first = second;
5111 second = third;
5112 }
ad945fc1 5113 rbd_assert(second);
8ad42cd0 5114 rbd_dev_image_release(second);
ad945fc1
AE
5115 first->parent = NULL;
5116 first->parent_overlap = 0;
5117
5118 rbd_assert(first->parent_spec);
05a46afd
AE
5119 rbd_spec_put(first->parent_spec);
5120 first->parent_spec = NULL;
05a46afd
AE
5121 }
5122}
5123
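/*
 * Example of the teardown order above: for a chain mapped -> A -> B
 * (B being A's parent), the first pass releases B and clears A's
 * parent pointers; the second pass releases A and clears the mapped
 * device's own parent pointers, at which point the loop exits.
 */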
dfc5606d
YS
5124static ssize_t rbd_remove(struct bus_type *bus,
5125 const char *buf,
5126 size_t count)
602adf40
YS
5127{
5128 struct rbd_device *rbd_dev = NULL;
751cc0e3
AE
5129 struct list_head *tmp;
5130 int dev_id;
602adf40 5131 unsigned long ul;
82a442d2 5132 bool already = false;
0d8189e1 5133 int ret;
602adf40 5134
0d8189e1
AE
5135 ret = strict_strtoul(buf, 10, &ul);
5136 if (ret)
5137 return ret;
602adf40
YS
5138
5139 /* convert to int; abort if we lost anything in the conversion */
751cc0e3
AE
5140 dev_id = (int)ul;
5141 if (dev_id != ul)
602adf40
YS
5142 return -EINVAL;
5143
751cc0e3
AE
5144 ret = -ENOENT;
5145 spin_lock(&rbd_dev_list_lock);
5146 list_for_each(tmp, &rbd_dev_list) {
5147 rbd_dev = list_entry(tmp, struct rbd_device, node);
5148 if (rbd_dev->dev_id == dev_id) {
5149 ret = 0;
5150 break;
5151 }
42382b70 5152 }
751cc0e3
AE
5153 if (!ret) {
5154 spin_lock_irq(&rbd_dev->lock);
5155 if (rbd_dev->open_count)
5156 ret = -EBUSY;
5157 else
82a442d2
AE
5158 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5159 &rbd_dev->flags);
751cc0e3
AE
5160 spin_unlock_irq(&rbd_dev->lock);
5161 }
5162 spin_unlock(&rbd_dev_list_lock);
82a442d2 5163 if (ret < 0 || already)
1ba0f1e7 5164 return ret;
751cc0e3 5165
b480815a 5166 rbd_bus_del_dev(rbd_dev);
1f3ef788
AE
5167 ret = rbd_dev_header_watch_sync(rbd_dev, false);
5168 if (ret)
5169 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
8ad42cd0 5170 rbd_dev_image_release(rbd_dev);
79ab7558 5171 module_put(THIS_MODULE);
aafb230e 5172
1ba0f1e7 5173 return count;
602adf40
YS
5174}
5175
602adf40
YS
5176/*
5177 * create control files in sysfs
dfc5606d 5178 * /sys/bus/rbd/...
602adf40
YS
5179 */
5180static int rbd_sysfs_init(void)
5181{
dfc5606d 5182 int ret;
602adf40 5183
fed4c143 5184 ret = device_register(&rbd_root_dev);
21079786 5185 if (ret < 0)
dfc5606d 5186 return ret;
602adf40 5187
fed4c143
AE
5188 ret = bus_register(&rbd_bus_type);
5189 if (ret < 0)
5190 device_unregister(&rbd_root_dev);
602adf40 5191
602adf40
YS
5192 return ret;
5193}
5194
5195static void rbd_sysfs_cleanup(void)
5196{
dfc5606d 5197 bus_unregister(&rbd_bus_type);
fed4c143 5198 device_unregister(&rbd_root_dev);
602adf40
YS
5199}
5200
1c2a9dfe
AE
5201static int rbd_slab_init(void)
5202{
5203 rbd_assert(!rbd_img_request_cache);
5204 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5205 sizeof (struct rbd_img_request),
5206 __alignof__(struct rbd_img_request),
5207 0, NULL);
868311b1
AE
5208 if (!rbd_img_request_cache)
5209 return -ENOMEM;
5210
5211 rbd_assert(!rbd_obj_request_cache);
5212 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5213 sizeof (struct rbd_obj_request),
5214 __alignof__(struct rbd_obj_request),
5215 0, NULL);
78c2a44a
AE
5216 if (!rbd_obj_request_cache)
5217 goto out_err;
5218
5219 rbd_assert(!rbd_segment_name_cache);
5220 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5221 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5222 if (rbd_segment_name_cache)
1c2a9dfe 5223 return 0;
78c2a44a
AE
5224out_err:
5225 if (rbd_obj_request_cache) {
5226 kmem_cache_destroy(rbd_obj_request_cache);
5227 rbd_obj_request_cache = NULL;
5228 }
1c2a9dfe 5229
868311b1
AE
5230 kmem_cache_destroy(rbd_img_request_cache);
5231 rbd_img_request_cache = NULL;
5232
1c2a9dfe
AE
5233 return -ENOMEM;
5234}
5235
5236static void rbd_slab_exit(void)
5237{
78c2a44a
AE
5238 rbd_assert(rbd_segment_name_cache);
5239 kmem_cache_destroy(rbd_segment_name_cache);
5240 rbd_segment_name_cache = NULL;
5241
868311b1
AE
5242 rbd_assert(rbd_obj_request_cache);
5243 kmem_cache_destroy(rbd_obj_request_cache);
5244 rbd_obj_request_cache = NULL;
5245
1c2a9dfe
AE
5246 rbd_assert(rbd_img_request_cache);
5247 kmem_cache_destroy(rbd_img_request_cache);
5248 rbd_img_request_cache = NULL;
5249}
5250
cc344fa1 5251static int __init rbd_init(void)
602adf40
YS
5252{
5253 int rc;
5254
1e32d34c
AE
5255 if (!libceph_compatible(NULL)) {
5256 rbd_warn(NULL, "libceph incompatibility (quitting)");
5257
5258 return -EINVAL;
5259 }
1c2a9dfe 5260 rc = rbd_slab_init();
602adf40
YS
5261 if (rc)
5262 return rc;
1c2a9dfe
AE
5263 rc = rbd_sysfs_init();
5264 if (rc)
5265 rbd_slab_exit();
5266 else
5267 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5268
5269 return rc;
602adf40
YS
5270}
5271
cc344fa1 5272static void __exit rbd_exit(void)
602adf40
YS
5273{
5274 rbd_sysfs_cleanup();
1c2a9dfe 5275 rbd_slab_exit();
602adf40
YS
5276}
5277
5278module_init(rbd_init);
5279module_exit(rbd_exit);
5280
5281MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5282MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5283MODULE_DESCRIPTION("rados block device");
5284
5285/* following authorship retained from original osdblk.c */
5286MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5287
5288MODULE_LICENSE("GPL");