rbd: flush dcache after zeroing page data
[linux-2.6-block.git] / drivers/block/rbd.c

/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
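
/*
 * Illustration (not part of the upstream driver): the pair above
 * implements a saturating reference counter -- a count pinned at 0
 * stays 0, and a count that would overflow is refused.  A minimal
 * sketch of the intended get/put usage, with hypothetical names;
 * later in this file, parent_ref is managed in this same style.
 */
#if 0	/* example only */
static bool example_get(atomic_t *refcount)
{
	/* Fails if the object is already dead (0) or would overflow */
	return atomic_inc_return_safe(refcount) > 0;
}

static void example_put(atomic_t *refcount)
{
	/* Last put drops the count to 0; it can never be revived */
	if (atomic_dec_return_safe(refcount) == 0)
		pr_info("example: last reference dropped\n");
}
#endif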

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX	/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
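
/*
 * Worked example (illustrative): each byte of an int contributes at
 * most log10(256) ~= 2.41 decimal digits, which 5/2 = 2.5 safely
 * overestimates.  For a 4-byte int this gives (5 * 4) / 2 + 1 = 11
 * characters -- exactly enough for "-2147483648".  DEV_NAME_LEN (32)
 * therefore comfortably holds "rbd" plus any formatted identifier.
 */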

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
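
/*
 * Illustration (hypothetical values, not from the upstream source):
 * a mapping of snapshot "snap1" of image "myimage" in pool "rbd"
 * might be described by
 *
 *	{ .pool_id = 2,  .pool_name = "rbd",
 *	  .image_id = "10026b8b4567",  .image_name = "myimage",
 *	  .snap_id = 4,  .snap_name = "snap1" }
 *
 * Only the three ids are needed to identify the image; the names are
 * resolved from (or to) them during discovery.
 */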

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;	/* position in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

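/*
 * Illustration (not from the upstream source): the tokens above are
 * parsed from the comma-separated options field of the string written
 * to /sys/bus/rbd/add (see Documentation/ABI/testing/sysfs-bus-rbd).
 * A hypothetical mapping that asks for a read-only device might look
 * like:
 *
 *	# echo "1.2.3.4:6789 name=admin,ro mypool myimage" \
 *		> /sys/bus/rbd/add
 *
 * Tokens not recognized here are passed on to libceph; "ro"/
 * "read_only" and "rw"/"read_write" are consumed here and simply set
 * or clear rbd_opts->read_only.
 */
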
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
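
/*
 * Worked example (illustrative) of the two overflow checks above: the
 * in-memory snapshot header is roughly a ceph_snap_context followed by
 * snap_count 64-bit snapshot ids followed by snap_names_len bytes of
 * names.  Starting from SIZE_MAX, first requiring
 * snap_count <= (SIZE_MAX - sizeof(context)) / sizeof(__le64), and
 * then requiring snap_names_len to fit in what remains after the id
 * array is subtracted, guarantees the total can never wrap a size_t.
 */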

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	down_write(&rbd_dev->header_rwsem);
	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	/* Make sure mapping size is consistent with header info */

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
		if (rbd_dev->mapping.size != header->image_size)
			rbd_dev->mapping.size = header->image_size;

	up_write(&rbd_dev->header_rwsem);

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
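
/*
 * Illustration (not part of the upstream driver): because the snap
 * array is descending, a comparator returning the *opposite* sign of
 * a natural u64 comparison lets the stock bsearch() work unchanged.
 * A minimal sketch with made-up snapshot ids:
 */
#if 0	/* example only */
static void example_snap_lookup(void)
{
	static u64 snaps[] = { 40, 30, 20, 10 };	/* descending */
	u64 key = 20;
	u64 *found;

	found = bsearch(&key, snaps, ARRAY_SIZE(snaps), sizeof (key),
			snapid_compare_reverse);
	/* found points at snaps[2]; the index is found - snaps == 2 */
	if (found)
		pr_info("snap id %llu at index %td\n", key, found - snaps);
}
#endif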

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return NULL;

	return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
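
/*
 * Worked example (illustrative): with the default obj_order of 22,
 * segment_size is 1 << 22 = 4 MiB.  An image byte offset of 5 MiB
 * with a 2 MiB request falls in segment 5 MiB >> 22 = 1; the offset
 * within that segment is 5 MiB & (4 MiB - 1) = 1 MiB, and since
 * 1 MiB + 2 MiB <= 4 MiB the length is not clipped.  A 4 MiB request
 * at the same offset would be clipped to 4 MiB - 1 MiB = 3 MiB, with
 * the remainder issued against segment 2.
 */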

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				flush_dcache_page(bv->bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
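
/*
 * Note on the flush_dcache_page() calls above and in zero_pages()
 * below (the subject of this commit): after the kernel writes to a
 * page through a kernel mapping, architectures with aliasing data
 * caches (e.g. ARM, PA-RISC) may still hold stale data in cache lines
 * belonging to user-space mappings of the same page.
 * flush_dcache_page() makes the zeroed data visible through every
 * mapping; on architectures such as x86 it compiles away to a no-op.
 */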

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	bio_for_each_segment(bv, bio_src, idx) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
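
/*
 * Illustration (not part of the upstream driver): because bio_src and
 * offset are in-out, a caller can peel segment-sized pieces off a
 * request's bio chain with consecutive calls.  A minimal sketch of
 * that pattern (the helper itself is hypothetical, modeled on the way
 * image requests are filled):
 */
#if 0	/* example only */
static int example_split_by_segment(struct rbd_device *rbd_dev,
				    struct bio *chain, u64 img_offset,
				    u64 resid)
{
	unsigned int bio_offset = 0;

	while (resid) {
		u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
		struct bio *clone;

		/* Each call advances chain/bio_offset past the clone */
		clone = bio_chain_clone_range(&chain, &bio_offset,
						(unsigned int)length,
						GFP_ATOMIC);
		if (!clone)
			return -ENOMEM;	/* or -EINVAL; see above */

		/* ...attach the clone to a per-object request here... */

		img_offset += length;
		resid -= length;
	}

	return 0;
}
#endif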

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better offhand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}
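
/*
 * Worked example (illustrative): for a 4 MiB read that returns
 * -ENOENT, the whole 4 MiB is zero-filled and xferred becomes 4 MiB.
 * For a short read that returns only 1 MiB of a 4 MiB request, bytes
 * 1 MiB..4 MiB are zero-filled and xferred is likewise rounded up to
 * the full length, so the block layer always sees a fully-satisfied
 * read.
 */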
1593
bf0d5f50
AE
1594static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1595{
37206ee5
AE
1596 dout("%s: obj %p cb %p\n", __func__, obj_request,
1597 obj_request->callback);
bf0d5f50
AE
1598 if (obj_request->callback)
1599 obj_request->callback(obj_request);
788e2df3
AE
1600 else
1601 complete_all(&obj_request->completion);
bf0d5f50
AE
1602}
1603
c47f9371 1604static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
39bf2c5d
AE
1605{
1606 dout("%s: obj %p\n", __func__, obj_request);
1607 obj_request_done_set(obj_request);
1608}
1609
c47f9371 1610static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1611{
57acbaa7 1612 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1613 struct rbd_device *rbd_dev = NULL;
57acbaa7
AE
1614 bool layered = false;
1615
1616 if (obj_request_img_data_test(obj_request)) {
1617 img_request = obj_request->img_request;
1618 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1619 rbd_dev = img_request->rbd_dev;
57acbaa7 1620 }
8b3e1a56
AE
1621
1622 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1623 obj_request, img_request, obj_request->result,
1624 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1625 if (layered && obj_request->result == -ENOENT &&
1626 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1627 rbd_img_parent_read(obj_request);
1628 else if (img_request)
6e2a4505
AE
1629 rbd_img_obj_request_read_callback(obj_request);
1630 else
1631 obj_request_done_set(obj_request);
bf0d5f50
AE
1632}
1633
c47f9371 1634static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1635{
1b83bef2
SW
1636 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1637 obj_request->result, obj_request->length);
1638 /*
8b3e1a56
AE
1639 * There is no such thing as a successful short write. Set
1640 * it to our originally-requested length.
1b83bef2
SW
1641 */
1642 obj_request->xferred = obj_request->length;
07741308 1643 obj_request_done_set(obj_request);
bf0d5f50
AE
1644}
1645
fbfab539
AE
1646/*
1647 * For a simple stat call there's nothing to do. We'll do more if
1648 * this is part of a write sequence for a layered image.
1649 */
c47f9371 1650static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1651{
37206ee5 1652 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1653 obj_request_done_set(obj_request);
1654}
1655
bf0d5f50
AE
1656static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1657 struct ceph_msg *msg)
1658{
1659 struct rbd_obj_request *obj_request = osd_req->r_priv;
bf0d5f50
AE
1660 u16 opcode;
1661
37206ee5 1662 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
bf0d5f50 1663 rbd_assert(osd_req == obj_request->osd_req);
57acbaa7
AE
1664 if (obj_request_img_data_test(obj_request)) {
1665 rbd_assert(obj_request->img_request);
1666 rbd_assert(obj_request->which != BAD_WHICH);
1667 } else {
1668 rbd_assert(obj_request->which == BAD_WHICH);
1669 }
bf0d5f50 1670
1b83bef2
SW
1671 if (osd_req->r_result < 0)
1672 obj_request->result = osd_req->r_result;
bf0d5f50 1673
0eefd470 1674 BUG_ON(osd_req->r_num_ops > 2);
bf0d5f50 1675
c47f9371
AE
1676 /*
1677 * We support a 64-bit length, but ultimately it has to be
1678 * passed to blk_end_request(), which takes an unsigned int.
1679 */
1b83bef2 1680 obj_request->xferred = osd_req->r_reply_op_len[0];
8b3e1a56 1681 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
79528734 1682 opcode = osd_req->r_ops[0].op;
bf0d5f50
AE
1683 switch (opcode) {
1684 case CEPH_OSD_OP_READ:
c47f9371 1685 rbd_osd_read_callback(obj_request);
bf0d5f50
AE
1686 break;
1687 case CEPH_OSD_OP_WRITE:
c47f9371 1688 rbd_osd_write_callback(obj_request);
bf0d5f50 1689 break;
fbfab539 1690 case CEPH_OSD_OP_STAT:
c47f9371 1691 rbd_osd_stat_callback(obj_request);
fbfab539 1692 break;
36be9a76 1693 case CEPH_OSD_OP_CALL:
b8d70035 1694 case CEPH_OSD_OP_NOTIFY_ACK:
9969ebc5 1695 case CEPH_OSD_OP_WATCH:
c47f9371 1696 rbd_osd_trivial_callback(obj_request);
9969ebc5 1697 break;
bf0d5f50
AE
1698 default:
1699 rbd_warn(NULL, "%s: unsupported op %hu\n",
1700 obj_request->object_name, (unsigned short) opcode);
1701 break;
1702 }
1703
07741308 1704 if (obj_request_done_test(obj_request))
bf0d5f50
AE
1705 rbd_obj_request_complete(obj_request);
1706}
1707
9d4df01f 1708static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3
AE
1709{
1710 struct rbd_img_request *img_request = obj_request->img_request;
8c042b0d 1711 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1712 u64 snap_id;
430c28c3 1713
8c042b0d 1714 rbd_assert(osd_req != NULL);
430c28c3 1715
9d4df01f 1716 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
8c042b0d 1717 ceph_osdc_build_request(osd_req, obj_request->offset,
9d4df01f
AE
1718 NULL, snap_id, NULL);
1719}
1720
1721static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1722{
1723 struct rbd_img_request *img_request = obj_request->img_request;
1724 struct ceph_osd_request *osd_req = obj_request->osd_req;
1725 struct ceph_snap_context *snapc;
1726 struct timespec mtime = CURRENT_TIME;
1727
1728 rbd_assert(osd_req != NULL);
1729
1730 snapc = img_request ? img_request->snapc : NULL;
1731 ceph_osdc_build_request(osd_req, obj_request->offset,
1732 snapc, CEPH_NOSNAP, &mtime);
430c28c3
AE
1733}
1734
bf0d5f50
AE
1735static struct ceph_osd_request *rbd_osd_req_create(
1736 struct rbd_device *rbd_dev,
1737 bool write_request,
430c28c3 1738 struct rbd_obj_request *obj_request)
bf0d5f50 1739{
bf0d5f50
AE
1740 struct ceph_snap_context *snapc = NULL;
1741 struct ceph_osd_client *osdc;
1742 struct ceph_osd_request *osd_req;
bf0d5f50 1743
6365d33a
AE
1744 if (obj_request_img_data_test(obj_request)) {
1745 struct rbd_img_request *img_request = obj_request->img_request;
1746
0c425248
AE
1747 rbd_assert(write_request ==
1748 img_request_write_test(img_request));
1749 if (write_request)
bf0d5f50 1750 snapc = img_request->snapc;
bf0d5f50
AE
1751 }
1752
1753 /* Allocate and initialize the request, for the single op */
1754
1755 osdc = &rbd_dev->rbd_client->client->osdc;
1756 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1757 if (!osd_req)
1758 return NULL; /* ENOMEM */
bf0d5f50 1759
430c28c3 1760 if (write_request)
bf0d5f50 1761 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
430c28c3 1762 else
bf0d5f50 1763 osd_req->r_flags = CEPH_OSD_FLAG_READ;
bf0d5f50
AE
1764
1765 osd_req->r_callback = rbd_osd_req_callback;
1766 osd_req->r_priv = obj_request;
1767
1768 osd_req->r_oid_len = strlen(obj_request->object_name);
1769 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1770 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1771
1772 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1773
bf0d5f50
AE
1774 return osd_req;
1775}
1776
0eefd470
AE
1777/*
1778 * Create a copyup osd request based on the information in the
1779	 * object request supplied.  A copyup request has two osd ops:
1780	 * a copyup method call and a "normal" write request.
1781 */
1782static struct ceph_osd_request *
1783rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1784{
1785 struct rbd_img_request *img_request;
1786 struct ceph_snap_context *snapc;
1787 struct rbd_device *rbd_dev;
1788 struct ceph_osd_client *osdc;
1789 struct ceph_osd_request *osd_req;
1790
1791 rbd_assert(obj_request_img_data_test(obj_request));
1792 img_request = obj_request->img_request;
1793 rbd_assert(img_request);
1794 rbd_assert(img_request_write_test(img_request));
1795
1796 /* Allocate and initialize the request, for the two ops */
1797
1798 snapc = img_request->snapc;
1799 rbd_dev = img_request->rbd_dev;
1800 osdc = &rbd_dev->rbd_client->client->osdc;
1801 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1802 if (!osd_req)
1803 return NULL; /* ENOMEM */
1804
1805 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1806 osd_req->r_callback = rbd_osd_req_callback;
1807 osd_req->r_priv = obj_request;
1808
1809 osd_req->r_oid_len = strlen(obj_request->object_name);
1810 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1811 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1812
1813 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1814
1815 return osd_req;
1816}
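
/*
 * A rough sketch of how the two ops allocated above end up being
 * used (see rbd_img_obj_parent_read_full_callback() below), assuming
 * the usual "rbd" object class:
 *
 *	op 0: CEPH_OSD_OP_CALL "rbd.copyup", whose request data is a
 *	      full object's worth of parent data (the copyup pages)
 *	op 1: CEPH_OSD_OP_WRITE of the original payload at the
 *	      original offset/length
 *
 * The copyup method is expected to populate the target object from
 * the parent data only when the object has no data of its own yet;
 * the write in op 1 is then applied on top.
 */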
1817
1818
bf0d5f50
AE
1819static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1820{
1821 ceph_osdc_put_request(osd_req);
1822}
1823
1824/* object_name is assumed to be a non-null pointer and NUL-terminated */
1825
1826static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1827 u64 offset, u64 length,
1828 enum obj_request_type type)
1829{
1830 struct rbd_obj_request *obj_request;
1831 size_t size;
1832 char *name;
1833
1834 rbd_assert(obj_request_type_valid(type));
1835
1836 size = strlen(object_name) + 1;
f907ad55
AE
1837 name = kmalloc(size, GFP_KERNEL);
1838 if (!name)
bf0d5f50
AE
1839 return NULL;
1840
868311b1 1841 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
f907ad55
AE
1842 if (!obj_request) {
1843 kfree(name);
1844 return NULL;
1845 }
1846
bf0d5f50
AE
1847 obj_request->object_name = memcpy(name, object_name, size);
1848 obj_request->offset = offset;
1849 obj_request->length = length;
926f9b3f 1850 obj_request->flags = 0;
bf0d5f50
AE
1851 obj_request->which = BAD_WHICH;
1852 obj_request->type = type;
1853 INIT_LIST_HEAD(&obj_request->links);
788e2df3 1854 init_completion(&obj_request->completion);
bf0d5f50
AE
1855 kref_init(&obj_request->kref);
1856
37206ee5
AE
1857 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1858 offset, length, (int)type, obj_request);
1859
bf0d5f50
AE
1860 return obj_request;
1861}
1862
1863static void rbd_obj_request_destroy(struct kref *kref)
1864{
1865 struct rbd_obj_request *obj_request;
1866
1867 obj_request = container_of(kref, struct rbd_obj_request, kref);
1868
37206ee5
AE
1869 dout("%s: obj %p\n", __func__, obj_request);
1870
bf0d5f50
AE
1871 rbd_assert(obj_request->img_request == NULL);
1872 rbd_assert(obj_request->which == BAD_WHICH);
1873
1874 if (obj_request->osd_req)
1875 rbd_osd_req_destroy(obj_request->osd_req);
1876
1877 rbd_assert(obj_request_type_valid(obj_request->type));
1878 switch (obj_request->type) {
9969ebc5
AE
1879 case OBJ_REQUEST_NODATA:
1880 break; /* Nothing to do */
bf0d5f50
AE
1881 case OBJ_REQUEST_BIO:
1882 if (obj_request->bio_list)
1883 bio_chain_put(obj_request->bio_list);
1884 break;
788e2df3
AE
1885 case OBJ_REQUEST_PAGES:
1886 if (obj_request->pages)
1887 ceph_release_page_vector(obj_request->pages,
1888 obj_request->page_count);
1889 break;
bf0d5f50
AE
1890 }
1891
f907ad55 1892 kfree(obj_request->object_name);
868311b1
AE
1893 obj_request->object_name = NULL;
1894 kmem_cache_free(rbd_obj_request_cache, obj_request);
bf0d5f50
AE
1895}
1896
fb65d228
AE
1897/* It's OK to call this for a device with no parent */
1898
1899static void rbd_spec_put(struct rbd_spec *spec);
1900static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1901{
1902 rbd_dev_remove_parent(rbd_dev);
1903 rbd_spec_put(rbd_dev->parent_spec);
1904 rbd_dev->parent_spec = NULL;
1905 rbd_dev->parent_overlap = 0;
1906}
1907
a2acd00e
AE
1908/*
1909 * Parent image reference counting is used to determine when an
1910 * image's parent fields can be safely torn down--after there are no
1911 * more in-flight requests to the parent image. When the last
1912 * reference is dropped, cleaning them up is safe.
1913 */
1914static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1915{
1916 int counter;
1917
1918 if (!rbd_dev->parent_spec)
1919 return;
1920
1921 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1922 if (counter > 0)
1923 return;
1924
1925 /* Last reference; clean up parent data structures */
1926
1927 if (!counter)
1928 rbd_dev_unparent(rbd_dev);
1929 else
1930 rbd_warn(rbd_dev, "parent reference underflow\n");
1931}
1932
1933/*
1934 * If an image has a non-zero parent overlap, get a reference to its
1935 * parent.
1936 *
392a9dad
AE
1937 * We must get the reference before checking for the overlap to
1938 * coordinate properly with zeroing the parent overlap in
1939 * rbd_dev_v2_parent_info() when an image gets flattened. We
1940 * drop it again if there is no overlap.
1941 *
a2acd00e
AE
1942 * Returns true if the rbd device has a parent with a non-zero
1943 * overlap and a reference for it was successfully taken, or
1944 * false otherwise.
1945 */
1946static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1947{
1948 int counter;
1949
1950 if (!rbd_dev->parent_spec)
1951 return false;
1952
1953 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1954 if (counter > 0 && rbd_dev->parent_overlap)
1955 return true;
1956
1957 /* Image was flattened, but parent is not yet torn down */
1958
1959 if (counter < 0)
1960 rbd_warn(rbd_dev, "parent reference overflow\n");
1961
1962 return false;
1963}
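
/*
 * A minimal sketch of the intended get/put pairing, as used by the
 * image request code below:
 *
 *	if (rbd_dev_parent_get(rbd_dev))
 *		img_request_layered_set(img_request);
 *	...
 *	if (img_request_layered_test(img_request)) {
 *		img_request_layered_clear(img_request);
 *		rbd_dev_parent_put(img_request->rbd_dev);
 *	}
 *
 * Every successful get is matched by exactly one put when the image
 * request is destroyed, so the parent fields persist for as long as
 * any request that might need them is in flight.
 */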
1964
bf0d5f50
AE
1965/*
1966 * Caller is responsible for filling in the list of object requests
1967 * that comprises the image request, and the Linux request pointer
1968 * (if there is one).
1969 */
cc344fa1
AE
1970static struct rbd_img_request *rbd_img_request_create(
1971 struct rbd_device *rbd_dev,
bf0d5f50 1972 u64 offset, u64 length,
e93f3152 1973 bool write_request)
bf0d5f50
AE
1974{
1975 struct rbd_img_request *img_request;
bf0d5f50 1976
1c2a9dfe 1977 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
bf0d5f50
AE
1978 if (!img_request)
1979 return NULL;
1980
1981 if (write_request) {
1982 down_read(&rbd_dev->header_rwsem);
812164f8 1983 ceph_get_snap_context(rbd_dev->header.snapc);
bf0d5f50 1984 up_read(&rbd_dev->header_rwsem);
bf0d5f50
AE
1985 }
1986
1987 img_request->rq = NULL;
1988 img_request->rbd_dev = rbd_dev;
1989 img_request->offset = offset;
1990 img_request->length = length;
0c425248
AE
1991 img_request->flags = 0;
1992 if (write_request) {
1993 img_request_write_set(img_request);
468521c1 1994 img_request->snapc = rbd_dev->header.snapc;
0c425248 1995 } else {
bf0d5f50 1996 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 1997 }
a2acd00e 1998 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 1999 img_request_layered_set(img_request);
bf0d5f50
AE
2000 spin_lock_init(&img_request->completion_lock);
2001 img_request->next_completion = 0;
2002 img_request->callback = NULL;
a5a337d4 2003 img_request->result = 0;
bf0d5f50
AE
2004 img_request->obj_request_count = 0;
2005 INIT_LIST_HEAD(&img_request->obj_requests);
2006 kref_init(&img_request->kref);
2007
37206ee5
AE
2008 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2009 write_request ? "write" : "read", offset, length,
2010 img_request);
2011
bf0d5f50
AE
2012 return img_request;
2013}
2014
2015static void rbd_img_request_destroy(struct kref *kref)
2016{
2017 struct rbd_img_request *img_request;
2018 struct rbd_obj_request *obj_request;
2019 struct rbd_obj_request *next_obj_request;
2020
2021 img_request = container_of(kref, struct rbd_img_request, kref);
2022
37206ee5
AE
2023 dout("%s: img %p\n", __func__, img_request);
2024
bf0d5f50
AE
2025 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2026 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2027 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2028
a2acd00e
AE
2029 if (img_request_layered_test(img_request)) {
2030 img_request_layered_clear(img_request);
2031 rbd_dev_parent_put(img_request->rbd_dev);
2032 }
2033
0c425248 2034 if (img_request_write_test(img_request))
812164f8 2035 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2036
1c2a9dfe 2037 kmem_cache_free(rbd_img_request_cache, img_request);
bf0d5f50
AE
2038}
2039
e93f3152
AE
2040static struct rbd_img_request *rbd_parent_request_create(
2041 struct rbd_obj_request *obj_request,
2042 u64 img_offset, u64 length)
2043{
2044 struct rbd_img_request *parent_request;
2045 struct rbd_device *rbd_dev;
2046
2047 rbd_assert(obj_request->img_request);
2048 rbd_dev = obj_request->img_request->rbd_dev;
2049
2050 parent_request = rbd_img_request_create(rbd_dev->parent,
2051 img_offset, length, false);
2052 if (!parent_request)
2053 return NULL;
2054
2055 img_request_child_set(parent_request);
2056 rbd_obj_request_get(obj_request);
2057 parent_request->obj_request = obj_request;
2058
2059 return parent_request;
2060}
2061
2062static void rbd_parent_request_destroy(struct kref *kref)
2063{
2064 struct rbd_img_request *parent_request;
2065 struct rbd_obj_request *orig_request;
2066
2067 parent_request = container_of(kref, struct rbd_img_request, kref);
2068 orig_request = parent_request->obj_request;
2069
2070 parent_request->obj_request = NULL;
2071 rbd_obj_request_put(orig_request);
2072 img_request_child_clear(parent_request);
2073
2074 rbd_img_request_destroy(kref);
2075}
2076
1217857f
AE
2077static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2078{
6365d33a 2079 struct rbd_img_request *img_request;
1217857f
AE
2080 unsigned int xferred;
2081 int result;
8b3e1a56 2082 bool more;
1217857f 2083
6365d33a
AE
2084 rbd_assert(obj_request_img_data_test(obj_request));
2085 img_request = obj_request->img_request;
2086
1217857f
AE
2087 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2088 xferred = (unsigned int)obj_request->xferred;
2089 result = obj_request->result;
2090 if (result) {
2091 struct rbd_device *rbd_dev = img_request->rbd_dev;
2092
2093 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2094 img_request_write_test(img_request) ? "write" : "read",
2095 obj_request->length, obj_request->img_offset,
2096 obj_request->offset);
2097 rbd_warn(rbd_dev, " result %d xferred %x\n",
2098 result, xferred);
2099 if (!img_request->result)
2100 img_request->result = result;
2101 }
2102
f1a4739f
AE
2103 /* Image object requests don't own their page array */
2104
2105 if (obj_request->type == OBJ_REQUEST_PAGES) {
2106 obj_request->pages = NULL;
2107 obj_request->page_count = 0;
2108 }
2109
8b3e1a56
AE
2110 if (img_request_child_test(img_request)) {
2111 rbd_assert(img_request->obj_request != NULL);
2112 more = obj_request->which < img_request->obj_request_count - 1;
2113 } else {
2114 rbd_assert(img_request->rq != NULL);
2115 more = blk_end_request(img_request->rq, result, xferred);
2116 }
2117
2118 return more;
1217857f
AE
2119}
2120
2169238d
AE
2121static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2122{
2123 struct rbd_img_request *img_request;
2124 u32 which = obj_request->which;
2125 bool more = true;
2126
6365d33a 2127 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
2128 img_request = obj_request->img_request;
2129
2130 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2131 rbd_assert(img_request != NULL);
2169238d
AE
2132 rbd_assert(img_request->obj_request_count > 0);
2133 rbd_assert(which != BAD_WHICH);
2134 rbd_assert(which < img_request->obj_request_count);
2135 rbd_assert(which >= img_request->next_completion);
2136
2137 spin_lock_irq(&img_request->completion_lock);
2138 if (which != img_request->next_completion)
2139 goto out;
2140
2141 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
2142 rbd_assert(more);
2143 rbd_assert(which < img_request->obj_request_count);
2144
2145 if (!obj_request_done_test(obj_request))
2146 break;
1217857f 2147 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
2148 which++;
2149 }
2150
2151 rbd_assert(more ^ (which == img_request->obj_request_count));
2152 img_request->next_completion = which;
2153out:
2154 spin_unlock_irq(&img_request->completion_lock);
2155
2156 if (!more)
2157 rbd_img_request_complete(img_request);
2158}
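
/*
 * Worked example of the in-order completion above: with object
 * requests 0..3 finishing in the order 2, 0, 1, 3:
 *
 *	2 done: 2 != next_completion (0), so nothing is ended yet
 *	0 done: end 0; 1 is not yet done; next_completion = 1
 *	1 done: end 1, then end the already-done 2; next_completion = 3
 *	3 done: end 3; all done, so the image request completes
 *
 * blk_end_request() therefore always sees results in request order.
 */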
2159
f1a4739f
AE
2160/*
2161 * Split up an image request into one or more object requests, each
2162 * to a different object. The "type" parameter indicates whether
2163 * "data_desc" is the pointer to the head of a list of bio
2164 * structures, or the base of a page array. In either case this
2165 * function assumes data_desc describes memory sufficient to hold
2166 * all data described by the image request.
2167 */
2168static int rbd_img_request_fill(struct rbd_img_request *img_request,
2169 enum obj_request_type type,
2170 void *data_desc)
bf0d5f50
AE
2171{
2172 struct rbd_device *rbd_dev = img_request->rbd_dev;
2173 struct rbd_obj_request *obj_request = NULL;
2174 struct rbd_obj_request *next_obj_request;
0c425248 2175 bool write_request = img_request_write_test(img_request);
f1a4739f
AE
2176 struct bio *bio_list;
2177 unsigned int bio_offset = 0;
2178 struct page **pages;
7da22d29 2179 u64 img_offset;
bf0d5f50
AE
2180 u64 resid;
2181 u16 opcode;
2182
f1a4739f
AE
2183 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2184 (int)type, data_desc);
37206ee5 2185
430c28c3 2186 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
7da22d29 2187 img_offset = img_request->offset;
bf0d5f50 2188 resid = img_request->length;
4dda41d3 2189 rbd_assert(resid > 0);
f1a4739f
AE
2190
2191 if (type == OBJ_REQUEST_BIO) {
2192 bio_list = data_desc;
2193 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2194 } else {
2195 rbd_assert(type == OBJ_REQUEST_PAGES);
2196 pages = data_desc;
2197 }
2198
bf0d5f50 2199 while (resid) {
2fa12320 2200 struct ceph_osd_request *osd_req;
bf0d5f50 2201 const char *object_name;
bf0d5f50
AE
2202 u64 offset;
2203 u64 length;
2204
7da22d29 2205 object_name = rbd_segment_name(rbd_dev, img_offset);
bf0d5f50
AE
2206 if (!object_name)
2207 goto out_unwind;
7da22d29
AE
2208 offset = rbd_segment_offset(rbd_dev, img_offset);
2209 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2210 obj_request = rbd_obj_request_create(object_name,
f1a4739f 2211 offset, length, type);
78c2a44a
AE
2212 /* object request has its own copy of the object name */
2213 rbd_segment_name_free(object_name);
bf0d5f50
AE
2214 if (!obj_request)
2215 goto out_unwind;
2216
f1a4739f
AE
2217 if (type == OBJ_REQUEST_BIO) {
2218 unsigned int clone_size;
2219
2220 rbd_assert(length <= (u64)UINT_MAX);
2221 clone_size = (unsigned int)length;
2222 obj_request->bio_list =
2223 bio_chain_clone_range(&bio_list,
2224 &bio_offset,
2225 clone_size,
2226 GFP_ATOMIC);
2227 if (!obj_request->bio_list)
2228 goto out_partial;
2229 } else {
2230 unsigned int page_count;
2231
2232 obj_request->pages = pages;
2233 page_count = (u32)calc_pages_for(offset, length);
2234 obj_request->page_count = page_count;
2235 if ((offset + length) & ~PAGE_MASK)
2236 page_count--; /* more on last page */
2237 pages += page_count;
2238 }
bf0d5f50 2239
2fa12320
AE
2240 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2241 obj_request);
2242 if (!osd_req)
bf0d5f50 2243 goto out_partial;
2fa12320 2244 obj_request->osd_req = osd_req;
2169238d 2245 obj_request->callback = rbd_img_obj_callback;
430c28c3 2246
2fa12320
AE
2247 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2248 0, 0);
f1a4739f
AE
2249 if (type == OBJ_REQUEST_BIO)
2250 osd_req_op_extent_osd_data_bio(osd_req, 0,
2251 obj_request->bio_list, length);
2252 else
2253 osd_req_op_extent_osd_data_pages(osd_req, 0,
2254 obj_request->pages, length,
2255 offset & ~PAGE_MASK, false, false);
9d4df01f 2256
d2d1f17a
JD
2257 /*
2258 * set obj_request->img_request before formatting
2259 * the osd_request so that it gets the right snapc
2260 */
2261 rbd_img_obj_request_add(img_request, obj_request);
9d4df01f
AE
2262 if (write_request)
2263 rbd_osd_req_format_write(obj_request);
2264 else
2265 rbd_osd_req_format_read(obj_request);
430c28c3 2266
7da22d29 2267 obj_request->img_offset = img_offset;
bf0d5f50 2268
7da22d29 2269 img_offset += length;
bf0d5f50
AE
2270 resid -= length;
2271 }
2272
2273 return 0;
2274
2275out_partial:
2276 rbd_obj_request_put(obj_request);
2277out_unwind:
2278 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2279 rbd_obj_request_put(obj_request);
2280
2281 return -ENOMEM;
2282}
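
/*
 * Splitting example, assuming the default 4 MiB objects (obj_order
 * of 22): a 6 MiB write at image offset 3 MiB becomes three object
 * requests:
 *
 *	object 0: offset 3 MiB, length 1 MiB
 *	object 1: offset 0,     length 4 MiB
 *	object 2: offset 0,     length 1 MiB
 *
 * Each gets its own osd request; the per-object offsets and lengths
 * come from rbd_segment_offset() and rbd_segment_length() above.
 */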
2283
0eefd470
AE
2284static void
2285rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2286{
2287 struct rbd_img_request *img_request;
2288 struct rbd_device *rbd_dev;
ebda6408 2289 struct page **pages;
0eefd470
AE
2290 u32 page_count;
2291
2292 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2293 rbd_assert(obj_request_img_data_test(obj_request));
2294 img_request = obj_request->img_request;
2295 rbd_assert(img_request);
2296
2297 rbd_dev = img_request->rbd_dev;
2298 rbd_assert(rbd_dev);
0eefd470 2299
ebda6408
AE
2300 pages = obj_request->copyup_pages;
2301 rbd_assert(pages != NULL);
0eefd470 2302 obj_request->copyup_pages = NULL;
ebda6408
AE
2303 page_count = obj_request->copyup_page_count;
2304 rbd_assert(page_count);
2305 obj_request->copyup_page_count = 0;
2306 ceph_release_page_vector(pages, page_count);
0eefd470
AE
2307
2308 /*
2309 * We want the transfer count to reflect the size of the
2310 * original write request. There is no such thing as a
2311 * successful short write, so if the request was successful
2312 * we can just set it to the originally-requested length.
2313 */
2314 if (!obj_request->result)
2315 obj_request->xferred = obj_request->length;
2316
2317 /* Finish up with the normal image object callback */
2318
2319 rbd_img_obj_callback(obj_request);
2320}
2321
3d7efd18
AE
2322static void
2323rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2324{
2325 struct rbd_obj_request *orig_request;
0eefd470
AE
2326 struct ceph_osd_request *osd_req;
2327 struct ceph_osd_client *osdc;
2328 struct rbd_device *rbd_dev;
3d7efd18 2329 struct page **pages;
ebda6408 2330 u32 page_count;
bbea1c1a 2331 int img_result;
ebda6408 2332 u64 parent_length;
b91f09f1
AE
2333 u64 offset;
2334 u64 length;
3d7efd18
AE
2335
2336 rbd_assert(img_request_child_test(img_request));
2337
2338 /* First get what we need from the image request */
2339
2340 pages = img_request->copyup_pages;
2341 rbd_assert(pages != NULL);
2342 img_request->copyup_pages = NULL;
ebda6408
AE
2343 page_count = img_request->copyup_page_count;
2344 rbd_assert(page_count);
2345 img_request->copyup_page_count = 0;
3d7efd18
AE
2346
2347 orig_request = img_request->obj_request;
2348 rbd_assert(orig_request != NULL);
b91f09f1 2349 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2350 img_result = img_request->result;
ebda6408
AE
2351 parent_length = img_request->length;
2352 rbd_assert(parent_length == img_request->xferred);
91c6febb 2353 rbd_img_request_put(img_request);
3d7efd18 2354
91c6febb
AE
2355 rbd_assert(orig_request->img_request);
2356 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2357 rbd_assert(rbd_dev);
0eefd470 2358
bbea1c1a
AE
2359 /*
2360 * If the overlap has become 0 (most likely because the
2361 * image has been flattened) we need to free the pages
2362 * and re-submit the original write request.
2363 */
2364 if (!rbd_dev->parent_overlap) {
2365 struct ceph_osd_client *osdc;
3d7efd18 2366
bbea1c1a
AE
2367 ceph_release_page_vector(pages, page_count);
2368 osdc = &rbd_dev->rbd_client->client->osdc;
2369 img_result = rbd_obj_request_submit(osdc, orig_request);
2370 if (!img_result)
2371 return;
2372 }
0eefd470 2373
bbea1c1a 2374 if (img_result)
0eefd470 2375 goto out_err;
0eefd470 2376
8785b1d4
AE
2377 /*
2378	 * The original osd request is of no use to us any more.
2379 * We need a new one that can hold the two ops in a copyup
2380 * request. Allocate the new copyup osd request for the
2381 * original request, and release the old one.
2382 */
bbea1c1a 2383 img_result = -ENOMEM;
0eefd470
AE
2384 osd_req = rbd_osd_req_create_copyup(orig_request);
2385 if (!osd_req)
2386 goto out_err;
8785b1d4 2387 rbd_osd_req_destroy(orig_request->osd_req);
0eefd470
AE
2388 orig_request->osd_req = osd_req;
2389 orig_request->copyup_pages = pages;
ebda6408 2390 orig_request->copyup_page_count = page_count;
3d7efd18 2391
0eefd470 2392 /* Initialize the copyup op */
3d7efd18 2393
0eefd470 2394 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2395 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2396 false, false);
3d7efd18 2397
0eefd470
AE
2398 /* Then the original write request op */
2399
b91f09f1
AE
2400 offset = orig_request->offset;
2401 length = orig_request->length;
0eefd470 2402 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
b91f09f1
AE
2403 offset, length, 0, 0);
2404 if (orig_request->type == OBJ_REQUEST_BIO)
2405 osd_req_op_extent_osd_data_bio(osd_req, 1,
2406 orig_request->bio_list, length);
2407 else
2408 osd_req_op_extent_osd_data_pages(osd_req, 1,
2409 orig_request->pages, length,
2410 offset & ~PAGE_MASK, false, false);
0eefd470
AE
2411
2412 rbd_osd_req_format_write(orig_request);
2413
2414 /* All set, send it off. */
2415
2416 orig_request->callback = rbd_img_obj_copyup_callback;
2417 osdc = &rbd_dev->rbd_client->client->osdc;
bbea1c1a
AE
2418 img_result = rbd_obj_request_submit(osdc, orig_request);
2419 if (!img_result)
0eefd470
AE
2420 return;
2421out_err:
2422 /* Record the error code and complete the request */
2423
bbea1c1a 2424 orig_request->result = img_result;
0eefd470
AE
2425 orig_request->xferred = 0;
2426 obj_request_done_set(orig_request);
2427 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2428}
2429
2430/*
2431 * Read from the parent image the range of data that covers the
2432 * entire target of the given object request. This is used for
2433 * satisfying a layered image write request when the target of an
2434 * object request from the image request does not exist.
2435 *
2436 * A page array big enough to hold the returned data is allocated
2437 * and supplied to rbd_img_request_fill() as the "data descriptor."
2438 * When the read completes, this page array will be transferred to
2439 * the original object request for the copyup operation.
2440 *
2441 * If an error occurs, record it as the result of the original
2442 * object request and mark it done so it gets completed.
2443 */
2444static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2445{
2446 struct rbd_img_request *img_request = NULL;
2447 struct rbd_img_request *parent_request = NULL;
2448 struct rbd_device *rbd_dev;
2449 u64 img_offset;
2450 u64 length;
2451 struct page **pages = NULL;
2452 u32 page_count;
2453 int result;
2454
2455 rbd_assert(obj_request_img_data_test(obj_request));
b91f09f1 2456 rbd_assert(obj_request_type_valid(obj_request->type));
3d7efd18
AE
2457
2458 img_request = obj_request->img_request;
2459 rbd_assert(img_request != NULL);
2460 rbd_dev = img_request->rbd_dev;
2461 rbd_assert(rbd_dev->parent != NULL);
2462
2463 /*
2464 * Determine the byte range covered by the object in the
2465 * child image to which the original request was to be sent.
2466 */
2467 img_offset = obj_request->img_offset - obj_request->offset;
2468 length = (u64)1 << rbd_dev->header.obj_order;
2469
a9e8ba2c
AE
2470 /*
2471 * There is no defined parent data beyond the parent
2472 * overlap, so limit what we read at that boundary if
2473 * necessary.
2474 */
2475 if (img_offset + length > rbd_dev->parent_overlap) {
2476 rbd_assert(img_offset < rbd_dev->parent_overlap);
2477 length = rbd_dev->parent_overlap - img_offset;
2478 }
2479
3d7efd18
AE
2480 /*
2481 * Allocate a page array big enough to receive the data read
2482 * from the parent.
2483 */
2484 page_count = (u32)calc_pages_for(0, length);
2485 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2486 if (IS_ERR(pages)) {
2487 result = PTR_ERR(pages);
2488 pages = NULL;
2489 goto out_err;
2490 }
2491
2492 result = -ENOMEM;
e93f3152
AE
2493 parent_request = rbd_parent_request_create(obj_request,
2494 img_offset, length);
3d7efd18
AE
2495 if (!parent_request)
2496 goto out_err;
3d7efd18
AE
2497
2498 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2499 if (result)
2500 goto out_err;
2501 parent_request->copyup_pages = pages;
ebda6408 2502 parent_request->copyup_page_count = page_count;
3d7efd18
AE
2503
2504 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2505 result = rbd_img_request_submit(parent_request);
2506 if (!result)
2507 return 0;
2508
2509 parent_request->copyup_pages = NULL;
ebda6408 2510 parent_request->copyup_page_count = 0;
3d7efd18
AE
2511 parent_request->obj_request = NULL;
2512 rbd_obj_request_put(obj_request);
2513out_err:
2514 if (pages)
2515 ceph_release_page_vector(pages, page_count);
2516 if (parent_request)
2517 rbd_img_request_put(parent_request);
2518 obj_request->result = result;
2519 obj_request->xferred = 0;
2520 obj_request_done_set(obj_request);
2521
2522 return result;
2523}
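
/*
 * Range example for the read above, again assuming 4 MiB objects:
 * a layered write of 8 KiB at image offset 5 MiB has img_offset
 * 5 MiB and object offset 1 MiB, so the parent read covers the
 * whole enclosing object, [4 MiB, 8 MiB).  If the parent overlap
 * were only 6 MiB, the read would be clipped to [4 MiB, 6 MiB).
 */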
2524
c5b5ef6c
AE
2525static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2526{
c5b5ef6c 2527 struct rbd_obj_request *orig_request;
638f5abe 2528 struct rbd_device *rbd_dev;
c5b5ef6c
AE
2529 int result;
2530
2531 rbd_assert(!obj_request_img_data_test(obj_request));
2532
2533 /*
2534 * All we need from the object request is the original
2535 * request and the result of the STAT op. Grab those, then
2536 * we're done with the request.
2537 */
2538 orig_request = obj_request->obj_request;
2539 obj_request->obj_request = NULL;
912c317d 2540 rbd_obj_request_put(orig_request);
c5b5ef6c
AE
2541 rbd_assert(orig_request);
2542 rbd_assert(orig_request->img_request);
2543
2544 result = obj_request->result;
2545 obj_request->result = 0;
2546
2547 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2548 obj_request, orig_request, result,
2549 obj_request->xferred, obj_request->length);
2550 rbd_obj_request_put(obj_request);
2551
638f5abe
AE
2552 /*
2553 * If the overlap has become 0 (most likely because the
2554	 * image has been flattened) we need to re-submit the
2555	 * original write request.
2556 */
2557 rbd_dev = orig_request->img_request->rbd_dev;
2558 if (!rbd_dev->parent_overlap) {
2559 struct ceph_osd_client *osdc;
2560
638f5abe
AE
2561 osdc = &rbd_dev->rbd_client->client->osdc;
2562 result = rbd_obj_request_submit(osdc, orig_request);
2563 if (!result)
2564 return;
2565 }
c5b5ef6c
AE
2566
2567 /*
2568 * Our only purpose here is to determine whether the object
2569 * exists, and we don't want to treat the non-existence as
2570 * an error. If something else comes back, transfer the
2571 * error to the original request and complete it now.
2572 */
2573 if (!result) {
2574 obj_request_existence_set(orig_request, true);
2575 } else if (result == -ENOENT) {
2576 obj_request_existence_set(orig_request, false);
2577 } else if (result) {
2578 orig_request->result = result;
3d7efd18 2579 goto out;
c5b5ef6c
AE
2580 }
2581
2582 /*
2583 * Resubmit the original request now that we have recorded
2584 * whether the target object exists.
2585 */
b454e36d 2586 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2587out:
c5b5ef6c
AE
2588 if (orig_request->result)
2589 rbd_obj_request_complete(orig_request);
c5b5ef6c
AE
2590}
2591
2592static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2593{
2594 struct rbd_obj_request *stat_request;
2595 struct rbd_device *rbd_dev;
2596 struct ceph_osd_client *osdc;
2597 struct page **pages = NULL;
2598 u32 page_count;
2599 size_t size;
2600 int ret;
2601
2602 /*
2603 * The response data for a STAT call consists of:
2604 * le64 length;
2605 * struct {
2606 * le32 tv_sec;
2607 * le32 tv_nsec;
2608 * } mtime;
2609 */
2610 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2611 page_count = (u32)calc_pages_for(0, size);
2612 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2613 if (IS_ERR(pages))
2614 return PTR_ERR(pages);
2615
2616 ret = -ENOMEM;
2617 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2618 OBJ_REQUEST_PAGES);
2619 if (!stat_request)
2620 goto out;
2621
2622 rbd_obj_request_get(obj_request);
2623 stat_request->obj_request = obj_request;
2624 stat_request->pages = pages;
2625 stat_request->page_count = page_count;
2626
2627 rbd_assert(obj_request->img_request);
2628 rbd_dev = obj_request->img_request->rbd_dev;
2629 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2630 stat_request);
2631 if (!stat_request->osd_req)
2632 goto out;
2633 stat_request->callback = rbd_img_obj_exists_callback;
2634
2635 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2636 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2637 false, false);
9d4df01f 2638 rbd_osd_req_format_read(stat_request);
c5b5ef6c
AE
2639
2640 osdc = &rbd_dev->rbd_client->client->osdc;
2641 ret = rbd_obj_request_submit(osdc, stat_request);
2642out:
2643 if (ret)
2644 rbd_obj_request_put(obj_request);
2645
2646 return ret;
2647}
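
/*
 * Size arithmetic for the STAT reply above: sizeof (__le64) plus
 * two sizeof (__le32) is 16 bytes, so calc_pages_for(0, 16) always
 * yields a single-page vector regardless of PAGE_SIZE.
 */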
2648
b454e36d
AE
2649static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2650{
2651 struct rbd_img_request *img_request;
a9e8ba2c 2652 struct rbd_device *rbd_dev;
3d7efd18 2653 bool known;
b454e36d
AE
2654
2655 rbd_assert(obj_request_img_data_test(obj_request));
2656
2657 img_request = obj_request->img_request;
2658 rbd_assert(img_request);
a9e8ba2c 2659 rbd_dev = img_request->rbd_dev;
b454e36d 2660
b454e36d 2661 /*
a9e8ba2c
AE
2662 * Only writes to layered images need special handling.
2663 * Reads and non-layered writes are simple object requests.
2664 * Layered writes that start beyond the end of the overlap
2665 * with the parent have no parent data, so they too are
2666 * simple object requests. Finally, if the target object is
2667 * known to already exist, its parent data has already been
2668 * copied, so a write to the object can also be handled as a
2669 * simple object request.
b454e36d
AE
2670 */
2671 if (!img_request_write_test(img_request) ||
2672 !img_request_layered_test(img_request) ||
a9e8ba2c 2673 rbd_dev->parent_overlap <= obj_request->img_offset ||
3d7efd18
AE
2674 ((known = obj_request_known_test(obj_request)) &&
2675 obj_request_exists_test(obj_request))) {
b454e36d
AE
2676
2677 struct rbd_device *rbd_dev;
2678 struct ceph_osd_client *osdc;
2679
2680 rbd_dev = obj_request->img_request->rbd_dev;
2681 osdc = &rbd_dev->rbd_client->client->osdc;
2682
2683 return rbd_obj_request_submit(osdc, obj_request);
2684 }
2685
2686 /*
3d7efd18
AE
2687 * It's a layered write. The target object might exist but
2688 * we may not know that yet. If we know it doesn't exist,
2689 * start by reading the data for the full target object from
2690 * the parent so we can use it for a copyup to the target.
b454e36d 2691 */
3d7efd18
AE
2692 if (known)
2693 return rbd_img_obj_parent_read_full(obj_request);
2694
2695 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
2696
2697 return rbd_img_obj_exists_submit(obj_request);
2698}
2699
bf0d5f50
AE
2700static int rbd_img_request_submit(struct rbd_img_request *img_request)
2701{
bf0d5f50 2702 struct rbd_obj_request *obj_request;
46faeed4 2703 struct rbd_obj_request *next_obj_request;
bf0d5f50 2704
37206ee5 2705 dout("%s: img %p\n", __func__, img_request);
46faeed4 2706 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
bf0d5f50
AE
2707 int ret;
2708
b454e36d 2709 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50
AE
2710 if (ret)
2711 return ret;
bf0d5f50
AE
2712 }
2713
2714 return 0;
2715}
8b3e1a56
AE
2716
2717static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2718{
2719 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
2720 struct rbd_device *rbd_dev;
2721 u64 obj_end;
02c74fba
AE
2722 u64 img_xferred;
2723 int img_result;
8b3e1a56
AE
2724
2725 rbd_assert(img_request_child_test(img_request));
2726
02c74fba
AE
2727 /* First get what we need from the image request and release it */
2728
8b3e1a56 2729 obj_request = img_request->obj_request;
02c74fba
AE
2730 img_xferred = img_request->xferred;
2731 img_result = img_request->result;
2732 rbd_img_request_put(img_request);
2733
2734 /*
2735 * If the overlap has become 0 (most likely because the
2736 * image has been flattened) we need to re-submit the
2737 * original request.
2738 */
a9e8ba2c
AE
2739 rbd_assert(obj_request);
2740 rbd_assert(obj_request->img_request);
02c74fba
AE
2741 rbd_dev = obj_request->img_request->rbd_dev;
2742 if (!rbd_dev->parent_overlap) {
2743 struct ceph_osd_client *osdc;
2744
2745 osdc = &rbd_dev->rbd_client->client->osdc;
2746 img_result = rbd_obj_request_submit(osdc, obj_request);
2747 if (!img_result)
2748 return;
2749 }
a9e8ba2c 2750
02c74fba 2751 obj_request->result = img_result;
a9e8ba2c
AE
2752 if (obj_request->result)
2753 goto out;
2754
2755 /*
2756 * We need to zero anything beyond the parent overlap
2757 * boundary. Since rbd_img_obj_request_read_callback()
2758 * will zero anything beyond the end of a short read, an
2759 * easy way to do this is to pretend the data from the
2760 * parent came up short--ending at the overlap boundary.
2761 */
2762 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2763 obj_end = obj_request->img_offset + obj_request->length;
a9e8ba2c
AE
2764 if (obj_end > rbd_dev->parent_overlap) {
2765 u64 xferred = 0;
2766
2767 if (obj_request->img_offset < rbd_dev->parent_overlap)
2768 xferred = rbd_dev->parent_overlap -
2769 obj_request->img_offset;
8b3e1a56 2770
02c74fba 2771 obj_request->xferred = min(img_xferred, xferred);
a9e8ba2c 2772 } else {
02c74fba 2773 obj_request->xferred = img_xferred;
a9e8ba2c
AE
2774 }
2775out:
8b3e1a56
AE
2776 rbd_img_obj_request_read_callback(obj_request);
2777 rbd_obj_request_complete(obj_request);
2778}
2779
2780static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2781{
8b3e1a56
AE
2782 struct rbd_img_request *img_request;
2783 int result;
2784
2785 rbd_assert(obj_request_img_data_test(obj_request));
2786 rbd_assert(obj_request->img_request != NULL);
2787 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 2788 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 2789
8b3e1a56 2790 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 2791 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 2792 obj_request->img_offset,
e93f3152 2793 obj_request->length);
8b3e1a56
AE
2794 result = -ENOMEM;
2795 if (!img_request)
2796 goto out_err;
2797
5b2ab72d
AE
2798 if (obj_request->type == OBJ_REQUEST_BIO)
2799 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2800 obj_request->bio_list);
2801 else
2802 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2803 obj_request->pages);
8b3e1a56
AE
2804 if (result)
2805 goto out_err;
2806
2807 img_request->callback = rbd_img_parent_read_callback;
2808 result = rbd_img_request_submit(img_request);
2809 if (result)
2810 goto out_err;
2811
2812 return;
2813out_err:
2814 if (img_request)
2815 rbd_img_request_put(img_request);
2816 obj_request->result = result;
2817 obj_request->xferred = 0;
2818 obj_request_done_set(obj_request);
2819}
bf0d5f50 2820
cc4a38bd 2821static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
b8d70035
AE
2822{
2823 struct rbd_obj_request *obj_request;
2169238d 2824 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
b8d70035
AE
2825 int ret;
2826
2827 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2828 OBJ_REQUEST_NODATA);
2829 if (!obj_request)
2830 return -ENOMEM;
2831
2832 ret = -ENOMEM;
430c28c3 2833 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
b8d70035
AE
2834 if (!obj_request->osd_req)
2835 goto out;
2169238d 2836 obj_request->callback = rbd_obj_request_put;
b8d70035 2837
c99d2d4a 2838 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
cc4a38bd 2839 notify_id, 0, 0);
9d4df01f 2840 rbd_osd_req_format_read(obj_request);
430c28c3 2841
b8d70035 2842 ret = rbd_obj_request_submit(osdc, obj_request);
b8d70035 2843out:
cf81b60e
AE
2844 if (ret)
2845 rbd_obj_request_put(obj_request);
b8d70035
AE
2846
2847 return ret;
2848}
2849
2850static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2851{
2852 struct rbd_device *rbd_dev = (struct rbd_device *)data;
e627db08 2853 int ret;
b8d70035
AE
2854
2855 if (!rbd_dev)
2856 return;
2857
37206ee5 2858 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
cc4a38bd
AE
2859 rbd_dev->header_name, (unsigned long long)notify_id,
2860 (unsigned int)opcode);
e627db08
AE
2861 ret = rbd_dev_refresh(rbd_dev);
2862 if (ret)
2863 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
b8d70035 2864
cc4a38bd 2865 rbd_obj_notify_ack(rbd_dev, notify_id);
b8d70035
AE
2866}
2867
9969ebc5
AE
2868/*
2869 * Request sync osd watch/unwatch. The value of "start" determines
2870 * whether a watch request is being initiated or torn down.
2871 */
1f3ef788 2872static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
9969ebc5
AE
2873{
2874 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2875 struct rbd_obj_request *obj_request;
9969ebc5
AE
2876 int ret;
2877
2878 rbd_assert(start ^ !!rbd_dev->watch_event);
2879 rbd_assert(start ^ !!rbd_dev->watch_request);
2880
2881 if (start) {
3c663bbd 2882 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
9969ebc5
AE
2883 &rbd_dev->watch_event);
2884 if (ret < 0)
2885 return ret;
8eb87565 2886 rbd_assert(rbd_dev->watch_event != NULL);
9969ebc5
AE
2887 }
2888
2889 ret = -ENOMEM;
2890 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2891 OBJ_REQUEST_NODATA);
2892 if (!obj_request)
2893 goto out_cancel;
2894
430c28c3
AE
2895 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2896 if (!obj_request->osd_req)
2897 goto out_cancel;
2898
8eb87565 2899 if (start)
975241af 2900 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 2901 else
6977c3f9 2902 ceph_osdc_unregister_linger_request(osdc,
975241af 2903 rbd_dev->watch_request->osd_req);
2169238d
AE
2904
2905 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1f3ef788 2906 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
9d4df01f 2907 rbd_osd_req_format_write(obj_request);
2169238d 2908
9969ebc5
AE
2909 ret = rbd_obj_request_submit(osdc, obj_request);
2910 if (ret)
2911 goto out_cancel;
2912 ret = rbd_obj_request_wait(obj_request);
2913 if (ret)
2914 goto out_cancel;
9969ebc5
AE
2915 ret = obj_request->result;
2916 if (ret)
2917 goto out_cancel;
2918
8eb87565
AE
2919 /*
2920 * A watch request is set to linger, so the underlying osd
2921 * request won't go away until we unregister it. We retain
2922 * a pointer to the object request during that time (in
2923 * rbd_dev->watch_request), so we'll keep a reference to
2924 * it. We'll drop that reference (below) after we've
2925 * unregistered it.
2926 */
2927 if (start) {
2928 rbd_dev->watch_request = obj_request;
2929
2930 return 0;
2931 }
2932
2933 /* We have successfully torn down the watch request */
2934
2935 rbd_obj_request_put(rbd_dev->watch_request);
2936 rbd_dev->watch_request = NULL;
9969ebc5
AE
2937out_cancel:
2938 /* Cancel the event if we're tearing down, or on error */
2939 ceph_osdc_cancel_event(rbd_dev->watch_event);
2940 rbd_dev->watch_event = NULL;
9969ebc5
AE
2941 if (obj_request)
2942 rbd_obj_request_put(obj_request);
2943
2944 return ret;
2945}
2946
36be9a76 2947/*
f40eb349
AE
2948 * Synchronous osd object method call. Returns the number of bytes
2949	 * returned in the inbound buffer, or a negative error code.
36be9a76
AE
2950 */
2951static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2952 const char *object_name,
2953 const char *class_name,
2954 const char *method_name,
4157976b 2955 const void *outbound,
36be9a76 2956 size_t outbound_size,
4157976b 2957 void *inbound,
e2a58ee5 2958 size_t inbound_size)
36be9a76 2959{
2169238d 2960 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
36be9a76 2961 struct rbd_obj_request *obj_request;
36be9a76
AE
2962 struct page **pages;
2963 u32 page_count;
2964 int ret;
2965
2966 /*
6010a451
AE
2967 * Method calls are ultimately read operations. The result
2968	 * should be placed into the inbound buffer provided.  They
2969 * also supply outbound data--parameters for the object
2970 * method. Currently if this is present it will be a
2971 * snapshot id.
36be9a76 2972 */
57385b51 2973 page_count = (u32)calc_pages_for(0, inbound_size);
36be9a76
AE
2974 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2975 if (IS_ERR(pages))
2976 return PTR_ERR(pages);
2977
2978 ret = -ENOMEM;
6010a451 2979 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
36be9a76
AE
2980 OBJ_REQUEST_PAGES);
2981 if (!obj_request)
2982 goto out;
2983
2984 obj_request->pages = pages;
2985 obj_request->page_count = page_count;
2986
430c28c3 2987 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
36be9a76
AE
2988 if (!obj_request->osd_req)
2989 goto out;
2990
c99d2d4a 2991 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
04017e29
AE
2992 class_name, method_name);
2993 if (outbound_size) {
2994 struct ceph_pagelist *pagelist;
2995
2996 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2997 if (!pagelist)
2998 goto out;
2999
3000 ceph_pagelist_init(pagelist);
3001 ceph_pagelist_append(pagelist, outbound, outbound_size);
3002 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3003 pagelist);
3004 }
a4ce40a9
AE
3005 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3006 obj_request->pages, inbound_size,
44cd188d 3007 0, false, false);
9d4df01f 3008 rbd_osd_req_format_read(obj_request);
430c28c3 3009
36be9a76
AE
3010 ret = rbd_obj_request_submit(osdc, obj_request);
3011 if (ret)
3012 goto out;
3013 ret = rbd_obj_request_wait(obj_request);
3014 if (ret)
3015 goto out;
3016
3017 ret = obj_request->result;
3018 if (ret < 0)
3019 goto out;
57385b51
AE
3020
3021 rbd_assert(obj_request->xferred < (u64)INT_MAX);
3022 ret = (int)obj_request->xferred;
903bb32e 3023 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
36be9a76
AE
3024out:
3025 if (obj_request)
3026 rbd_obj_request_put(obj_request);
3027 else
3028 ceph_release_page_vector(pages, page_count);
3029
3030 return ret;
3031}
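
/*
 * A typical invocation, roughly in the shape the v2 image probing
 * code uses (the buffer layout here is illustrative):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 *
 * On success, ret is the number of bytes copied into size_buf.
 */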
3032
bf0d5f50 3033static void rbd_request_fn(struct request_queue *q)
cc344fa1 3034 __releases(q->queue_lock) __acquires(q->queue_lock)
bf0d5f50
AE
3035{
3036 struct rbd_device *rbd_dev = q->queuedata;
3037 bool read_only = rbd_dev->mapping.read_only;
3038 struct request *rq;
3039 int result;
3040
3041 while ((rq = blk_fetch_request(q))) {
3042 bool write_request = rq_data_dir(rq) == WRITE;
3043 struct rbd_img_request *img_request;
3044 u64 offset;
3045 u64 length;
3046
3047 /* Ignore any non-FS requests that filter through. */
3048
3049 if (rq->cmd_type != REQ_TYPE_FS) {
4dda41d3
AE
3050 dout("%s: non-fs request type %d\n", __func__,
3051 (int) rq->cmd_type);
3052 __blk_end_request_all(rq, 0);
3053 continue;
3054 }
3055
3056 /* Ignore/skip any zero-length requests */
3057
3058 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
3059 length = (u64) blk_rq_bytes(rq);
3060
3061 if (!length) {
3062 dout("%s: zero-length request\n", __func__);
bf0d5f50
AE
3063 __blk_end_request_all(rq, 0);
3064 continue;
3065 }
3066
3067 spin_unlock_irq(q->queue_lock);
3068
3069 /* Disallow writes to a read-only device */
3070
3071 if (write_request) {
3072 result = -EROFS;
3073 if (read_only)
3074 goto end_request;
3075 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3076 }
3077
6d292906
AE
3078 /*
3079 * Quit early if the mapped snapshot no longer
3080 * exists. It's still possible the snapshot will
3081 * have disappeared by the time our request arrives
3082 * at the osd, but there's no sense in sending it if
3083 * we already know.
3084 */
3085 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
bf0d5f50
AE
3086 dout("request for non-existent snapshot");
3087 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3088 result = -ENXIO;
3089 goto end_request;
3090 }
3091
bf0d5f50 3092 result = -EINVAL;
c0cd10db
AE
3093 if (offset && length > U64_MAX - offset + 1) {
3094 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3095 offset, length);
bf0d5f50 3096 goto end_request; /* Shouldn't happen */
c0cd10db 3097 }
bf0d5f50 3098
00a653e2
AE
3099 result = -EIO;
3100 if (offset + length > rbd_dev->mapping.size) {
3101 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3102 offset, length, rbd_dev->mapping.size);
3103 goto end_request;
3104 }
3105
bf0d5f50
AE
3106 result = -ENOMEM;
3107 img_request = rbd_img_request_create(rbd_dev, offset, length,
e93f3152 3108 write_request);
bf0d5f50
AE
3109 if (!img_request)
3110 goto end_request;
3111
3112 img_request->rq = rq;
3113
f1a4739f
AE
3114 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3115 rq->bio);
bf0d5f50
AE
3116 if (!result)
3117 result = rbd_img_request_submit(img_request);
3118 if (result)
3119 rbd_img_request_put(img_request);
3120end_request:
3121 spin_lock_irq(q->queue_lock);
3122 if (result < 0) {
7da22d29
AE
3123 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3124 write_request ? "write" : "read",
3125 length, offset, result);
3126
bf0d5f50
AE
3127 __blk_end_request_all(rq, result);
3128 }
3129 }
3130}
3131
602adf40
YS
3132/*
3133 * A queue callback. Makes sure that we don't create a bio that spans across
3134 * multiple osd objects. One exception would be a single-page bio,
f7760dad 3135 * which we handle later in bio_chain_clone_range().
602adf40
YS
3136 */
3137static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3138 struct bio_vec *bvec)
3139{
3140 struct rbd_device *rbd_dev = q->queuedata;
e5cfeed2
AE
3141 sector_t sector_offset;
3142 sector_t sectors_per_obj;
3143 sector_t obj_sector_offset;
3144 int ret;
3145
3146 /*
3147	 * Convert the partition-relative bio start sector into a
3148	 * sector offset relative to the enclosing device, then find
3149	 * how far into its rbd object that sector falls.
3150 */
3151 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3152 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3153 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3154
3155 /*
3156 * Compute the number of bytes from that offset to the end
3157 * of the object. Account for what's already used by the bio.
3158 */
3159 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3160 if (ret > bmd->bi_size)
3161 ret -= bmd->bi_size;
3162 else
3163 ret = 0;
3164
3165 /*
3166 * Don't send back more than was asked for. And if the bio
3167 * was empty, let the whole thing through because: "Note
3168 * that a block device *must* allow a single page to be
3169 * added to an empty bio."
3170 */
3171 rbd_assert(bvec->bv_len <= PAGE_SIZE);
3172 if (ret > (int) bvec->bv_len || !bmd->bi_size)
3173 ret = (int) bvec->bv_len;
3174
3175 return ret;
602adf40
YS
3176}
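
/*
 * Arithmetic example for the merge callback, assuming obj_order 22
 * (4 MiB objects): sectors_per_obj = 1 << (22 - 9) = 8192.  For an
 * empty bio starting at device sector 8000, obj_sector_offset =
 * 8000 & 8191 = 8000, leaving (8192 - 8000) << 9 = 98304 bytes
 * before the object boundary, so up to that much may be added.
 */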
3177
3178static void rbd_free_disk(struct rbd_device *rbd_dev)
3179{
3180 struct gendisk *disk = rbd_dev->disk;
3181
3182 if (!disk)
3183 return;
3184
a0cab924
AE
3185 rbd_dev->disk = NULL;
3186 if (disk->flags & GENHD_FL_UP) {
602adf40 3187 del_gendisk(disk);
a0cab924
AE
3188 if (disk->queue)
3189 blk_cleanup_queue(disk->queue);
3190 }
602adf40
YS
3191 put_disk(disk);
3192}
3193
788e2df3
AE
3194static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3195 const char *object_name,
7097f8df 3196 u64 offset, u64 length, void *buf)
788e2df3
AE
3197
3198{
2169238d 3199 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 3200 struct rbd_obj_request *obj_request;
788e2df3
AE
3201 struct page **pages = NULL;
3202 u32 page_count;
1ceae7ef 3203 size_t size;
788e2df3
AE
3204 int ret;
3205
3206 page_count = (u32) calc_pages_for(offset, length);
3207 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3208 if (IS_ERR(pages))
3209		return PTR_ERR(pages);
3210
3211 ret = -ENOMEM;
3212 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 3213 OBJ_REQUEST_PAGES);
788e2df3
AE
3214 if (!obj_request)
3215 goto out;
3216
3217 obj_request->pages = pages;
3218 obj_request->page_count = page_count;
3219
430c28c3 3220 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
788e2df3
AE
3221 if (!obj_request->osd_req)
3222 goto out;
3223
c99d2d4a
AE
3224 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3225 offset, length, 0, 0);
406e2c9f 3226 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 3227 obj_request->pages,
44cd188d
AE
3228 obj_request->length,
3229 obj_request->offset & ~PAGE_MASK,
3230 false, false);
9d4df01f 3231 rbd_osd_req_format_read(obj_request);
430c28c3 3232
788e2df3
AE
3233 ret = rbd_obj_request_submit(osdc, obj_request);
3234 if (ret)
3235 goto out;
3236 ret = rbd_obj_request_wait(obj_request);
3237 if (ret)
3238 goto out;
3239
3240 ret = obj_request->result;
3241 if (ret < 0)
3242 goto out;
1ceae7ef
AE
3243
3244 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3245 size = (size_t) obj_request->xferred;
903bb32e 3246 ceph_copy_from_page_vector(pages, buf, 0, size);
7097f8df
AE
3247 rbd_assert(size <= (size_t)INT_MAX);
3248 ret = (int)size;
788e2df3
AE
3249out:
3250 if (obj_request)
3251 rbd_obj_request_put(obj_request);
3252 else
3253 ceph_release_page_vector(pages, page_count);
3254
3255 return ret;
3256}
3257
602adf40 3258/*
662518b1
AE
3259 * Read the complete header for the given rbd device. On successful
3260 * return, the rbd_dev->header field will contain up-to-date
3261 * information about the image.
602adf40 3262 */
99a41ebc 3263static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 3264{
4156d998 3265 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 3266 u32 snap_count = 0;
4156d998
AE
3267 u64 names_size = 0;
3268 u32 want_count;
3269 int ret;
602adf40 3270
00f1f36f 3271 /*
4156d998
AE
3272 * The complete header will include an array of its 64-bit
3273 * snapshot ids, followed by the names of those snapshots as
3274 * a contiguous block of NUL-terminated strings. Note that
3275 * the number of snapshots could change by the time we read
3276 * it in, in which case we re-read it.
00f1f36f 3277 */
4156d998
AE
3278 do {
3279 size_t size;
3280
3281 kfree(ondisk);
3282
3283 size = sizeof (*ondisk);
3284 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3285 size += names_size;
3286 ondisk = kmalloc(size, GFP_KERNEL);
3287 if (!ondisk)
662518b1 3288 return -ENOMEM;
4156d998 3289
788e2df3 3290 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
7097f8df 3291 0, size, ondisk);
4156d998 3292 if (ret < 0)
662518b1 3293 goto out;
c0cd10db 3294 if ((size_t)ret < size) {
4156d998 3295 ret = -ENXIO;
06ecc6cb
AE
3296 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3297 size, ret);
662518b1 3298 goto out;
4156d998
AE
3299 }
3300 if (!rbd_dev_ondisk_valid(ondisk)) {
3301 ret = -ENXIO;
06ecc6cb 3302 rbd_warn(rbd_dev, "invalid header");
662518b1 3303 goto out;
81e759fb 3304 }
602adf40 3305
4156d998
AE
3306 names_size = le64_to_cpu(ondisk->snap_names_len);
3307 want_count = snap_count;
3308 snap_count = le32_to_cpu(ondisk->snap_count);
3309 } while (snap_count != want_count);
00f1f36f 3310
662518b1
AE
3311 ret = rbd_header_from_disk(rbd_dev, ondisk);
3312out:
4156d998
AE
3313 kfree(ondisk);
3314
3315 return ret;
602adf40
YS
3316}
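
/*
 * Read-retry example for the loop above: the first pass reads just
 * sizeof (*ondisk), with snap_count and names_size taken as zero.
 * If that header reports, say, 3 snapshots, the loop repeats with
 * room for 3 snapshot ids plus the reported name block, and only
 * exits once the snapshot count read back matches the count the
 * buffer was sized for.
 */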
3317
15228ede
AE
3318/*
3319 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3320 * has disappeared from the (just updated) snapshot context.
3321 */
3322static void rbd_exists_validate(struct rbd_device *rbd_dev)
3323{
3324 u64 snap_id;
3325
3326 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3327 return;
3328
3329 snap_id = rbd_dev->spec->snap_id;
3330 if (snap_id == CEPH_NOSNAP)
3331 return;
3332
3333 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3334 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3335}
3336
cc4a38bd 3337static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3338{
e627db08 3339 u64 mapping_size;
1fe5e993
AE
3340 int ret;
3341
117973fb 3342 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
e627db08 3343 mapping_size = rbd_dev->mapping.size;
1fe5e993 3344 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb 3345 if (rbd_dev->image_format == 1)
99a41ebc 3346 ret = rbd_dev_v1_header_info(rbd_dev);
117973fb 3347 else
2df3fac7 3348 ret = rbd_dev_v2_header_info(rbd_dev);
15228ede
AE
3349
3350 /* If it's a mapped snapshot, validate its EXISTS flag */
3351
3352 rbd_exists_validate(rbd_dev);
1fe5e993 3353 mutex_unlock(&ctl_mutex);
00a653e2
AE
3354 if (mapping_size != rbd_dev->mapping.size) {
3355 sector_t size;
3356
3357 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3358 dout("setting size to %llu sectors", (unsigned long long)size);
3359 set_capacity(rbd_dev->disk, size);
a3fbe5d4 3360 revalidate_disk(rbd_dev->disk);
00a653e2 3361 }
1fe5e993
AE
3362
3363 return ret;
3364}
3365
602adf40
YS
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}

/*
  sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		(unsigned long long)rbd_dev->mapping.size);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long)rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->major)
		return sprintf(buf, "%d\n", rbd_dev->major);

	return sprintf(buf, "(none)\n");
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
			(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	return (ssize_t) (bufp - buf);
}

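/*
 * Illustrative output of the "parent" attribute for a mapped clone.
 * The values below are hypothetical and shown only to document the
 * format produced by rbd_parent_show() above:
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1028b6b8b4567
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */
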
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);

	return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};

static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;
	kref_init(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}

static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
				struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	rbd_dev->flags = 0;
	atomic_set(&rbd_dev->parent_ref, 0);
	INIT_LIST_HEAD(&rbd_dev->node);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}

/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				&snapid, sizeof (snapid),
				&size_buf, sizeof (size_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (size_buf))
		return -ERANGE;

	if (order)
		*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long)snap_id, (unsigned int)*order,
		(unsigned long long)*snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}

static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix", NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + ret, NULL, GFP_NOIO);
	ret = 0;

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}

static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} __attribute__ ((packed)) features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				&snapid, sizeof (snapid),
				&features_buf, sizeof (features_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (features_buf))
		return -ERANGE;

	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_SUPPORTED)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long)snap_id,
		(unsigned long long)*snap_features,
		(unsigned long long)le64_to_cpu(features_buf.incompat));

	return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}

static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	u64 pool_id;
	char *image_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				&snapid, sizeof (snapid),
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, pool_id, out_err);
	if (pool_id == CEPH_NOPOOL) {
		/*
		 * Either the parent never existed, or we have
		 * record of it but the image got flattened so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
		 */
		if (rbd_dev->parent_overlap) {
			rbd_dev->parent_overlap = 0;
			smp_mb();
			rbd_dev_parent_put(rbd_dev);
			pr_info("%s: clone image has been flattened\n",
				rbd_dev->disk->disk_name);
		}

		goto out;	/* No parent?  No problem. */
	}

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
			(unsigned long long)pool_id, U32_MAX);
		goto out_err;
	}
	parent_spec->pool_id = pool_id;

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	if (overlap) {
		rbd_spec_put(rbd_dev->parent_spec);
		rbd_dev->parent_spec = parent_spec;
		parent_spec = NULL;	/* rbd_dev now owns this */
		rbd_dev->parent_overlap = overlap;
	} else {
		rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
	}
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}

static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
{
	struct {
		__le64 stripe_unit;
		__le64 stripe_count;
	} __attribute__ ((packed)) striping_info_buf = { 0 };
	size_t size = sizeof (striping_info_buf);
	void *p;
	u64 obj_size;
	u64 stripe_unit;
	u64 stripe_count;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_stripe_unit_count", NULL, 0,
				(char *)&striping_info_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < size)
		return -ERANGE;

	/*
	 * We don't actually support the "fancy striping" feature
	 * (STRIPINGV2) yet, but if the striping sizes are the
	 * defaults the behavior is the same as before.  So find
	 * out, and only fail if the image has non-default values.
	 */
	ret = -EINVAL;
	obj_size = (u64)1 << rbd_dev->header.obj_order;
	p = &striping_info_buf;
	stripe_unit = ceph_decode_64(&p);
	if (stripe_unit != obj_size) {
		rbd_warn(rbd_dev, "unsupported stripe unit "
				"(got %llu want %llu)",
				stripe_unit, obj_size);
		return -EINVAL;
	}
	stripe_count = ceph_decode_64(&p);
	if (stripe_count != 1) {
		rbd_warn(rbd_dev, "unsupported stripe count "
				"(got %llu want 1)", stripe_count);
		return -EINVAL;
	}
	rbd_dev->header.stripe_unit = stripe_unit;
	rbd_dev->header.stripe_count = stripe_count;

	return 0;
}

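/*
 * Illustrative "default striping" values accepted above, assuming a
 * hypothetical image with the common 4 MiB objects (obj_order 22):
 *
 *	stripe_unit  = 1 << 22 = 4194304	(must equal object size)
 *	stripe_count = 1
 *
 * Any other combination is rejected with -EINVAL until STRIPINGV2
 * is actually implemented.
 */
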
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				reply_buf, size);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}

static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const char *snap_name;
	u32 which = 0;

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which < snapc->num_snaps) {
		if (!strcmp(name, snap_name))
			return snapc->snaps[which];
		snap_name += strlen(snap_name) + 1;
		which++;
	}
	return CEPH_NOSNAP;
}

static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u32 which;
	bool found = false;
	u64 snap_id;

	for (which = 0; !found && which < snapc->num_snaps; which++) {
		const char *snap_name;

		snap_id = snapc->snaps[which];
		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
		if (IS_ERR(snap_name))
			break;
		found = !strcmp(name, snap_name);
		kfree(snap_name);
	}
	return found ? snap_id : CEPH_NOSNAP;
}

/*
 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
 * no snapshot by that name is found, or if an error occurs.
 */
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	if (rbd_dev->image_format == 1)
		return rbd_v1_snap_id_by_name(rbd_dev, name);

	return rbd_v2_snap_id_by_name(rbd_dev, name);
}

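/*
 * Illustrative use, as a minimal sketch (a hypothetical caller; the
 * real one is rbd_dev_spec_update() below):
 *
 *	u64 snap_id = rbd_snap_id_by_name(rbd_dev, "mysnap");
 *
 *	if (snap_id == CEPH_NOSNAP)
 *		return -ENOENT;		(not found, or lookup error)
 */
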
/*
 * When an rbd image has a parent image, it is identified by the
 * pool, image, and snapshot ids (not names).  This function fills
 * in the names for those ids.  (It's OK if we can't figure out the
 * name for an image id, but the pool and snapshot ids should always
 * exist and have names.)  All names in an rbd spec are dynamically
 * allocated.
 *
 * When an image being mapped (not a parent) is probed, we have the
 * pool name and pool id, image name and image id, and the snapshot
 * name.  The only thing we're missing is the snapshot id.
 */
static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_spec *spec = rbd_dev->spec;
	const char *pool_name;
	const char *image_name;
	const char *snap_name;
	int ret;

	/*
	 * An image being mapped will have the pool name (etc.), but
	 * we need to look up the snapshot id.
	 */
	if (spec->pool_name) {
		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
			u64 snap_id;

			snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
			if (snap_id == CEPH_NOSNAP)
				return -ENOENT;
			spec->snap_id = snap_id;
		} else {
			spec->snap_id = CEPH_NOSNAP;
		}

		return 0;
	}

	/* Get the pool name; we have to make our own copy of this */

	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
	if (!pool_name) {
		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
		return -EIO;
	}
	pool_name = kstrdup(pool_name, GFP_KERNEL);
	if (!pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	image_name = rbd_dev_image_name(rbd_dev);
	if (!image_name)
		rbd_warn(rbd_dev, "unable to get image name");

	/* Look up the snapshot name, and make a copy */

	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	if (!snap_name) {
		ret = -ENOMEM;
		goto out_err;
	}

	spec->pool_name = pool_name;
	spec->image_name = image_name;
	spec->snap_name = snap_name;

	return 0;
out_err:
	kfree(image_name);
	kfree(pool_name);

	return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext", NULL, 0,
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
	ret = 0;

	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	snapc->seq = seq;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	ceph_put_snap_context(rbd_dev->header.snapc);
	rbd_dev->header.snapc = snapc;

	dout(" snap context seq = %llu, snap_count = %u\n",
		(unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}

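/*
 * For reference, the get_snapcontext reply decoded above is laid
 * out as follows (all fields little-endian, matching the
 * ceph_decode_*() calls):
 *
 *	__le64	seq;			highest snapshot id
 *	__le32	snap_count;
 *	__le64	snaps[snap_count];	one id per snapshot
 */
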
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	size_t size;
	void *reply_buf;
	__le64 snapid;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snapid = cpu_to_le64(snap_id);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				&snapid, sizeof (snapid),
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0) {
		snap_name = ERR_PTR(ret);
		goto out;
	}

	p = reply_buf;
	end = reply_buf + ret;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name))
		goto out;

	dout(" snap_id 0x%016llx snap_name = %s\n",
		(unsigned long long)snap_id, snap_name);
out:
	kfree(reply_buf);

	return snap_name;
}

static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
{
	bool first_time = rbd_dev->header.object_prefix == NULL;
	int ret;

	down_write(&rbd_dev->header_rwsem);

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		goto out;

	if (first_time) {
		ret = rbd_dev_v2_header_onetime(rbd_dev);
		if (ret)
			goto out;
	}

	/*
	 * If the image supports layering, get the parent info.  We
	 * need to probe the first time regardless.  Thereafter we
	 * only need to if there's a parent, to see if it has
	 * disappeared due to the mapped image getting flattened.
	 */
	if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
			(first_time || rbd_dev->parent_spec)) {
		bool warn;

		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto out;

		/*
		 * Print a warning if this is the initial probe and
		 * the image has a parent.  Don't print it if the
		 * image now being probed is itself a parent.  We
		 * can tell at this point because we won't know its
		 * pool name yet (just its pool id).
		 */
		warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
		if (first_time && warn)
			rbd_warn(rbd_dev, "WARNING: kernel layering "
					"is EXPERIMENTAL!");
	}

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
			rbd_dev->mapping.size = rbd_dev->header.image_size;

	ret = rbd_dev_v2_snap_context(rbd_dev);
	dout("rbd_dev_v2_snap_context returned %d\n", ret);
out:
	up_write(&rbd_dev->header_rwsem);

	return ret;
}

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	struct device *dev;
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_device_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	mutex_unlock(&ctl_mutex);

	return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}

static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout(" max dev id has been reset\n");
}

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}

/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}

/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	char *snap_name;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!snap_name)
		goto out_mem;
	*(snap_name + len) = '\0';
	spec->snap_name = snap_name;

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}

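/*
 * Illustrative "rbd add" request (hypothetical monitor addresses,
 * key, and names), as it would be written to /sys/bus/rbd/add and
 * parsed by rbd_add_parse_args() above:
 *
 *	1.2.3.4:6789,5.6.7.8:6789 name=admin,secret=AQD... rbd myimage mysnap
 *
 * This maps snapshot "mysnap" of image "myimage" in pool "rbd",
 * using the two monitors listed; the trailing snapshot name may be
 * omitted to map the image head.
 */
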
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	char *image_id;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.  We
	 * do still need to set the image format though.
	 */
	if (rbd_dev->spec->image_id) {
		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

		return 0;
	}

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* If it doesn't exist we'll assume it's a format 1 image */

	ret = rbd_obj_method_sync(rbd_dev, object_name,
				"rbd", "get_id", NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret == -ENOENT) {
		image_id = kstrdup("", GFP_KERNEL);
		ret = image_id ? 0 : -ENOMEM;
		if (!ret)
			rbd_dev->image_format = 1;
	} else if (ret > sizeof (__le32)) {
		void *p = response;

		image_id = ceph_extract_encoded_string(&p, p + ret,
						NULL, GFP_NOIO);
		ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
		if (!ret)
			rbd_dev->image_format = 2;
	} else {
		ret = -EINVAL;
	}

	if (!ret) {
		rbd_dev->spec->image_id = image_id;
		dout("image_id is %s\n", image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}

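/*
 * Illustrative id lookup for a hypothetical format 2 image named
 * "myimage", assuming the usual RBD_ID_PREFIX ("rbd_id.") from
 * rbd_types.h: the object probed above would be "rbd_id.myimage",
 * and its get_id method would return an id string such as
 * "1028b6b8b4567".
 */
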
/*
 * Undo whatever state changes are made by v1 or v2 header info
 * call.
 */
static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header;

	/* Drop parent reference unless it's already been done (or none) */

	if (rbd_dev->parent_overlap)
		rbd_dev_parent_put(rbd_dev);

	/* Free dynamic fields from the header, then zero it out */

	header = &rbd_dev->header;
	ceph_put_snap_context(header->snapc);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	kfree(header->object_prefix);
	memset(header, 0, sizeof (*header));
}

static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret)
		goto out_err;

	/*
	 * Get and check the features for the image.  Currently the
	 * features are assumed to never change.
	 */
	ret = rbd_dev_v2_features(rbd_dev);
	if (ret)
		goto out_err;

	/* If the image supports fancy striping, get its parameters */

	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
		ret = rbd_dev_v2_striping_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}
	/* No support for crypto and compression type format 2 images */

	return 0;
out_err:
	rbd_dev->header.features = 0;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}

static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
	struct rbd_device *parent = NULL;
	struct rbd_spec *parent_spec;
	struct rbd_client *rbdc;
	int ret;

	if (!rbd_dev->parent_spec)
		return 0;
	/*
	 * We need to pass a reference to the client and the parent
	 * spec when creating the parent rbd_dev.  Images related by
	 * parent/child relationships always share both.
	 */
	parent_spec = rbd_spec_get(rbd_dev->parent_spec);
	rbdc = __rbd_get_client(rbd_dev->rbd_client);

	ret = -ENOMEM;
	parent = rbd_dev_create(rbdc, parent_spec);
	if (!parent)
		goto out_err;

	ret = rbd_dev_image_probe(parent, false);
	if (ret < 0)
		goto out_err;
	rbd_dev->parent = parent;
	atomic_set(&rbd_dev->parent_ref, 1);

	return 0;
out_err:
	if (parent) {
		rbd_dev_unparent(rbd_dev);
		kfree(rbd_dev->header_name);
		rbd_dev_destroy(parent);
	} else {
		rbd_put_client(rbdc);
		rbd_spec_put(parent_spec);
	}

	return ret;
}

static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		goto err_out_disk;
	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_mapping;

	/* Everything's ready.  Announce the disk to the world. */

	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_mapping:
	rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);

	return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	size_t size;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
	else
		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;

	if (rbd_dev->image_format == 1)
		sprintf(rbd_dev->header_name, "%s%s",
			spec->image_name, RBD_SUFFIX);
	else
		sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, spec->image_id);
	return 0;
}

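/*
 * Illustrative header object names for a hypothetical image,
 * assuming the usual RBD_SUFFIX (".rbd") and RBD_HEADER_PREFIX
 * ("rbd_header.") definitions from rbd_types.h:
 *
 *	format 1:	"myimage.rbd"
 *	format 2:	"rbd_header.1028b6b8b4567"
 */
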
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	rbd_dev_unprobe(rbd_dev);
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
{
	int ret;
	int tmp;

	/*
	 * Get the id from the image id object.  Unless there's an
	 * error, rbd_dev->spec->image_id will be filled in with
	 * a dynamically-allocated string, and rbd_dev->image_format
	 * will be set to either 1 or 2.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;
	rbd_assert(rbd_dev->spec->image_id);
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	if (mapping) {
		ret = rbd_dev_header_watch_sync(rbd_dev, true);
		if (ret)
			goto out_header_name;
	}

	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_header_info(rbd_dev);
	else
		ret = rbd_dev_v2_header_info(rbd_dev);
	if (ret)
		goto err_out_watch;

	ret = rbd_dev_spec_update(rbd_dev);
	if (ret)
		goto err_out_probe;

	ret = rbd_dev_probe_parent(rbd_dev);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
		rbd_dev->image_format, rbd_dev->header_name);

	return 0;
err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	if (mapping) {
		tmp = rbd_dev_header_watch_sync(rbd_dev, false);
		if (tmp)
			rbd_warn(rbd_dev, "unable to tear down "
					"watch request (%d)\n", tmp);
	}
out_header_name:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	dout("probe failed, returning %d\n", ret);

	return ret;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	bool read_only;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;
	read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64)rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
				(unsigned long long)spec->pool_id, U32_MAX);
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rc = rbd_dev_image_probe(rbd_dev, true);
	if (rc < 0)
		goto err_out_rbd_dev;

	/* If we are mapping a snapshot it must be marked read-only */

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		read_only = true;
	rbd_dev->mapping.read_only = read_only;

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc) {
		rbd_dev_image_release(rbd_dev);
		goto err_out_module;
	}

	return count;

err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t)rc;
}

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}

static void rbd_dev_device_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	rbd_free_disk(rbd_dev);
	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	rbd_dev_mapping_clear(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	rbd_dev->major = 0;
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id;
	unsigned long ul;
	int ret;

	ret = strict_strtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;
	rbd_bus_del_dev(rbd_dev);
	ret = rbd_dev_header_watch_sync(rbd_dev, false);
	if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);
	ret = count;
done:
	mutex_unlock(&ctl_mutex);

	return ret;
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
					sizeof (struct rbd_img_request),
					__alignof__(struct rbd_img_request),
					0, NULL);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
					sizeof (struct rbd_obj_request),
					__alignof__(struct rbd_obj_request),
					0, NULL);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	if (rbd_obj_request_cache) {
		kmem_cache_destroy(rbd_obj_request_cache);
		rbd_obj_request_cache = NULL;
	}

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

cc344fa1 5241static int __init rbd_init(void)
602adf40
YS
5242{
5243 int rc;
5244
1e32d34c
AE
5245 if (!libceph_compatible(NULL)) {
5246 rbd_warn(NULL, "libceph incompatibility (quitting)");
5247
5248 return -EINVAL;
5249 }
1c2a9dfe 5250 rc = rbd_slab_init();
602adf40
YS
5251 if (rc)
5252 return rc;
1c2a9dfe
AE
5253 rc = rbd_sysfs_init();
5254 if (rc)
5255 rbd_slab_exit();
5256 else
5257 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5258
5259 return rc;
602adf40
YS
5260}
5261
cc344fa1 5262static void __exit rbd_exit(void)
602adf40
YS
5263{
5264 rbd_sysfs_cleanup();
1c2a9dfe 5265 rbd_slab_exit();
602adf40
YS
5266}
5267
5268module_init(rbd_init);
5269module_exit(rbd_exit);
5270
5271MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5272MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5273MODULE_DESCRIPTION("rados block device");
5274
5275/* following authorship retained from original osdblk.c */
5276MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5277
5278MODULE_LICENSE("GPL");