/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
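
/*
 * Illustrative sketch (assumed usage, not code at this point in the
 * file): these helpers suit a reference count that must saturate
 * rather than wrap, such as the parent_ref field declared in struct
 * rbd_device below:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
 *		...parent still usable, reference taken...
 *	else
 *		...count was 0 (parent gone) or at its max; no reference...
 */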

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
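
/*
 * A sketch of the arithmetic behind MAX_INT_FORMAT_WIDTH: each byte
 * of an int contributes log10(256) ~= 2.41 decimal digits, so 5/2 =
 * 2.5 digits per byte is a safe upper bound.  For a 4-byte int that
 * yields (5 * 4) / 2 + 1 = 11 characters, enough for the 10 digits
 * of UINT_MAX plus a sign.
 */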

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* position in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	NULL,
};
ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
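
/*
 * Example (illustrative only): mapping an image read-only supplies an
 * option string such as "read_only" (or its alias "ro") through the
 * sysfs add interface; each token is fed to parse_rbd_opts_token(),
 * which flips the corresponding field in struct rbd_options.
 */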

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * The rbd_client_list_lock is taken here to remove the client from
 * the list, so callers must not already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
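
/*
 * Rough numbers behind the two checks above (assuming a 64-bit
 * size_t): SIZE_MAX less the ceph_snap_context header leaves room
 * for roughly 2^60 eight-byte snapshot ids, so in practice these
 * bounds only reject corrupt or hostile on-disk headers.
 */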

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	/* Make sure mapping size is consistent with header info */

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
		if (rbd_dev->mapping.size != header->image_size)
			rbd_dev->mapping.size = header->image_size;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}
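
/*
 * Example (illustrative only): with snaps[] = { 40, 30, 10 } -- the
 * descending order the osd maintains -- bsearch() with a key of 30
 * and snapid_compare_reverse() lands on &snaps[1], while a key of 20
 * (absent) yields NULL, which rbd_dev_snap_index() below maps to
 * BAD_SNAP_INDEX.
 */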

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return NULL;

	return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
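
/*
 * Worked example (assuming the default object order of 22, i.e.
 * 4 MiB objects): an image offset of 9 MiB falls in segment 2 at
 * offset 1 MiB within that object, and rbd_segment_length() of a
 * 5 MiB request starting there is clipped to the 3 MiB remaining
 * in the object.
 */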

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				flush_dcache_page(bv->bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
1132
b9434c5b
AE
1133/*
1134 * similar to zero_bio_chain(), zeros data defined by a page array,
1135 * starting at the given byte offset from the start of the array and
1136 * continuing up to the given end offset. The pages array is
1137 * assumed to be big enough to hold all bytes up to the end.
1138 */
1139static void zero_pages(struct page **pages, u64 offset, u64 end)
1140{
1141 struct page **page = &pages[offset >> PAGE_SHIFT];
1142
1143 rbd_assert(end > offset);
1144 rbd_assert(end - offset <= (u64)SIZE_MAX);
1145 while (offset < end) {
1146 size_t page_offset;
1147 size_t length;
1148 unsigned long flags;
1149 void *kaddr;
1150
491205a8
GU
1151 page_offset = offset & ~PAGE_MASK;
1152 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
b9434c5b
AE
1153 local_irq_save(flags);
1154 kaddr = kmap_atomic(*page);
1155 memset(kaddr + page_offset, 0, length);
e2156054 1156 flush_dcache_page(*page);
b9434c5b
AE
1157 kunmap_atomic(kaddr);
1158 local_irq_restore(flags);
1159
1160 offset += length;
1161 page++;
1162 }
1163}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	bio_for_each_segment(bv, bio_src, idx) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
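
/*
 * Typical use (a sketch, not the exact caller): walk a request's bio
 * chain, carving off one object-sized clone per object request:
 *
 *	while (resid) {
 *		u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
 *
 *		obj_request->bio_list =
 *			bio_chain_clone_range(&bio_list, &bio_offset,
 *						(unsigned int)length,
 *						GFP_ATOMIC);
 *		...
 *		img_offset += length;
 *		resid -= length;
 *	}
 */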

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
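
/*
 * Sketch of how a layered write combines the two flags above
 * (illustrative only):
 *
 *	if (!obj_request_known_test(obj_request))
 *		...issue a STAT to learn whether the object exists...
 *	else if (obj_request_exists_test(obj_request))
 *		...the object exists; write to it directly...
 *	else
 *		...copy up the parent's data before writing...
 */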

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  An error requires the whole
	 * length of the request to be reported finished with an error
	 * to the block layer.  In each case we update the xferred
	 * count to indicate the whole request was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
	}
	obj_request->xferred = length;
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
1701
9d4df01f 1702static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3
AE
1703{
1704 struct rbd_img_request *img_request = obj_request->img_request;
8c042b0d 1705 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1706 u64 snap_id;
430c28c3 1707
8c042b0d 1708 rbd_assert(osd_req != NULL);
430c28c3 1709
9d4df01f 1710 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
8c042b0d 1711 ceph_osdc_build_request(osd_req, obj_request->offset,
9d4df01f
AE
1712 NULL, snap_id, NULL);
1713}
1714
1715static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1716{
1717 struct rbd_img_request *img_request = obj_request->img_request;
1718 struct ceph_osd_request *osd_req = obj_request->osd_req;
1719 struct ceph_snap_context *snapc;
1720 struct timespec mtime = CURRENT_TIME;
1721
1722 rbd_assert(osd_req != NULL);
1723
1724 snapc = img_request ? img_request->snapc : NULL;
1725 ceph_osdc_build_request(osd_req, obj_request->offset,
1726 snapc, CEPH_NOSNAP, &mtime);
430c28c3
AE
1727}
1728
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

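/*
 * For reference, the op layout a copyup request ends up with, as
 * initialized by rbd_img_obj_parent_read_full_callback() below:
 *
 *	op 0: CEPH_OSD_OP_CALL, class "rbd", method "copyup",
 *	      request data = the full-object parent data read earlier
 *	op 1: CEPH_OSD_OP_WRITE, the originally-requested extent
 *
 * The OSD applies the copyup only if the target object does not yet
 * exist, then applies the write on top of it.
 */
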
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow\n");
}

1927/*
1928 * If an image has a non-zero parent overlap, get a reference to its
1929 * parent.
1930 *
392a9dad
AE
1931 * We must get the reference before checking for the overlap to
1932 * coordinate properly with zeroing the parent overlap in
1933 * rbd_dev_v2_parent_info() when an image gets flattened. We
1934 * drop it again if there is no overlap.
1935 *
a2acd00e
AE
1936 * Returns true if the rbd device has a parent with a non-zero
1937 * overlap and a reference for it was successfully taken, or
1938 * false otherwise.
1939 */
1940static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1941{
1942 int counter;
1943
1944 if (!rbd_dev->parent_spec)
1945 return false;
1946
1947 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1948 if (counter > 0 && rbd_dev->parent_overlap)
1949 return true;
1950
1951 /* Image was flattened, but parent is not yet torn down */
1952
1953 if (counter < 0)
1954 rbd_warn(rbd_dev, "parent reference overflow\n");
1955
1956 return false;
1957}
1958
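/*
 * The typical pairing: a successful rbd_dev_parent_get() in
 * rbd_img_request_create() marks the image request layered, and the
 * matching rbd_dev_parent_put() happens in rbd_img_request_destroy()
 * once the layered request is torn down.  The parent fields thus stay
 * valid for exactly as long as some request still needs them.
 */
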
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length, false);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}

static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}

static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}

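/*
 * Example of the in-order completion logic above: with three object
 * requests whose OSD replies arrive in the order 1, 0, 2, the callback
 * for request 1 finds which (1) != next_completion (0) and just marks
 * itself done.  The callback for request 0 then walks forward from 0,
 * ending requests 0 and 1 (both done) and stopping at the not-yet-done
 * request 2, so blk_end_request() always sees completions in ascending
 * offset order.
 */
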
/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list = NULL;
	unsigned int bio_offset = 0;
	struct page **pages = NULL;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		/* object request has its own copy of the object name */
		rbd_segment_name_free(object_name);
		if (!obj_request)
			goto out_unwind;
		/*
		 * set obj_request->img_request before creating the
		 * osd_request so that it gets the right snapc
		 */
		rbd_img_obj_request_add(img_request, obj_request);

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}

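/*
 * Worked example for rbd_img_request_fill(), assuming the default
 * 4 MiB objects (obj_order 22): an image request for 6 MiB starting
 * at image offset 3 MiB is split into three object requests:
 *
 *	object N+0: offset 3 MiB, length 1 MiB	(tail of first object)
 *	object N+1: offset 0,     length 4 MiB	(a whole object)
 *	object N+2: offset 0,     length 1 MiB	(head of third object)
 *
 * rbd_segment_offset()/rbd_segment_length() compute the per-object
 * offset and length; the loop clones the matching slice of the bio
 * chain (or advances through the page array) for each object request.
 */
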
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);

	pages = obj_request->copyup_pages;
	rbd_assert(pages != NULL);
	obj_request->copyup_pages = NULL;
	page_count = obj_request->copyup_page_count;
	rbd_assert(page_count);
	obj_request->copyup_page_count = 0;
	ceph_release_page_vector(pages, page_count);

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}

static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;
	int img_result;
	u64 parent_length;
	u64 offset;
	u64 length;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;
	page_count = img_request->copyup_page_count;
	rbd_assert(page_count);
	img_request->copyup_page_count = 0;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(obj_request_type_valid(orig_request->type));
	img_result = img_request->result;
	parent_length = img_request->length;
	rbd_assert(parent_length == img_request->xferred);
	rbd_img_request_put(img_request);

	rbd_assert(orig_request->img_request);
	rbd_dev = orig_request->img_request->rbd_dev;
	rbd_assert(rbd_dev);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		ceph_release_page_vector(pages, page_count);
		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, orig_request);
		if (!img_result)
			return;
	}

	if (img_result)
		goto out_err;

	/*
	 * The original osd request is of no use to us any more.
	 * We need a new one that can hold the two ops in a copyup
	 * request.  Allocate the new copyup osd request for the
	 * original request, and release the old one.
	 */
	img_result = -ENOMEM;
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	rbd_osd_req_destroy(orig_request->osd_req);
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;
	orig_request->copyup_page_count = page_count;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
						false, false);

	/* Then the original write request op */

	offset = orig_request->offset;
	length = orig_request->length;
	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					offset, length, 0, 0);
	if (orig_request->type == OBJ_REQUEST_BIO)
		osd_req_op_extent_osd_data_bio(osd_req, 1,
					orig_request->bio_list, length);
	else
		osd_req_op_extent_osd_data_pages(osd_req, 1,
					orig_request->pages, length,
					offset & ~PAGE_MASK, false, false);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	orig_request->callback = rbd_img_obj_copyup_callback;
	osdc = &rbd_dev->rbd_client->client->osdc;
	img_result = rbd_obj_request_submit(osdc, orig_request);
	if (!img_result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = img_result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}

/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request_type_valid(obj_request->type));

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_parent_request_create(obj_request,
						img_offset, length);
	if (!parent_request)
		goto out_err;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	parent_request->copyup_pages = pages;
	parent_request->copyup_page_count = page_count;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	parent_request->copyup_pages = NULL;
	parent_request->copyup_page_count = 0;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}

static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	struct rbd_device *rbd_dev;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	rbd_dev = orig_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		result = rbd_obj_request_submit(osdc, orig_request);
		if (!result)
			return;
	}

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else if (result) {
		orig_request->result = result;
		goto out;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
	if (orig_request->result)
		rbd_obj_request_complete(orig_request);
}

static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *stat_request;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
	page_count = (u32)calc_pages_for(0, size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
							OBJ_REQUEST_PAGES);
	if (!stat_request)
		goto out;

	rbd_obj_request_get(obj_request);
	stat_request->obj_request = obj_request;
	stat_request->pages = pages;
	stat_request->page_count = page_count;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						stat_request);
	if (!stat_request->osd_req)
		goto out;
	stat_request->callback = rbd_img_obj_exists_callback;

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
					false, false);
	rbd_osd_req_format_read(stat_request);

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, stat_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	bool known;

	rbd_assert(obj_request_img_data_test(obj_request));

	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_dev = img_request->rbd_dev;

	/*
	 * Only writes to layered images need special handling.
	 * Reads and non-layered writes are simple object requests.
	 * Layered writes that start beyond the end of the overlap
	 * with the parent have no parent data, so they too are
	 * simple object requests.  Finally, if the target object is
	 * known to already exist, its parent data has already been
	 * copied, so a write to the object can also be handled as a
	 * simple object request.
	 */
	if (!img_request_write_test(img_request) ||
		!img_request_layered_test(img_request) ||
		rbd_dev->parent_overlap <= obj_request->img_offset ||
		((known = obj_request_known_test(obj_request)) &&
			obj_request_exists_test(obj_request))) {

		struct rbd_device *rbd_dev;
		struct ceph_osd_client *osdc;

		rbd_dev = obj_request->img_request->rbd_dev;
		osdc = &rbd_dev->rbd_client->client->osdc;

		return rbd_obj_request_submit(osdc, obj_request);
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (known)
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}

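/*
 * In short, the dispatch above works out to:
 *
 *	read, or write to a non-layered image	-> submit directly
 *	layered write beyond the parent overlap	-> submit directly
 *	layered write, target known to exist	-> submit directly
 *	layered write, target known missing	-> parent read, then copyup
 *	layered write, existence unknown	-> STAT first, then as above
 */
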
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			return ret;
	}

	return 0;
}

static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;
	u64 img_xferred;
	int img_result;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request and release it */

	obj_request = img_request->obj_request;
	img_xferred = img_request->xferred;
	img_result = img_request->result;
	rbd_img_request_put(img_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
	 */
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, obj_request);
		if (!img_result)
			return;
	}

	obj_request->result = img_result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_xferred, xferred);
	} else {
		obj_request->xferred = img_xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}

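/*
 * Example of the overlap clamping above: with a parent overlap of
 * 4 MiB, a child read of 2 MiB at image offset 3 MiB gets at most
 * 1 MiB of parent data (4 MiB - 3 MiB).  xferred is clamped to that,
 * and rbd_img_obj_request_read_callback() zero-fills the remainder as
 * if the parent read had simply come up short.
 */
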
static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request_type_valid(obj_request->type));

	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_parent_request_create(obj_request,
						obj_request->img_offset,
						obj_request->length);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	if (obj_request->type == OBJ_REQUEST_BIO)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						obj_request->bio_list);
	else
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
						obj_request->pages);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}

static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;
	obj_request->callback = rbd_obj_request_put;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, 0, 0);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	int ret;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long)notify_id,
		(unsigned int)opcode);
	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);

	rbd_obj_notify_ack(rbd_dev, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
	rbd_osd_req_format_write(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}

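/*
 * Note on usage: rbd_dev_header_watch_sync(rbd_dev, true) is expected
 * to be called once when a device is mapped, and with false when it is
 * torn down.  The assertions at the top of the function enforce that
 * start and stop calls strictly alternate: watch_event and
 * watch_request are both non-NULL exactly while a watch is active.
 */
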
/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const void *outbound,
			     size_t outbound_size,
			     void *inbound,
			     size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32)calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred < (u64)INT_MAX);
	ret = (int)obj_request->xferred;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

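/*
 * Hypothetical usage sketch (the actual callers live elsewhere in this
 * file): retrieving a format 2 image id via the "rbd" class would look
 * something like:
 *
 *	char id_buf[RBD_IMAGE_ID_LEN_MAX];
 *	int ret = rbd_obj_method_sync(rbd_dev, object_name,
 *					"rbd", "get_id",
 *					NULL, 0, id_buf, sizeof (id_buf));
 *
 * On success, ret is the number of bytes the method placed in id_buf.
 */
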
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (offset && length > U64_MAX - offset + 1) {
			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
				offset, length);
			goto end_request;	/* Shouldn't happen */
		}

		result = -EIO;
		if (offset + length > rbd_dev->mapping.size) {
			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
				offset, length, rbd_dev->mapping.size);
			goto end_request;
		}

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}

/*
 * A queue callback.  Makes sure that we don't create a bio that spans
 * across multiple osd objects.  One exception would be a single-page
 * bio, which we handle later at bio_chain_clone_range().
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}

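/*
 * Worked example, assuming the default 4 MiB objects (obj_order 22, so
 * sectors_per_obj = 1 << (22 - 9) = 8192): a bio that starts at
 * absolute sector 8190 and already holds 512 bytes has
 * (8192 - 8190) * 512 - 512 = 512 bytes left before the object
 * boundary, so at most 512 bytes of the proposed bio_vec are accepted
 * and the next bio starts on an object boundary.  An empty bio is
 * always allowed one full page, per the rule quoted above.
 */
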
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_dev->disk = NULL;
	if (disk->flags & GENHD_FL_UP) {
		del_gendisk(disk);
		if (disk->queue)
			blk_cleanup_queue(disk->queue);
	}
	put_disk(disk);
}

static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length, void *buf)

{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
					offset, length, 0, 0);
	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
					obj_request->pages,
					obj_request->length,
					obj_request->offset & ~PAGE_MASK,
					false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t)INT_MAX);
	ret = (int)size;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

/*
 * Read the complete header for the given rbd device.  On successful
 * return, the rbd_dev->header field will contain up-to-date
 * information about the image.
 */
static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return -ENOMEM;

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size, ondisk);
		if (ret < 0)
			goto out;
		if ((size_t)ret < size) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			goto out;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	ret = rbd_header_from_disk(rbd_dev, ondisk);
out:
	kfree(ondisk);

	return ret;
}

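/*
 * Example of the re-read loop above: the first pass reads with
 * snap_count == 0, i.e. just the fixed-size header.  If that header
 * says the image has, say, three snapshots, the buffer is reallocated
 * large enough for three ids plus their names and the read repeats;
 * the loop exits once the snapshot count seen on disk matches the
 * count the buffer was sized for.
 */
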
/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	down_write(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_header_info(rbd_dev);
	else
		ret = rbd_dev_v2_header_info(rbd_dev);

	/* If it's a mapped snapshot, validate its EXISTS flag */

	rbd_exists_validate(rbd_dev);
	up_write(&rbd_dev->header_rwsem);

	if (mapping_size != rbd_dev->mapping.size) {
		sector_t size;

		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}

	return ret;
}

602adf40 3361static int rbd_init_disk(struct rbd_device *rbd_dev)
3362{
3363 struct gendisk *disk;
3364 struct request_queue *q;
593a9e7b 3365 u64 segment_size;
602adf40 3366
602adf40 3367 /* create gendisk info */
602adf40 3368 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3369 if (!disk)
1fcdb8aa 3370 return -ENOMEM;
602adf40 3371
f0f8cef5 3372 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3373 rbd_dev->dev_id);
602adf40 3374 disk->major = rbd_dev->major;
3375 disk->first_minor = 0;
3376 disk->fops = &rbd_bd_ops;
3377 disk->private_data = rbd_dev;
3378
bf0d5f50 3379 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
602adf40 3380 if (!q)
3381 goto out_disk;
029bcbd8 3382
593a9e7b 3383 /* We use the default size, but let's be explicit about it. */
3384 blk_queue_physical_block_size(q, SECTOR_SIZE);
3385
029bcbd8 3386 /* set io sizes to object size */
593a9e7b 3387 segment_size = rbd_obj_bytes(&rbd_dev->header);
3388 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3389 blk_queue_max_segment_size(q, segment_size);
3390 blk_queue_io_min(q, segment_size);
3391 blk_queue_io_opt(q, segment_size);
029bcbd8 3392
602adf40 3393 blk_queue_merge_bvec(q, rbd_merge_bvec);
3394 disk->queue = q;
3395
3396 q->queuedata = rbd_dev;
3397
3398 rbd_dev->disk = disk;
602adf40 3399
602adf40 3400 return 0;
602adf40 3401out_disk:
3402 put_disk(disk);
1fcdb8aa 3403
3404 return -ENOMEM;
602adf40 3405}
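/*
 * A quick sanity check on the queue limits set above, assuming the
 * common default object order of 22 (4 MiB objects): segment_size
 * is 1 << 22 bytes, so the queue advertises max_hw_sectors of
 * 4 MiB / SECTOR_SIZE = 8192 sectors, with I/O minimum and optimal
 * sizes of 4 MiB, keeping requests aligned to RADOS object
 * boundaries.
 */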
3406
dfc5606d
YS
3407/*
3408 sysfs
3409*/
3410
593a9e7b
AE
3411static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3412{
3413 return container_of(dev, struct rbd_device, dev);
3414}
3415
dfc5606d
YS
3416static ssize_t rbd_size_show(struct device *dev,
3417 struct device_attribute *attr, char *buf)
3418{
593a9e7b 3419 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3420
fc71d833
AE
3421 return sprintf(buf, "%llu\n",
3422 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3423}
3424
34b13184
AE
3425/*
3426 * Note this shows the features for whatever's mapped, which is not
3427 * necessarily the base image.
3428 */
3429static ssize_t rbd_features_show(struct device *dev,
3430 struct device_attribute *attr, char *buf)
3431{
3432 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3433
3434 return sprintf(buf, "0x%016llx\n",
fc71d833 3435 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3436}
3437
dfc5606d
YS
3438static ssize_t rbd_major_show(struct device *dev,
3439 struct device_attribute *attr, char *buf)
3440{
593a9e7b 3441 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3442
fc71d833
AE
3443 if (rbd_dev->major)
3444 return sprintf(buf, "%d\n", rbd_dev->major);
3445
3446 return sprintf(buf, "(none)\n");
3447
dfc5606d
YS
3448}
3449
3450static ssize_t rbd_client_id_show(struct device *dev,
3451 struct device_attribute *attr, char *buf)
602adf40 3452{
593a9e7b 3453 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3454
1dbb4399
AE
3455 return sprintf(buf, "client%lld\n",
3456 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3457}
3458
dfc5606d
YS
3459static ssize_t rbd_pool_show(struct device *dev,
3460 struct device_attribute *attr, char *buf)
602adf40 3461{
593a9e7b 3462 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3463
0d7dbfce 3464 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3465}
3466
9bb2f334
AE
3467static ssize_t rbd_pool_id_show(struct device *dev,
3468 struct device_attribute *attr, char *buf)
3469{
3470 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3471
0d7dbfce 3472 return sprintf(buf, "%llu\n",
fc71d833 3473 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3474}
3475
dfc5606d
YS
3476static ssize_t rbd_name_show(struct device *dev,
3477 struct device_attribute *attr, char *buf)
3478{
593a9e7b 3479 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3480
a92ffdf8
AE
3481 if (rbd_dev->spec->image_name)
3482 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3483
3484 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3485}
3486
589d30e0
AE
3487static ssize_t rbd_image_id_show(struct device *dev,
3488 struct device_attribute *attr, char *buf)
3489{
3490 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3491
0d7dbfce 3492 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3493}
3494
34b13184
AE
3495/*
3496 * Shows the name of the currently-mapped snapshot (or
3497 * RBD_SNAP_HEAD_NAME for the base image).
3498 */
dfc5606d
YS
3499static ssize_t rbd_snap_show(struct device *dev,
3500 struct device_attribute *attr,
3501 char *buf)
3502{
593a9e7b 3503 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3504
0d7dbfce 3505 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3506}
3507
86b00e0d
AE
3508/*
3509 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3510 * for the parent image. If there is no parent, simply shows
3511 * "(no parent image)".
3512 */
3513static ssize_t rbd_parent_show(struct device *dev,
3514 struct device_attribute *attr,
3515 char *buf)
3516{
3517 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3518 struct rbd_spec *spec = rbd_dev->parent_spec;
3519 int count;
3520 char *bufp = buf;
3521
3522 if (!spec)
3523 return sprintf(buf, "(no parent image)\n");
3524
3525 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3526 (unsigned long long) spec->pool_id, spec->pool_name);
3527 if (count < 0)
3528 return count;
3529 bufp += count;
3530
3531 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3532 spec->image_name ? spec->image_name : "(unknown)");
3533 if (count < 0)
3534 return count;
3535 bufp += count;
3536
3537 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3538 (unsigned long long) spec->snap_id, spec->snap_name);
3539 if (count < 0)
3540 return count;
3541 bufp += count;
3542
3543 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3544 if (count < 0)
3545 return count;
3546 bufp += count;
3547
3548 return (ssize_t) (bufp - buf);
3549}
3550
dfc5606d
YS
3551static ssize_t rbd_image_refresh(struct device *dev,
3552 struct device_attribute *attr,
3553 const char *buf,
3554 size_t size)
3555{
593a9e7b 3556 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3557 int ret;
602adf40 3558
cc4a38bd 3559 ret = rbd_dev_refresh(rbd_dev);
e627db08 3560 if (ret)
3561 rbd_warn(rbd_dev, "manual header refresh error (%d)", ret);
b813623a
AE
3562
3563 return ret < 0 ? ret : size;
dfc5606d 3564}
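/*
 * Illustrative use of the write-only "refresh" attribute declared
 * below, for a mapping with device id 0:
 *
 *	# echo 1 > /sys/bus/rbd/devices/0/refresh
 *
 * Any written value works; only the act of writing matters.
 */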
602adf40 3565
dfc5606d 3566static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3567static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d
YS
3568static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3569static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3570static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3571static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3572static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3573static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3574static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3575static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3576static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3577
3578static struct attribute *rbd_attrs[] = {
3579 &dev_attr_size.attr,
34b13184 3580 &dev_attr_features.attr,
dfc5606d
YS
3581 &dev_attr_major.attr,
3582 &dev_attr_client_id.attr,
3583 &dev_attr_pool.attr,
9bb2f334 3584 &dev_attr_pool_id.attr,
dfc5606d 3585 &dev_attr_name.attr,
589d30e0 3586 &dev_attr_image_id.attr,
dfc5606d 3587 &dev_attr_current_snap.attr,
86b00e0d 3588 &dev_attr_parent.attr,
dfc5606d 3589 &dev_attr_refresh.attr,
dfc5606d
YS
3590 NULL
3591};
3592
3593static struct attribute_group rbd_attr_group = {
3594 .attrs = rbd_attrs,
3595};
3596
3597static const struct attribute_group *rbd_attr_groups[] = {
3598 &rbd_attr_group,
3599 NULL
3600};
3601
3602static void rbd_sysfs_dev_release(struct device *dev)
3603{
3604}
3605
3606static struct device_type rbd_device_type = {
3607 .name = "rbd",
3608 .groups = rbd_attr_groups,
3609 .release = rbd_sysfs_dev_release,
3610};
3611
8b8fb99c
AE
3612static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3613{
3614 kref_get(&spec->kref);
3615
3616 return spec;
3617}
3618
3619static void rbd_spec_free(struct kref *kref);
3620static void rbd_spec_put(struct rbd_spec *spec)
3621{
3622 if (spec)
3623 kref_put(&spec->kref, rbd_spec_free);
3624}
3625
3626static struct rbd_spec *rbd_spec_alloc(void)
3627{
3628 struct rbd_spec *spec;
3629
3630 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3631 if (!spec)
3632 return NULL;
3633 kref_init(&spec->kref);
3634
8b8fb99c
AE
3635 return spec;
3636}
3637
3638static void rbd_spec_free(struct kref *kref)
3639{
3640 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3641
3642 kfree(spec->pool_name);
3643 kfree(spec->image_id);
3644 kfree(spec->image_name);
3645 kfree(spec->snap_name);
3646 kfree(spec);
3647}
3648
cc344fa1 3649static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3650 struct rbd_spec *spec)
3651{
3652 struct rbd_device *rbd_dev;
3653
3654 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3655 if (!rbd_dev)
3656 return NULL;
3657
3658 spin_lock_init(&rbd_dev->lock);
6d292906 3659 rbd_dev->flags = 0;
a2acd00e 3660 atomic_set(&rbd_dev->parent_ref, 0);
c53d5893 3661 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3662 init_rwsem(&rbd_dev->header_rwsem);
3663
3664 rbd_dev->spec = spec;
3665 rbd_dev->rbd_client = rbdc;
3666
0903e875
AE
3667 /* Initialize the layout used for all rbd requests */
3668
3669 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3670 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3671 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3672 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3673
c53d5893
AE
3674 return rbd_dev;
3675}
3676
3677static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3678{
c53d5893
AE
3679 rbd_put_client(rbd_dev->rbd_client);
3680 rbd_spec_put(rbd_dev->spec);
3681 kfree(rbd_dev);
3682}
3683
9d475de5
AE
3684/*
3685 * Get the size and object order for an image snapshot, or if
3686 * snap_id is CEPH_NOSNAP, gets this information for the base
3687 * image.
3688 */
3689static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3690 u8 *order, u64 *snap_size)
3691{
3692 __le64 snapid = cpu_to_le64(snap_id);
3693 int ret;
3694 struct {
3695 u8 order;
3696 __le64 size;
3697 } __attribute__ ((packed)) size_buf = { 0 };
3698
36be9a76 3699 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3700 "rbd", "get_size",
4157976b 3701 &snapid, sizeof (snapid),
e2a58ee5 3702 &size_buf, sizeof (size_buf));
36be9a76 3703 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3704 if (ret < 0)
3705 return ret;
57385b51
AE
3706 if (ret < sizeof (size_buf))
3707 return -ERANGE;
9d475de5 3708
c3545579 3709 if (order) {
c86f86e9 3710 *order = size_buf.order;
c3545579
JD
3711 dout(" order %u", (unsigned int)*order);
3712 }
9d475de5
AE
3713 *snap_size = le64_to_cpu(size_buf.size);
3714
c3545579
JD
3715 dout(" snap_id 0x%016llx snap_size = %llu\n",
3716 (unsigned long long)snap_id,
57385b51 3717 (unsigned long long)*snap_size);
9d475de5
AE
3718
3719 return 0;
3720}
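/*
 * A sample (hypothetical) get_size reply as decoded above, for a
 * 1 GiB image built from 4 MiB objects:
 *
 *	size_buf.order = 22		(objects are 1 << 22 bytes)
 *	size_buf.size  = 0x40000000	(image size in bytes)
 */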
3721
3722static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3723{
3724 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3725 &rbd_dev->header.obj_order,
3726 &rbd_dev->header.image_size);
3727}
3728
1e130199
AE
3729static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3730{
3731 void *reply_buf;
3732 int ret;
3733 void *p;
3734
3735 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3736 if (!reply_buf)
3737 return -ENOMEM;
3738
36be9a76 3739 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3740 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3741 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3742 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3743 if (ret < 0)
3744 goto out;
3745
3746 p = reply_buf;
3747 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3748 p + ret, NULL, GFP_NOIO);
3749 ret = 0;
1e130199
AE
3750
3751 if (IS_ERR(rbd_dev->header.object_prefix)) {
3752 ret = PTR_ERR(rbd_dev->header.object_prefix);
3753 rbd_dev->header.object_prefix = NULL;
3754 } else {
3755 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3756 }
1e130199
AE
3757out:
3758 kfree(reply_buf);
3759
3760 return ret;
3761}
3762
b1b5402a
AE
3763static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3764 u64 *snap_features)
3765{
3766 __le64 snapid = cpu_to_le64(snap_id);
3767 struct {
3768 __le64 features;
3769 __le64 incompat;
4157976b 3770 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3771 u64 incompat;
b1b5402a
AE
3772 int ret;
3773
36be9a76 3774 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3775 "rbd", "get_features",
4157976b 3776 &snapid, sizeof (snapid),
e2a58ee5 3777 &features_buf, sizeof (features_buf));
36be9a76 3778 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3779 if (ret < 0)
3780 return ret;
57385b51
AE
3781 if (ret < sizeof (features_buf))
3782 return -ERANGE;
d889140c
AE
3783
3784 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3785 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3786 return -ENXIO;
d889140c 3787
b1b5402a
AE
3788 *snap_features = le64_to_cpu(features_buf.features);
3789
3790 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3791 (unsigned long long)snap_id,
3792 (unsigned long long)*snap_features,
3793 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3794
3795 return 0;
3796}
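/*
 * Example of the incompat check above, with bit assignments assumed
 * from the feature definitions earlier in this file: a reply of
 * { features = 0x3, incompat = 0x1 } advertises layering and
 * striping, with only layering strictly required; the image is
 * refused (-ENXIO) only if some incompat bit falls outside
 * RBD_FEATURES_SUPPORTED.
 */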
3797
3798static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3799{
3800 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3801 &rbd_dev->header.features);
3802}
3803
86b00e0d
AE
3804static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3805{
3806 struct rbd_spec *parent_spec;
3807 size_t size;
3808 void *reply_buf = NULL;
3809 __le64 snapid;
3810 void *p;
3811 void *end;
642a2537 3812 u64 pool_id;
86b00e0d 3813 char *image_id;
3b5cf2a2 3814 u64 snap_id;
86b00e0d 3815 u64 overlap;
86b00e0d
AE
3816 int ret;
3817
3818 parent_spec = rbd_spec_alloc();
3819 if (!parent_spec)
3820 return -ENOMEM;
3821
3822 size = sizeof (__le64) + /* pool_id */
3823 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3824 sizeof (__le64) + /* snap_id */
3825 sizeof (__le64); /* overlap */
3826 reply_buf = kmalloc(size, GFP_KERNEL);
3827 if (!reply_buf) {
3828 ret = -ENOMEM;
3829 goto out_err;
3830 }
3831
3832 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3833 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3834 "rbd", "get_parent",
4157976b 3835 &snapid, sizeof (snapid),
e2a58ee5 3836 reply_buf, size);
36be9a76 3837 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3838 if (ret < 0)
3839 goto out_err;
3840
86b00e0d 3841 p = reply_buf;
57385b51
AE
3842 end = reply_buf + ret;
3843 ret = -ERANGE;
642a2537 3844 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad 3845 if (pool_id == CEPH_NOPOOL) {
3846 /*
3847 * Either the parent never existed, or we have a
3848 * record of it but the image got flattened, so it no
3849 * longer has a parent. When the parent of a
3850 * layered image disappears, we immediately set the
3851 * overlap to 0. The effect of this is that all new
3852 * requests will be treated as if the image had no
3853 * parent.
3854 */
3855 if (rbd_dev->parent_overlap) {
3856 rbd_dev->parent_overlap = 0;
3857 smp_mb();
3858 rbd_dev_parent_put(rbd_dev);
3859 pr_info("%s: clone image has been flattened\n",
3860 rbd_dev->disk->disk_name);
3861 }
3862
86b00e0d 3863 goto out; /* No parent? No problem. */
392a9dad 3864 }
86b00e0d 3865
0903e875
AE
3866 /* The ceph file layout needs to fit pool id in 32 bits */
3867
3868 ret = -EIO;
642a2537 3869 if (pool_id > (u64)U32_MAX) {
c0cd10db 3870 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
642a2537 3871 (unsigned long long)pool_id, U32_MAX);
57385b51 3872 goto out_err;
c0cd10db 3873 }
0903e875 3874
979ed480 3875 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3876 if (IS_ERR(image_id)) {
3877 ret = PTR_ERR(image_id);
3878 goto out_err;
3879 }
3b5cf2a2 3880 ceph_decode_64_safe(&p, end, snap_id, out_err);
86b00e0d
AE
3881 ceph_decode_64_safe(&p, end, overlap, out_err);
3882
3b5cf2a2 3883 /*
3884 * The parent won't change (except when the clone is
3885 * flattened, which was handled above). So we only need to
3886 * record the parent spec if we have not already done so.
3887 */
3888 if (!rbd_dev->parent_spec) {
3889 parent_spec->pool_id = pool_id;
3890 parent_spec->image_id = image_id;
3891 parent_spec->snap_id = snap_id;
70cf49cf
AE
3892 rbd_dev->parent_spec = parent_spec;
3893 parent_spec = NULL; /* rbd_dev now owns this */
3b5cf2a2
AE
3894 }
3895
3896 /*
3897 * We always update the parent overlap. If it's zero we
3898 * treat it specially.
3899 */
3900 rbd_dev->parent_overlap = overlap;
3901 smp_mb();
3902 if (!overlap) {
3903
3904 /* A null parent_spec indicates it's the initial probe */
3905
3906 if (parent_spec) {
3907 /*
3908 * The overlap has become zero, so the clone
3909 * must have been resized down to 0 at some
3910 * point. Treat this the same as a flatten.
3911 */
3912 rbd_dev_parent_put(rbd_dev);
3913 pr_info("%s: clone image now standalone\n",
3914 rbd_dev->disk->disk_name);
3915 } else {
3916 /*
3917 * For the initial probe, if we find the
3918 * overlap is zero we just pretend there was
3919 * no parent image.
3920 */
3921 rbd_warn(rbd_dev, "ignoring parent of "
3922 "clone with overlap 0\n");
3923 }
70cf49cf 3924 }
86b00e0d
AE
3925out:
3926 ret = 0;
3927out_err:
3928 kfree(reply_buf);
3929 rbd_spec_put(parent_spec);
3930
3931 return ret;
3932}
3933
cc070d59
AE
3934static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3935{
3936 struct {
3937 __le64 stripe_unit;
3938 __le64 stripe_count;
3939 } __attribute__ ((packed)) striping_info_buf = { 0 };
3940 size_t size = sizeof (striping_info_buf);
3941 void *p;
3942 u64 obj_size;
3943 u64 stripe_unit;
3944 u64 stripe_count;
3945 int ret;
3946
3947 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3948 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3949 (char *)&striping_info_buf, size);
cc070d59
AE
3950 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3951 if (ret < 0)
3952 return ret;
3953 if (ret < size)
3954 return -ERANGE;
3955
3956 /*
3957 * We don't actually support the "fancy striping" feature
3958 * (STRIPINGV2) yet, but if the striping sizes are the
3959 * defaults the behavior is the same as before. So find
3960 * out, and only fail if the image has non-default values.
3961 */
3962 ret = -EINVAL;
3963 obj_size = (u64)1 << rbd_dev->header.obj_order;
3964 p = &striping_info_buf;
3965 stripe_unit = ceph_decode_64(&p);
3966 if (stripe_unit != obj_size) {
3967 rbd_warn(rbd_dev, "unsupported stripe unit "
3968 "(got %llu want %llu)",
3969 stripe_unit, obj_size);
3970 return -EINVAL;
3971 }
3972 stripe_count = ceph_decode_64(&p);
3973 if (stripe_count != 1) {
3974 rbd_warn(rbd_dev, "unsupported stripe count "
3975 "(got %llu want 1)", stripe_count);
3976 return -EINVAL;
3977 }
500d0c0f
AE
3978 rbd_dev->header.stripe_unit = stripe_unit;
3979 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
3980
3981 return 0;
3982}
3983
9e15b77d
AE
3984static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3985{
3986 size_t image_id_size;
3987 char *image_id;
3988 void *p;
3989 void *end;
3990 size_t size;
3991 void *reply_buf = NULL;
3992 size_t len = 0;
3993 char *image_name = NULL;
3994 int ret;
3995
3996 rbd_assert(!rbd_dev->spec->image_name);
3997
69e7a02f
AE
3998 len = strlen(rbd_dev->spec->image_id);
3999 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
4000 image_id = kmalloc(image_id_size, GFP_KERNEL);
4001 if (!image_id)
4002 return NULL;
4003
4004 p = image_id;
4157976b 4005 end = image_id + image_id_size;
57385b51 4006 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
4007
4008 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4009 reply_buf = kmalloc(size, GFP_KERNEL);
4010 if (!reply_buf)
4011 goto out;
4012
36be9a76 4013 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
4014 "rbd", "dir_get_name",
4015 image_id, image_id_size,
e2a58ee5 4016 reply_buf, size);
9e15b77d
AE
4017 if (ret < 0)
4018 goto out;
4019 p = reply_buf;
f40eb349
AE
4020 end = reply_buf + ret;
4021
9e15b77d
AE
4022 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4023 if (IS_ERR(image_name))
4024 image_name = NULL;
4025 else
4026 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4027out:
4028 kfree(reply_buf);
4029 kfree(image_id);
4030
4031 return image_name;
4032}
4033
2ad3d716
AE
4034static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4035{
4036 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4037 const char *snap_name;
4038 u32 which = 0;
4039
4040 /* Skip over names until we find the one we are looking for */
4041
4042 snap_name = rbd_dev->header.snap_names;
4043 while (which < snapc->num_snaps) {
4044 if (!strcmp(name, snap_name))
4045 return snapc->snaps[which];
4046 snap_name += strlen(snap_name) + 1;
4047 which++;
4048 }
4049 return CEPH_NOSNAP;
4050}
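/*
 * The v1 walk above depends on header.snap_names holding the names
 * of snapc->snaps[0..num_snaps-1] packed back to back, each
 * NUL-terminated. E.g. (hypothetical) "mon\0tue\0wed\0" pairs
 * "tue" with snapc->snaps[1].
 */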
4051
4052static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4053{
4054 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4055 u32 which;
4056 bool found = false;
4057 u64 snap_id;
4058
4059 for (which = 0; !found && which < snapc->num_snaps; which++) {
4060 const char *snap_name;
4061
4062 snap_id = snapc->snaps[which];
4063 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4064 if (IS_ERR(snap_name))
4065 break;
4066 found = !strcmp(name, snap_name);
4067 kfree(snap_name);
4068 }
4069 return found ? snap_id : CEPH_NOSNAP;
4070}
4071
4072/*
4073 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4074 * no snapshot by that name is found, or if an error occurs.
4075 */
4076static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4077{
4078 if (rbd_dev->image_format == 1)
4079 return rbd_v1_snap_id_by_name(rbd_dev, name);
4080
4081 return rbd_v2_snap_id_by_name(rbd_dev, name);
4082}
4083
9e15b77d 4084/*
2e9f7f1c 4085 * When an rbd image has a parent image, it is identified by the
4086 * pool, image, and snapshot ids (not names). This function fills
4087 * in the names for those ids. (It's OK if we can't figure out the
4088 * name for an image id, but the pool and snapshot ids should always
4089 * exist and have names.) All names in an rbd spec are dynamically
4090 * allocated.
e1d4213f 4091 *
4092 * When an image being mapped (not a parent) is probed, we have the
4093 * pool name and pool id, image name and image id, and the snapshot
4094 * name. The only thing we're missing is the snapshot id.
9e15b77d 4095 */
2e9f7f1c 4096static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 4097{
2e9f7f1c
AE
4098 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4099 struct rbd_spec *spec = rbd_dev->spec;
4100 const char *pool_name;
4101 const char *image_name;
4102 const char *snap_name;
9e15b77d
AE
4103 int ret;
4104
e1d4213f
AE
4105 /*
4106 * An image being mapped will have the pool name (etc.), but
4107 * we need to look up the snapshot id.
4108 */
2e9f7f1c
AE
4109 if (spec->pool_name) {
4110 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 4111 u64 snap_id;
e1d4213f 4112
2ad3d716
AE
4113 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4114 if (snap_id == CEPH_NOSNAP)
e1d4213f 4115 return -ENOENT;
2ad3d716 4116 spec->snap_id = snap_id;
e1d4213f 4117 } else {
2e9f7f1c 4118 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
4119 }
4120
4121 return 0;
4122 }
9e15b77d 4123
2e9f7f1c 4124 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4125
2e9f7f1c
AE
4126 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4127 if (!pool_name) {
4128 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
4129 return -EIO;
4130 }
2e9f7f1c
AE
4131 pool_name = kstrdup(pool_name, GFP_KERNEL);
4132 if (!pool_name)
9e15b77d
AE
4133 return -ENOMEM;
4134
4135 /* Fetch the image name; tolerate failure here */
4136
2e9f7f1c
AE
4137 image_name = rbd_dev_image_name(rbd_dev);
4138 if (!image_name)
06ecc6cb 4139 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4140
2e9f7f1c 4141 /* Look up the snapshot name, and make a copy */
9e15b77d 4142
2e9f7f1c 4143 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
2e9f7f1c
AE
4144 if (!snap_name) {
4145 ret = -ENOMEM;
9e15b77d 4146 goto out_err;
2e9f7f1c
AE
4147 }
4148
4149 spec->pool_name = pool_name;
4150 spec->image_name = image_name;
4151 spec->snap_name = snap_name;
9e15b77d
AE
4152
4153 return 0;
4154out_err:
2e9f7f1c
AE
4155 kfree(image_name);
4156 kfree(pool_name);
9e15b77d
AE
4157
4158 return ret;
4159}
4160
cc4a38bd 4161static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
4162{
4163 size_t size;
4164 int ret;
4165 void *reply_buf;
4166 void *p;
4167 void *end;
4168 u64 seq;
4169 u32 snap_count;
4170 struct ceph_snap_context *snapc;
4171 u32 i;
4172
4173 /*
4174 * We'll need room for the seq value (maximum snapshot id),
4175 * snapshot count, and array of that many snapshot ids.
4176 * For now we have a fixed upper limit on the number we're
4177 * prepared to receive.
4178 */
4179 size = sizeof (__le64) + sizeof (__le32) +
4180 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4181 reply_buf = kzalloc(size, GFP_KERNEL);
4182 if (!reply_buf)
4183 return -ENOMEM;
4184
36be9a76 4185 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 4186 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 4187 reply_buf, size);
36be9a76 4188 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
4189 if (ret < 0)
4190 goto out;
4191
35d489f9 4192 p = reply_buf;
57385b51
AE
4193 end = reply_buf + ret;
4194 ret = -ERANGE;
35d489f9
AE
4195 ceph_decode_64_safe(&p, end, seq, out);
4196 ceph_decode_32_safe(&p, end, snap_count, out);
4197
4198 /*
4199 * Make sure the reported number of snapshot ids wouldn't go
4200 * beyond the end of our buffer. But before checking that,
4201 * make sure the computed size of the snapshot context we
4202 * allocate is representable in a size_t.
4203 */
4204 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4205 / sizeof (u64)) {
4206 ret = -EINVAL;
4207 goto out;
4208 }
4209 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4210 goto out;
468521c1 4211 ret = 0;
35d489f9 4212
812164f8 4213 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
4214 if (!snapc) {
4215 ret = -ENOMEM;
4216 goto out;
4217 }
35d489f9 4218 snapc->seq = seq;
35d489f9
AE
4219 for (i = 0; i < snap_count; i++)
4220 snapc->snaps[i] = ceph_decode_64(&p);
4221
49ece554 4222 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
4223 rbd_dev->header.snapc = snapc;
4224
4225 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4226 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
4227out:
4228 kfree(reply_buf);
4229
57385b51 4230 return ret;
35d489f9
AE
4231}
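/*
 * Checking the fixed buffer bound used above: with at most
 * RBD_MAX_SNAP_COUNT (510) snapshot ids,
 *
 *	size = 8 + 4 + 510 * 8 = 4092 bytes
 *
 * so the largest acceptable reply still fits in a single 4 KiB page.
 */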
4232
54cac61f
AE
4233static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4234 u64 snap_id)
b8b1e2db
AE
4235{
4236 size_t size;
4237 void *reply_buf;
54cac61f 4238 __le64 snapid;
b8b1e2db
AE
4239 int ret;
4240 void *p;
4241 void *end;
b8b1e2db
AE
4242 char *snap_name;
4243
4244 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4245 reply_buf = kmalloc(size, GFP_KERNEL);
4246 if (!reply_buf)
4247 return ERR_PTR(-ENOMEM);
4248
54cac61f 4249 snapid = cpu_to_le64(snap_id);
36be9a76 4250 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 4251 "rbd", "get_snapshot_name",
54cac61f 4252 &snapid, sizeof (snapid),
e2a58ee5 4253 reply_buf, size);
36be9a76 4254 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
4255 if (ret < 0) {
4256 snap_name = ERR_PTR(ret);
b8b1e2db 4257 goto out;
f40eb349 4258 }
b8b1e2db
AE
4259
4260 p = reply_buf;
f40eb349 4261 end = reply_buf + ret;
e5c35534 4262 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4263 if (IS_ERR(snap_name))
b8b1e2db 4264 goto out;
b8b1e2db 4265
f40eb349 4266 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4267 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4268out:
4269 kfree(reply_buf);
4270
f40eb349 4271 return snap_name;
b8b1e2db
AE
4272}
4273
2df3fac7 4274static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4275{
2df3fac7 4276 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4277 int ret;
117973fb 4278
1617e40c
JD
4279 ret = rbd_dev_v2_image_size(rbd_dev);
4280 if (ret)
cfbf6377 4281 return ret;
1617e40c 4282
2df3fac7
AE
4283 if (first_time) {
4284 ret = rbd_dev_v2_header_onetime(rbd_dev);
4285 if (ret)
cfbf6377 4286 return ret;
2df3fac7
AE
4287 }
4288
642a2537 4289 /*
4290 * If the image supports layering, get the parent info. We
4291 * need to probe the first time regardless. Thereafter we
4292 * only need to do so if there's a parent, to see if it has
4293 * disappeared due to the mapped image getting flattened.
4294 */
4295 if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4296 (first_time || rbd_dev->parent_spec)) {
4297 bool warn;
4298
4299 ret = rbd_dev_v2_parent_info(rbd_dev);
4300 if (ret)
cfbf6377 4301 return ret;
642a2537
AE
4302
4303 /*
4304 * Print a warning if this is the initial probe and
4305 * the image has a parent. Don't print it if the
4306 * image now being probed is itself a parent. We
4307 * can tell at this point because we won't know its
4308 * pool name yet (just its pool id).
4309 */
4310 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4311 if (first_time && warn)
4312 rbd_warn(rbd_dev, "WARNING: kernel layering "
4313 "is EXPERIMENTAL!");
4314 }
4315
29334ba4
AE
4316 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4317 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4318 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4319
cc4a38bd 4320 ret = rbd_dev_v2_snap_context(rbd_dev);
117973fb 4321 dout("rbd_dev_v2_snap_context returned %d\n", ret);
117973fb
AE
4322
4323 return ret;
4324}
4325
dfc5606d
YS
4326static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4327{
dfc5606d 4328 struct device *dev;
cd789ab9 4329 int ret;
dfc5606d 4330
cd789ab9 4331 dev = &rbd_dev->dev;
dfc5606d
YS
4332 dev->bus = &rbd_bus_type;
4333 dev->type = &rbd_device_type;
4334 dev->parent = &rbd_root_dev;
200a6a8b 4335 dev->release = rbd_dev_device_release;
de71a297 4336 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4337 ret = device_register(dev);
dfc5606d 4338
dfc5606d 4339 return ret;
602adf40
YS
4340}
4341
dfc5606d
YS
4342static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4343{
4344 device_unregister(&rbd_dev->dev);
4345}
4346
e2839308 4347static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
4348
4349/*
499afd5b
AE
4350 * Get a unique rbd identifier for the given new rbd_dev, and add
4351 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 4352 */
e2839308 4353static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4354{
e2839308 4355 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
4356
4357 spin_lock(&rbd_dev_list_lock);
4358 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4359 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
4360 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4361 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 4362}
b7f23c36 4363
1ddbe94e 4364/*
499afd5b
AE
4365 * Remove an rbd_dev from the global list, and record that its
4366 * identifier is no longer in use.
1ddbe94e 4367 */
e2839308 4368static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4369{
d184f6bf 4370 struct list_head *tmp;
de71a297 4371 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
4372 int max_id;
4373
aafb230e 4374 rbd_assert(rbd_id > 0);
499afd5b 4375
e2839308
AE
4376 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4377 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
4378 spin_lock(&rbd_dev_list_lock);
4379 list_del_init(&rbd_dev->node);
d184f6bf
AE
4380
4381 /*
4382 * If the id being "put" is not the current maximum, there
4383 * is nothing special we need to do.
4384 */
e2839308 4385 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
4386 spin_unlock(&rbd_dev_list_lock);
4387 return;
4388 }
4389
4390 /*
4391 * We need to update the current maximum id. Search the
4392 * list to find out what it is. We're more likely to find
4393 * the maximum at the end, so search the list backward.
4394 */
4395 max_id = 0;
4396 list_for_each_prev(tmp, &rbd_dev_list) {
4397 struct rbd_device *rbd_dev;
4398
4399 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
4400 if (rbd_dev->dev_id > max_id)
4401 max_id = rbd_dev->dev_id;
d184f6bf 4402 }
499afd5b 4403 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4404
1ddbe94e 4405 /*
e2839308 4406 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
4407 * which case it now accurately reflects the new maximum.
4408 * Be careful not to overwrite the maximum value in that
4409 * case.
1ddbe94e 4410 */
e2839308
AE
4411 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4412 dout(" max dev id has been reset\n");
b7f23c36
AE
4413}
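/*
 * Worked example of the id recycling above: with devices {1, 2, 3}
 * mapped, rbd_dev_id_max is 3. Putting id 3 rescans the (now
 * shorter) list, finds max_id = 2, and atomic64_cmpxchg(3 -> 2)
 * installs it. If a concurrent rbd_dev_id_get() already bumped the
 * counter to 4, the cmpxchg fails and the newer maximum is
 * correctly left in place.
 */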
4414
e28fff26
AE
4415/*
4416 * Skips over white space at *buf, and updates *buf to point to the
4417 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4418 * the token (string of non-white space characters) found. Note
4419 * that *buf must be terminated with '\0'.
e28fff26
AE
4420 */
4421static inline size_t next_token(const char **buf)
4422{
4423 /*
4424 * These are the characters that produce nonzero for
4425 * isspace() in the "C" and "POSIX" locales.
4426 */
4427 const char *spaces = " \f\n\r\t\v";
4428
4429 *buf += strspn(*buf, spaces); /* Find start of token */
4430
4431 return strcspn(*buf, spaces); /* Return token length */
4432}
4433
4434/*
4435 * Finds the next token in *buf, and if the provided token buffer is
4436 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4437 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4438 * must be terminated with '\0' on entry.
e28fff26
AE
4439 *
4440 * Returns the length of the token found (not including the '\0').
4441 * Return value will be 0 if no token is found, and it will be >=
4442 * token_size if the token would not fit.
4443 *
593a9e7b 4444 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4445 * found token. Note that this occurs even if the token buffer is
4446 * too small to hold it.
4447 */
4448static inline size_t copy_token(const char **buf,
4449 char *token,
4450 size_t token_size)
4451{
4452 size_t len;
4453
4454 len = next_token(buf);
4455 if (len < token_size) {
4456 memcpy(token, *buf, len);
4457 *(token + len) = '\0';
4458 }
4459 *buf += len;
4460
4461 return len;
4462}
4463
ea3352f4
AE
4464/*
4465 * Finds the next token in *buf, dynamically allocates a buffer big
4466 * enough to hold a copy of it, and copies the token into the new
4467 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4468 * that a duplicate buffer is created even for a zero-length token.
4469 *
4470 * Returns a pointer to the newly-allocated duplicate, or a null
4471 * pointer if memory for the duplicate was not available. If
4472 * the lenp argument is a non-null pointer, the length of the token
4473 * (not including the '\0') is returned in *lenp.
4474 *
4475 * If successful, the *buf pointer will be updated to point beyond
4476 * the end of the found token.
4477 *
4478 * Note: uses GFP_KERNEL for allocation.
4479 */
4480static inline char *dup_token(const char **buf, size_t *lenp)
4481{
4482 char *dup;
4483 size_t len;
4484
4485 len = next_token(buf);
4caf35f9 4486 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4487 if (!dup)
4488 return NULL;
ea3352f4
AE
4489 *(dup + len) = '\0';
4490 *buf += len;
4491
4492 if (lenp)
4493 *lenp = len;
4494
4495 return dup;
4496}
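/*
 * Tokenizer example (input illustrative): with
 * *buf = "  1.2.3.4:6789 rbd", next_token() skips the leading
 * spaces and returns 12, the length of "1.2.3.4:6789"; dup_token()
 * then returns a freshly allocated, NUL-terminated copy of that
 * token, leaving *buf at the space before "rbd".
 */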
4497
a725f65e 4498/*
859c31df
AE
4499 * Parse the options provided for an "rbd add" (i.e., rbd image
4500 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4501 * and the data written is passed here via a NUL-terminated buffer.
4502 * Returns 0 if successful or an error code otherwise.
d22f76e7 4503 *
859c31df
AE
4504 * The information extracted from these options is recorded in
4505 * the other parameters which return dynamically-allocated
4506 * structures:
4507 * ceph_opts
4508 * The address of a pointer that will refer to a ceph options
4509 * structure. Caller must release the returned pointer using
4510 * ceph_destroy_options() when it is no longer needed.
4511 * rbd_opts
4512 * Address of an rbd options pointer. Fully initialized by
4513 * this function; caller must release with kfree().
4514 * spec
4515 * Address of an rbd image specification pointer. Fully
4516 * initialized by this function based on parsed options.
4517 * Caller must release with rbd_spec_put().
4518 *
4519 * The options passed take this form:
4520 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4521 * where:
4522 * <mon_addrs>
4523 * A comma-separated list of one or more monitor addresses.
4524 * A monitor address is an ip address, optionally followed
4525 * by a port number (separated by a colon).
4526 * I.e.: ip1[:port1][,ip2[:port2]...]
4527 * <options>
4528 * A comma-separated list of ceph and/or rbd options.
4529 * <pool_name>
4530 * The name of the rados pool containing the rbd image.
4531 * <image_name>
4532 * The name of the image in that pool to map.
4533 * <snap_name>
4534 * An optional snapshot name. If provided, the mapping will
4535 * present data from the image at the time that snapshot was
4536 * created. The image head is used if no snapshot name is
4537 * provided. Snapshot mappings are always read-only.
a725f65e 4538 */
859c31df 4539static int rbd_add_parse_args(const char *buf,
dc79b113 4540 struct ceph_options **ceph_opts,
859c31df
AE
4541 struct rbd_options **opts,
4542 struct rbd_spec **rbd_spec)
e28fff26 4543{
d22f76e7 4544 size_t len;
859c31df 4545 char *options;
0ddebc0c 4546 const char *mon_addrs;
ecb4dc22 4547 char *snap_name;
0ddebc0c 4548 size_t mon_addrs_size;
859c31df 4549 struct rbd_spec *spec = NULL;
4e9afeba 4550 struct rbd_options *rbd_opts = NULL;
859c31df 4551 struct ceph_options *copts;
dc79b113 4552 int ret;
e28fff26
AE
4553
4554 /* The first four tokens are required */
4555
7ef3214a 4556 len = next_token(&buf);
4fb5d671
AE
4557 if (!len) {
4558 rbd_warn(NULL, "no monitor address(es) provided");
4559 return -EINVAL;
4560 }
0ddebc0c 4561 mon_addrs = buf;
f28e565a 4562 mon_addrs_size = len + 1;
7ef3214a 4563 buf += len;
a725f65e 4564
dc79b113 4565 ret = -EINVAL;
f28e565a
AE
4566 options = dup_token(&buf, NULL);
4567 if (!options)
dc79b113 4568 return -ENOMEM;
4fb5d671
AE
4569 if (!*options) {
4570 rbd_warn(NULL, "no options provided");
4571 goto out_err;
4572 }
e28fff26 4573
859c31df
AE
4574 spec = rbd_spec_alloc();
4575 if (!spec)
f28e565a 4576 goto out_mem;
859c31df
AE
4577
4578 spec->pool_name = dup_token(&buf, NULL);
4579 if (!spec->pool_name)
4580 goto out_mem;
4fb5d671
AE
4581 if (!*spec->pool_name) {
4582 rbd_warn(NULL, "no pool name provided");
4583 goto out_err;
4584 }
e28fff26 4585
69e7a02f 4586 spec->image_name = dup_token(&buf, NULL);
859c31df 4587 if (!spec->image_name)
f28e565a 4588 goto out_mem;
4fb5d671
AE
4589 if (!*spec->image_name) {
4590 rbd_warn(NULL, "no image name provided");
4591 goto out_err;
4592 }
d4b125e9 4593
f28e565a
AE
4594 /*
4595 * Snapshot name is optional; default is to use "-"
4596 * (indicating the head/no snapshot).
4597 */
3feeb894 4598 len = next_token(&buf);
820a5f3e 4599 if (!len) {
3feeb894
AE
4600 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4601 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4602 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4603 ret = -ENAMETOOLONG;
f28e565a 4604 goto out_err;
849b4260 4605 }
ecb4dc22
AE
4606 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4607 if (!snap_name)
f28e565a 4608 goto out_mem;
ecb4dc22
AE
4609 *(snap_name + len) = '\0';
4610 spec->snap_name = snap_name;
e5c35534 4611
0ddebc0c 4612 /* Initialize all rbd options to the defaults */
e28fff26 4613
4e9afeba
AE
4614 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4615 if (!rbd_opts)
4616 goto out_mem;
4617
4618 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4619
859c31df 4620 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4621 mon_addrs + mon_addrs_size - 1,
4e9afeba 4622 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4623 if (IS_ERR(copts)) {
4624 ret = PTR_ERR(copts);
dc79b113
AE
4625 goto out_err;
4626 }
859c31df
AE
4627 kfree(options);
4628
4629 *ceph_opts = copts;
4e9afeba 4630 *opts = rbd_opts;
859c31df 4631 *rbd_spec = spec;
0ddebc0c 4632
dc79b113 4633 return 0;
f28e565a 4634out_mem:
dc79b113 4635 ret = -ENOMEM;
d22f76e7 4636out_err:
859c31df
AE
4637 kfree(rbd_opts);
4638 rbd_spec_put(spec);
f28e565a 4639 kfree(options);
d22f76e7 4640
dc79b113 4641 return ret;
a725f65e
AE
4642}
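/*
 * A concrete (hypothetical) add request the parser above accepts,
 * mapping image "myimage" from pool "rbd" at snapshot "snap1":
 *
 *	# echo "1.2.3.4:6789 name=admin,secret=AQB... rbd myimage snap1" \
 *		> /sys/bus/rbd/add
 *
 * This splits into mon_addrs "1.2.3.4:6789", the ceph/rbd options
 * token, and the pool, image, and snapshot names.
 */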
4643
589d30e0
AE
4644/*
4645 * An rbd format 2 image has a unique identifier, distinct from the
4646 * name given to it by the user. Internally, that identifier is
4647 * what's used to specify the names of objects related to the image.
4648 *
4649 * A special "rbd id" object is used to map an rbd image name to its
4650 * id. If that object doesn't exist, then there is no v2 rbd image
4651 * with the supplied name.
4652 *
4653 * This function will record the given rbd_dev's image_id field if
4654 * it can be determined, and in that case will return 0. If any
4655 * errors occur a negative errno will be returned and the rbd_dev's
4656 * image_id field will be unchanged (and should be NULL).
4657 */
4658static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4659{
4660 int ret;
4661 size_t size;
4662 char *object_name;
4663 void *response;
c0fba368 4664 char *image_id;
2f82ee54 4665
2c0d0a10 4666 /*
4667 * When probing a parent image, the image id is already
4668 * known (and the image name likely is not). There's no
c0fba368 4669 * need to fetch the image id again in this case. We
4670 * do still need to set the image format though.
2c0d0a10 4671 */
c0fba368
AE
4672 if (rbd_dev->spec->image_id) {
4673 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4674
2c0d0a10 4675 return 0;
c0fba368 4676 }
2c0d0a10 4677
589d30e0
AE
4678 /*
4679 * First, see if the format 2 image id file exists, and if
4680 * so, get the image's persistent id from it.
4681 */
69e7a02f 4682 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4683 object_name = kmalloc(size, GFP_NOIO);
4684 if (!object_name)
4685 return -ENOMEM;
0d7dbfce 4686 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4687 dout("rbd id object name is %s\n", object_name);
4688
4689 /* Response will be an encoded string, which includes a length */
4690
4691 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4692 response = kzalloc(size, GFP_NOIO);
4693 if (!response) {
4694 ret = -ENOMEM;
4695 goto out;
4696 }
4697
c0fba368
AE
4698 /* If it doesn't exist we'll assume it's a format 1 image */
4699
36be9a76 4700 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4701 "rbd", "get_id", NULL, 0,
e2a58ee5 4702 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4703 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4704 if (ret == -ENOENT) {
4705 image_id = kstrdup("", GFP_KERNEL);
4706 ret = image_id ? 0 : -ENOMEM;
4707 if (!ret)
4708 rbd_dev->image_format = 1;
4709 } else if (ret > sizeof (__le32)) {
4710 void *p = response;
4711
4712 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4713 NULL, GFP_NOIO);
c0fba368
AE
4714 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4715 if (!ret)
4716 rbd_dev->image_format = 2;
589d30e0 4717 } else {
c0fba368
AE
4718 ret = -EINVAL;
4719 }
4720
4721 if (!ret) {
4722 rbd_dev->spec->image_id = image_id;
4723 dout("image_id is %s\n", image_id);
589d30e0
AE
4724 }
4725out:
4726 kfree(response);
4727 kfree(object_name);
4728
4729 return ret;
4730}
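/*
 * Example of the id lookup above (image name hypothetical): for
 * image "myimage" the id object is named RBD_ID_PREFIX "myimage"
 * (conventionally "rbd_id.myimage"), and its "get_id" method
 * returns the persistent id string, e.g. "1014b2ae8944a". If the
 * object doesn't exist, the image is treated as format 1.
 */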
4731
3abef3b3 4732/*
4733 * Undo whatever state changes are made by a v1 or v2 header info
4734 * call.
4735 */
6fd48b3b
AE
4736static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4737{
4738 struct rbd_image_header *header;
4739
392a9dad
AE
4740 /* Drop parent reference unless it's already been done (or none) */
4741
4742 if (rbd_dev->parent_overlap)
4743 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
4744
4745 /* Free dynamic fields from the header, then zero it out */
4746
4747 header = &rbd_dev->header;
812164f8 4748 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4749 kfree(header->snap_sizes);
4750 kfree(header->snap_names);
4751 kfree(header->object_prefix);
4752 memset(header, 0, sizeof (*header));
4753}
4754
2df3fac7 4755static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
4756{
4757 int ret;
a30b71b9 4758
1e130199 4759 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4760 if (ret)
b1b5402a
AE
4761 goto out_err;
4762
2df3fac7 4763 /*
4764 * Get and check the features for the image. Currently the
4765 * features are assumed to never change.
4766 */
b1b5402a 4767 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4768 if (ret)
9d475de5 4769 goto out_err;
35d489f9 4770
cc070d59
AE
4771 /* If the image supports fancy striping, get its parameters */
4772
4773 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4774 ret = rbd_dev_v2_striping_info(rbd_dev);
4775 if (ret < 0)
4776 goto out_err;
4777 }
2df3fac7 4778 /* No support for crypto and compression type format 2 images */
a30b71b9 4779
35152979 4780 return 0;
9d475de5 4781out_err:
642a2537 4782 rbd_dev->header.features = 0;
1e130199
AE
4783 kfree(rbd_dev->header.object_prefix);
4784 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4785
4786 return ret;
a30b71b9
AE
4787}
4788
124afba2 4789static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4790{
2f82ee54 4791 struct rbd_device *parent = NULL;
124afba2
AE
4792 struct rbd_spec *parent_spec;
4793 struct rbd_client *rbdc;
4794 int ret;
4795
4796 if (!rbd_dev->parent_spec)
4797 return 0;
4798 /*
4799 * We need to pass a reference to the client and the parent
4800 * spec when creating the parent rbd_dev. Images related by
4801 * parent/child relationships always share both.
4802 */
4803 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4804 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4805
4806 ret = -ENOMEM;
4807 parent = rbd_dev_create(rbdc, parent_spec);
4808 if (!parent)
4809 goto out_err;
4810
1f3ef788 4811 ret = rbd_dev_image_probe(parent, false);
124afba2
AE
4812 if (ret < 0)
4813 goto out_err;
4814 rbd_dev->parent = parent;
a2acd00e 4815 atomic_set(&rbd_dev->parent_ref, 1);
124afba2
AE
4816
4817 return 0;
4818out_err:
4819 if (parent) {
fb65d228 4820 rbd_dev_unparent(rbd_dev);
124afba2
AE
4821 kfree(rbd_dev->header_name);
4822 rbd_dev_destroy(parent);
4823 } else {
4824 rbd_put_client(rbdc);
4825 rbd_spec_put(parent_spec);
4826 }
4827
4828 return ret;
4829}
4830
200a6a8b 4831static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4832{
83a06263 4833 int ret;
d1cf5788 4834
83a06263
AE
4835 /* generate unique id: find highest unique id, add one */
4836 rbd_dev_id_get(rbd_dev);
4837
4838 /* Fill in the device name, now that we have its id. */
4839 BUILD_BUG_ON(DEV_NAME_LEN
4840 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4841 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4842
4843 /* Get our block major device number. */
4844
4845 ret = register_blkdev(0, rbd_dev->name);
4846 if (ret < 0)
4847 goto err_out_id;
4848 rbd_dev->major = ret;
4849
4850 /* Set up the blkdev mapping. */
4851
4852 ret = rbd_init_disk(rbd_dev);
4853 if (ret)
4854 goto err_out_blkdev;
4855
f35a4dee 4856 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
4857 if (ret)
4858 goto err_out_disk;
f35a4dee
AE
4859 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4860
4861 ret = rbd_bus_add_dev(rbd_dev);
4862 if (ret)
4863 goto err_out_mapping;
83a06263 4864
83a06263
AE
4865 /* Everything's ready. Announce the disk to the world. */
4866
129b79d4 4867 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4868 add_disk(rbd_dev->disk);
4869
4870 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4871 (unsigned long long) rbd_dev->mapping.size);
4872
4873 return ret;
2f82ee54 4874
f35a4dee
AE
4875err_out_mapping:
4876 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4877err_out_disk:
4878 rbd_free_disk(rbd_dev);
4879err_out_blkdev:
4880 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4881err_out_id:
4882 rbd_dev_id_put(rbd_dev);
d1cf5788 4883 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4884
4885 return ret;
4886}
4887
332bb12d
AE
4888static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4889{
4890 struct rbd_spec *spec = rbd_dev->spec;
4891 size_t size;
4892
4893 /* Record the header object name for this rbd image. */
4894
4895 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4896
4897 if (rbd_dev->image_format == 1)
4898 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4899 else
4900 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4901
4902 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4903 if (!rbd_dev->header_name)
4904 return -ENOMEM;
4905
4906 if (rbd_dev->image_format == 1)
4907 sprintf(rbd_dev->header_name, "%s%s",
4908 spec->image_name, RBD_SUFFIX);
4909 else
4910 sprintf(rbd_dev->header_name, "%s%s",
4911 RBD_HEADER_PREFIX, spec->image_id);
4912 return 0;
4913}
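/*
 * Resulting header object names, assuming the conventional
 * RBD_SUFFIX ".rbd" and RBD_HEADER_PREFIX "rbd_header." values:
 *
 *	format 1, image "foo":         "foo.rbd"
 *	format 2, id "1014b2ae8944a":  "rbd_header.1014b2ae8944a"
 */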
4914
200a6a8b
AE
4915static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4916{
6fd48b3b 4917 rbd_dev_unprobe(rbd_dev);
200a6a8b 4918 kfree(rbd_dev->header_name);
6fd48b3b
AE
4919 rbd_dev->header_name = NULL;
4920 rbd_dev->image_format = 0;
4921 kfree(rbd_dev->spec->image_id);
4922 rbd_dev->spec->image_id = NULL;
4923
200a6a8b
AE
4924 rbd_dev_destroy(rbd_dev);
4925}
4926
a30b71b9
AE
4927/*
4928 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
4929 * device. If this image is the one being mapped (i.e., not a
4930 * parent), initiate a watch on its header object before using that
4931 * object to get detailed information about the rbd image.
a30b71b9 4932 */
1f3ef788 4933static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
a30b71b9
AE
4934{
4935 int ret;
b644de2b 4936 int tmp;
a30b71b9
AE
4937
4938 /*
3abef3b3
AE
4939 * Get the id from the image id object. Unless there's an
4940 * error, rbd_dev->spec->image_id will be filled in with
4941 * a dynamically-allocated string, and rbd_dev->image_format
4942 * will be set to either 1 or 2.
a30b71b9
AE
4943 */
4944 ret = rbd_dev_image_id(rbd_dev);
4945 if (ret)
c0fba368
AE
4946 return ret;
4947 rbd_assert(rbd_dev->spec->image_id);
4948 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4949
332bb12d
AE
4950 ret = rbd_dev_header_name(rbd_dev);
4951 if (ret)
4952 goto err_out_format;
4953
1f3ef788
AE
4954 if (mapping) {
4955 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4956 if (ret)
4957 goto out_header_name;
4958 }
b644de2b 4959
c0fba368 4960 if (rbd_dev->image_format == 1)
99a41ebc 4961 ret = rbd_dev_v1_header_info(rbd_dev);
a30b71b9 4962 else
2df3fac7 4963 ret = rbd_dev_v2_header_info(rbd_dev);
5655c4d9 4964 if (ret)
b644de2b 4965 goto err_out_watch;
83a06263 4966
9bb81c9b
AE
4967 ret = rbd_dev_spec_update(rbd_dev);
4968 if (ret)
33dca39f 4969 goto err_out_probe;
9bb81c9b
AE
4970
4971 ret = rbd_dev_probe_parent(rbd_dev);
30d60ba2
AE
4972 if (ret)
4973 goto err_out_probe;
4974
4975 dout("discovered format %u image, header name is %s\n",
4976 rbd_dev->image_format, rbd_dev->header_name);
83a06263 4977
30d60ba2 4978 return 0;
6fd48b3b
AE
4979err_out_probe:
4980 rbd_dev_unprobe(rbd_dev);
b644de2b 4981err_out_watch:
1f3ef788
AE
4982 if (mapping) {
4983 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4984 if (tmp)
4985 rbd_warn(rbd_dev, "unable to tear down "
4986 "watch request (%d)\n", tmp);
4987 }
332bb12d
AE
4988out_header_name:
4989 kfree(rbd_dev->header_name);
4990 rbd_dev->header_name = NULL;
4991err_out_format:
4992 rbd_dev->image_format = 0;
5655c4d9
AE
4993 kfree(rbd_dev->spec->image_id);
4994 rbd_dev->spec->image_id = NULL;
4995
4996 dout("probe failed, returning %d\n", ret);
4997
a30b71b9
AE
4998 return ret;
4999}
5000
59c2be1e
YS
5001static ssize_t rbd_add(struct bus_type *bus,
5002 const char *buf,
5003 size_t count)
602adf40 5004{
cb8627c7 5005 struct rbd_device *rbd_dev = NULL;
dc79b113 5006 struct ceph_options *ceph_opts = NULL;
4e9afeba 5007 struct rbd_options *rbd_opts = NULL;
859c31df 5008 struct rbd_spec *spec = NULL;
9d3997fd 5009 struct rbd_client *rbdc;
27cc2594 5010 struct ceph_osd_client *osdc;
51344a38 5011 bool read_only;
27cc2594 5012 int rc = -ENOMEM;
602adf40
YS
5013
5014 if (!try_module_get(THIS_MODULE))
5015 return -ENODEV;
5016
602adf40 5017 /* parse add command */
859c31df 5018 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 5019 if (rc < 0)
bd4ba655 5020 goto err_out_module;
51344a38
AE
5021 read_only = rbd_opts->read_only;
5022 kfree(rbd_opts);
5023 rbd_opts = NULL; /* done with this */
78cea76e 5024
9d3997fd
AE
5025 rbdc = rbd_get_client(ceph_opts);
5026 if (IS_ERR(rbdc)) {
5027 rc = PTR_ERR(rbdc);
0ddebc0c 5028 goto err_out_args;
9d3997fd 5029 }
602adf40 5030
602adf40 5031 /* pick the pool */
9d3997fd 5032 osdc = &rbdc->client->osdc;
859c31df 5033 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
5034 if (rc < 0)
5035 goto err_out_client;
c0cd10db 5036 spec->pool_id = (u64)rc;
859c31df 5037
0903e875
AE
5038 /* The ceph file layout needs to fit pool id in 32 bits */
5039
c0cd10db
AE
5040 if (spec->pool_id > (u64)U32_MAX) {
5041 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5042 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
5043 rc = -EIO;
5044 goto err_out_client;
5045 }
5046
c53d5893 5047 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
5048 if (!rbd_dev)
5049 goto err_out_client;
c53d5893
AE
5050 rbdc = NULL; /* rbd_dev now owns this */
5051 spec = NULL; /* rbd_dev now owns this */
602adf40 5052
1f3ef788 5053 rc = rbd_dev_image_probe(rbd_dev, true);
a30b71b9 5054 if (rc < 0)
c53d5893 5055 goto err_out_rbd_dev;
05fd6f6f 5056
7ce4eef7
AE
5057 /* If we are mapping a snapshot it must be marked read-only */
5058
5059 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5060 read_only = true;
5061 rbd_dev->mapping.read_only = read_only;
5062
b536f69a 5063 rc = rbd_dev_device_setup(rbd_dev);
3abef3b3
AE
5064 if (rc) {
5065 rbd_dev_image_release(rbd_dev);
5066 goto err_out_module;
5067 }
5068
5069 return count;
b536f69a 5070
c53d5893
AE
5071err_out_rbd_dev:
5072 rbd_dev_destroy(rbd_dev);
bd4ba655 5073err_out_client:
9d3997fd 5074 rbd_put_client(rbdc);
0ddebc0c 5075err_out_args:
859c31df 5076 rbd_spec_put(spec);
bd4ba655
AE
5077err_out_module:
5078 module_put(THIS_MODULE);
27cc2594 5079
602adf40 5080 dout("Error adding device %s\n", buf);
27cc2594 5081
c0cd10db 5082 return (ssize_t)rc;
602adf40
YS
5083}
5084
200a6a8b 5085static void rbd_dev_device_release(struct device *dev)
602adf40 5086{
593a9e7b 5087 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 5088
602adf40 5089 rbd_free_disk(rbd_dev);
200a6a8b 5090 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6d80b130 5091 rbd_dev_mapping_clear(rbd_dev);
602adf40 5092 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 5093 rbd_dev->major = 0;
e2839308 5094 rbd_dev_id_put(rbd_dev);
d1cf5788 5095 rbd_dev_mapping_clear(rbd_dev);
602adf40
YS
5096}
5097
05a46afd
AE
5098static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5099{
ad945fc1 5100 while (rbd_dev->parent) {
05a46afd
AE
5101 struct rbd_device *first = rbd_dev;
5102 struct rbd_device *second = first->parent;
5103 struct rbd_device *third;
5104
5105 /*
5106 * Follow to the parent with no grandparent and
5107 * remove it.
5108 */
5109 while (second && (third = second->parent)) {
5110 first = second;
5111 second = third;
5112 }
ad945fc1 5113 rbd_assert(second);
8ad42cd0 5114 rbd_dev_image_release(second);
ad945fc1
AE
5115 first->parent = NULL;
5116 first->parent_overlap = 0;
5117
5118 rbd_assert(first->parent_spec);
05a46afd
AE
5119 rbd_spec_put(first->parent_spec);
5120 first->parent_spec = NULL;
05a46afd
AE
5121 }
5122}
5123
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	unsigned long ul;
	bool already = false;
	int ret;

	ret = kstrtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	dev_id = (int)ul;
	if (dev_id != ul)
		return -EINVAL;

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count)
			ret = -EBUSY;
		else
			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
							&rbd_dev->flags);
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret < 0 || already)
		return ret;

	rbd_bus_del_dev(rbd_dev);
	ret = rbd_dev_header_watch_sync(rbd_dev, false);
	if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);

	return count;
}

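/*
 * Removal from user space mirrors add (the device id here is
 * hypothetical; it is the number from /sys/bus/rbd/devices/<id>):
 *
 *	# echo 1 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the device is held open, and the
 * RBD_DEV_FLAG_REMOVING test-and-set ensures that if two removals
 * race, only the one that set the flag performs the teardown.
 */
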
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

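/*
 * Note that cleanup runs in reverse order of initialization: the bus
 * is unregistered before the root device it hangs off of.
 */
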
static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
					sizeof (struct rbd_img_request),
					__alignof__(struct rbd_img_request),
					0, NULL);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
					sizeof (struct rbd_obj_request),
					__alignof__(struct rbd_obj_request),
					0, NULL);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	if (rbd_obj_request_cache) {
		kmem_cache_destroy(rbd_obj_request_cache);
		rbd_obj_request_cache = NULL;
	}

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

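/*
 * For reference, the kmem_cache_create() arguments above are (name,
 * object size, alignment, flags, constructor).  The two request
 * caches use the natural alignment of their structures; the segment
 * name cache stores NUL-terminated strings, so byte alignment (1)
 * suffices.  rbd_slab_exit() destroys the caches in reverse order of
 * creation.
 */
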
static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");

		return -EINVAL;
	}
	rc = rbd_slab_init();
	if (rc)
		return rc;
	rc = rbd_sysfs_init();
	if (rc)
		rbd_slab_exit();
	else
		pr_info("loaded " RBD_DRV_NAME_LONG "\n");

	return rc;
}

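/*
 * Ordering matters in rbd_init(): the slab caches must exist before
 * the sysfs files do, because a write to /sys/bus/rbd/add can start
 * allocating image and object requests immediately.  rbd_exit()
 * reverses this, removing the sysfs interface before the caches.
 */
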
static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");