/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
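
/*
 * Back-of-the-envelope check on that bound: each snapshot id is a
 * __le64, so 510 ids occupy 510 * 8 = 4080 bytes, which together with
 * the ceph_snap_context header fields still fits in one 4KB page.
 */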

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX	/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
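
/*
 * Why that formula works: a printed integer needs at most
 * log10(256) ~= 2.41 < 2.5 decimal digits per byte, so 5/2 digits per
 * byte rounds that up; for a 4-byte int that gives 10 digits (UINT_MAX
 * is 4294967295), and the +1 leaves room for a sign.
 */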

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
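
/*
 * For example, mapping an image with the option string "read_only"
 * (or its short form "ro") matches the Opt_read_only token above and
 * sets rbd_opts->read_only = true; "read_write"/"rw" resets it to
 * false.  No int or string options are currently defined, so those
 * branches only produce debug output.
 */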

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
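
/*
 * Either way rbd_get_client() consumes ceph_opts: directly when an
 * existing client is reused, or via rbd_client_create(), which hands
 * it to the new ceph_client.  Note that mapping with the ceph
 * "noshare" option makes rbd_client_find() return NULL, forcing a
 * dedicated client instance.
 */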

/*
 * Destroy ceph client
 *
 * Removes the client from rbd_client_list; rbd_client_list_lock is
 * taken here, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	down_write(&rbd_dev->header_rwsem);
	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	/* Make sure mapping size is consistent with header info */

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
		if (rbd_dev->mapping.size != header->image_size)
			rbd_dev->mapping.size = header->image_size;

	up_write(&rbd_dev->header_rwsem);

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}
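
/*
 * For example, given the descending snapshot id array { 12, 8, 5, 1 },
 * bsearch() with this comparator locates id 8 at index 1; a miss is
 * reported as BAD_SNAP_INDEX by rbd_dev_snap_index() below.
 */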

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return NULL;

	return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
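
/*
 * Worked example, assuming the default 4 MiB objects (obj_order 22):
 * an image I/O at offset 3 MiB for 2 MiB starts in segment 0 at
 * within-segment offset 3 MiB, so rbd_segment_length() clips the
 * first piece to 1 MiB and the remaining 1 MiB starts at offset 0 of
 * segment 1.  With object_prefix "rb.0.1234.5678", rbd_segment_name()
 * names segment 1's object "rb.0.1234.5678.000000000001".
 */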

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
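
/*
 * For example, with 4 KiB pages zero_pages(pages, 3000, 5000) clears
 * bytes [3000, 5000): 1096 bytes of page 0 starting at in-page offset
 * 3000, then the first 904 bytes of page 1.  Each page is mapped with
 * kmap_atomic() only for the duration of its memset().
 */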

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
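
/*
 * Typical use is to split a request at object boundaries, peeling off
 * one segment-sized chain at a time, along the lines of:
 *
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&bio_list, &bio_offset,
 *					segment_length, GFP_ATOMIC);
 *
 * Each call advances bio_list/bio_offset to the first byte not yet
 * cloned, so consecutive calls walk the source chain without
 * re-scanning it from the start.
 */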

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
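
/*
 * The intent of the barriers above: flags only ever go from 0 to 1,
 * so readers need no lock; the smp_mb() in the setter is meant to
 * publish the EXISTS bit no later than the KNOWN bit, and the
 * smp_mb() in each test orders the read, so a caller that sees KNOWN
 * set can trust the answer from obj_request_exists_test().
 */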

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear offhand which way is better.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
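
/*
 * Note the asymmetry: a read is built against a single snapshot id
 * (CEPH_NOSNAP meaning the head), while a write carries the whole
 * snapshot context so the OSD can do copy-on-write snapshot handling,
 * plus an mtime; its snap id is CEPH_NOSNAP because writes always
 * target the head.
 */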

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops:
 * a copyup method call and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
1758
1759
1760static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1761{
1762 ceph_osdc_put_request(osd_req);
1763}
1764
1765/* object_name is assumed to be a non-null pointer and NUL-terminated */
1766
1767static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1768 u64 offset, u64 length,
1769 enum obj_request_type type)
1770{
1771 struct rbd_obj_request *obj_request;
1772 size_t size;
1773 char *name;
1774
1775 rbd_assert(obj_request_type_valid(type));
1776
1777 size = strlen(object_name) + 1;
1778 name = kmalloc(size, GFP_KERNEL);
1779 if (!name)
1780 return NULL;
1781
868311b1 1782 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1783 if (!obj_request) {
1784 kfree(name);
1785 return NULL;
1786 }
1787
1788 obj_request->object_name = memcpy(name, object_name, size);
1789 obj_request->offset = offset;
1790 obj_request->length = length;
926f9b3f 1791 obj_request->flags = 0;
1792 obj_request->which = BAD_WHICH;
1793 obj_request->type = type;
1794 INIT_LIST_HEAD(&obj_request->links);
788e2df3 1795 init_completion(&obj_request->completion);
1796 kref_init(&obj_request->kref);
1797
1798 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1799 offset, length, (int)type, obj_request);
1800
1801 return obj_request;
1802}
1803
1804static void rbd_obj_request_destroy(struct kref *kref)
1805{
1806 struct rbd_obj_request *obj_request;
1807
1808 obj_request = container_of(kref, struct rbd_obj_request, kref);
1809
1810 dout("%s: obj %p\n", __func__, obj_request);
1811
1812 rbd_assert(obj_request->img_request == NULL);
1813 rbd_assert(obj_request->which == BAD_WHICH);
1814
1815 if (obj_request->osd_req)
1816 rbd_osd_req_destroy(obj_request->osd_req);
1817
1818 rbd_assert(obj_request_type_valid(obj_request->type));
1819 switch (obj_request->type) {
1820 case OBJ_REQUEST_NODATA:
1821 break; /* Nothing to do */
1822 case OBJ_REQUEST_BIO:
1823 if (obj_request->bio_list)
1824 bio_chain_put(obj_request->bio_list);
1825 break;
1826 case OBJ_REQUEST_PAGES:
1827 if (obj_request->pages)
1828 ceph_release_page_vector(obj_request->pages,
1829 obj_request->page_count);
1830 break;
1831 }
1832
f907ad55 1833 kfree(obj_request->object_name);
1834 obj_request->object_name = NULL;
1835 kmem_cache_free(rbd_obj_request_cache, obj_request);
1836}
1837
1838/*
1839 * Caller is responsible for filling in the list of object requests
1840 * that comprises the image request, and the Linux request pointer
1841 * (if there is one).
1842 */
1843static struct rbd_img_request *rbd_img_request_create(
1844 struct rbd_device *rbd_dev,
bf0d5f50 1845 u64 offset, u64 length,
1846 bool write_request,
1847 bool child_request)
1848{
1849 struct rbd_img_request *img_request;
bf0d5f50 1850
1c2a9dfe 1851 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1852 if (!img_request)
1853 return NULL;
1854
1855 if (write_request) {
1856 down_read(&rbd_dev->header_rwsem);
812164f8 1857 ceph_get_snap_context(rbd_dev->header.snapc);
bf0d5f50 1858 up_read(&rbd_dev->header_rwsem);
1859 }
1860
1861 img_request->rq = NULL;
1862 img_request->rbd_dev = rbd_dev;
1863 img_request->offset = offset;
1864 img_request->length = length;
1865 img_request->flags = 0;
1866 if (write_request) {
1867 img_request_write_set(img_request);
468521c1 1868 img_request->snapc = rbd_dev->header.snapc;
0c425248 1869 } else {
bf0d5f50 1870 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 1871 }
1872 if (child_request)
1873 img_request_child_set(img_request);
1874 if (rbd_dev->parent_spec)
1875 img_request_layered_set(img_request);
1876 spin_lock_init(&img_request->completion_lock);
1877 img_request->next_completion = 0;
1878 img_request->callback = NULL;
a5a337d4 1879 img_request->result = 0;
1880 img_request->obj_request_count = 0;
1881 INIT_LIST_HEAD(&img_request->obj_requests);
1882 kref_init(&img_request->kref);
1883
1884 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1885 write_request ? "write" : "read", offset, length,
1886 img_request);
1887
1888 return img_request;
1889}
1890
1891static void rbd_img_request_destroy(struct kref *kref)
1892{
1893 struct rbd_img_request *img_request;
1894 struct rbd_obj_request *obj_request;
1895 struct rbd_obj_request *next_obj_request;
1896
1897 img_request = container_of(kref, struct rbd_img_request, kref);
1898
1899 dout("%s: img %p\n", __func__, img_request);
1900
1901 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1902 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 1903 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 1904
0c425248 1905 if (img_request_write_test(img_request))
812164f8 1906 ceph_put_snap_context(img_request->snapc);
bf0d5f50 1907
1908 if (img_request_child_test(img_request))
1909 rbd_obj_request_put(img_request->obj_request);
1910
1c2a9dfe 1911 kmem_cache_free(rbd_img_request_cache, img_request);
1912}
1913
1914static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1915{
6365d33a 1916 struct rbd_img_request *img_request;
1917 unsigned int xferred;
1918 int result;
8b3e1a56 1919 bool more;
1217857f 1920
1921 rbd_assert(obj_request_img_data_test(obj_request));
1922 img_request = obj_request->img_request;
1923
1924 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1925 xferred = (unsigned int)obj_request->xferred;
1926 result = obj_request->result;
1927 if (result) {
1928 struct rbd_device *rbd_dev = img_request->rbd_dev;
1929
1930 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1931 img_request_write_test(img_request) ? "write" : "read",
1932 obj_request->length, obj_request->img_offset,
1933 obj_request->offset);
1934 rbd_warn(rbd_dev, " result %d xferred %x\n",
1935 result, xferred);
1936 if (!img_request->result)
1937 img_request->result = result;
1938 }
1939
1940 /* Image object requests don't own their page array */
1941
1942 if (obj_request->type == OBJ_REQUEST_PAGES) {
1943 obj_request->pages = NULL;
1944 obj_request->page_count = 0;
1945 }
1946
1947 if (img_request_child_test(img_request)) {
1948 rbd_assert(img_request->obj_request != NULL);
1949 more = obj_request->which < img_request->obj_request_count - 1;
1950 } else {
1951 rbd_assert(img_request->rq != NULL);
1952 more = blk_end_request(img_request->rq, result, xferred);
1953 }
1954
1955 return more;
1956}
1957
1958static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1959{
1960 struct rbd_img_request *img_request;
1961 u32 which = obj_request->which;
1962 bool more = true;
1963
6365d33a 1964 rbd_assert(obj_request_img_data_test(obj_request));
1965 img_request = obj_request->img_request;
1966
1967 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1968 rbd_assert(img_request != NULL);
1969 rbd_assert(img_request->obj_request_count > 0);
1970 rbd_assert(which != BAD_WHICH);
1971 rbd_assert(which < img_request->obj_request_count);
1972 rbd_assert(which >= img_request->next_completion);
1973
1974 spin_lock_irq(&img_request->completion_lock);
1975 if (which != img_request->next_completion)
1976 goto out;
1977
1978 for_each_obj_request_from(img_request, obj_request) {
1979 rbd_assert(more);
1980 rbd_assert(which < img_request->obj_request_count);
1981
1982 if (!obj_request_done_test(obj_request))
1983 break;
1217857f 1984 more = rbd_img_obj_end_request(obj_request);
1985 which++;
1986 }
1987
1988 rbd_assert(more ^ (which == img_request->obj_request_count));
1989 img_request->next_completion = which;
1990out:
1991 spin_unlock_irq(&img_request->completion_lock);
1992
1993 if (!more)
1994 rbd_img_request_complete(img_request);
1995}
1996
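/*
 * Editor's note (illustrative, not from the original source): a
 * hypothetical trace of the in-order completion enforced above.
 * If object requests 0..2 of an image request finish in the order
 * 2, 0, 1, then:
 *
 *	callback(2): which (2) != next_completion (0), records nothing;
 *	callback(0): ends request 0, stops at not-yet-done request 1;
 *	callback(1): ends request 1, then the already-done request 2.
 *
 * Requests are therefore always ended in index order, no matter
 * what order the OSD responses arrive in.
 */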
1997/*
1998 * Split up an image request into one or more object requests, each
1999 * to a different object. The "type" parameter indicates whether
2000 * "data_desc" is the pointer to the head of a list of bio
2001 * structures, or the base of a page array. In either case this
2002 * function assumes data_desc describes memory sufficient to hold
2003 * all data described by the image request.
2004 */
2005static int rbd_img_request_fill(struct rbd_img_request *img_request,
2006 enum obj_request_type type,
2007 void *data_desc)
2008{
2009 struct rbd_device *rbd_dev = img_request->rbd_dev;
2010 struct rbd_obj_request *obj_request = NULL;
2011 struct rbd_obj_request *next_obj_request;
0c425248 2012 bool write_request = img_request_write_test(img_request);
2013 struct bio *bio_list;
2014 unsigned int bio_offset = 0;
2015 struct page **pages;
7da22d29 2016 u64 img_offset;
2017 u64 resid;
2018 u16 opcode;
2019
2020 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2021 (int)type, data_desc);
37206ee5 2022
430c28c3 2023 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
7da22d29 2024 img_offset = img_request->offset;
bf0d5f50 2025 resid = img_request->length;
4dda41d3 2026 rbd_assert(resid > 0);
2027
2028 if (type == OBJ_REQUEST_BIO) {
2029 bio_list = data_desc;
2030 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2031 } else {
2032 rbd_assert(type == OBJ_REQUEST_PAGES);
2033 pages = data_desc;
2034 }
2035
bf0d5f50 2036 while (resid) {
2fa12320 2037 struct ceph_osd_request *osd_req;
bf0d5f50 2038 const char *object_name;
2039 u64 offset;
2040 u64 length;
2041
7da22d29 2042 object_name = rbd_segment_name(rbd_dev, img_offset);
2043 if (!object_name)
2044 goto out_unwind;
2045 offset = rbd_segment_offset(rbd_dev, img_offset);
2046 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2047 obj_request = rbd_obj_request_create(object_name,
f1a4739f 2048 offset, length, type);
2049 /* object request has its own copy of the object name */
2050 rbd_segment_name_free(object_name);
2051 if (!obj_request)
2052 goto out_unwind;
2053
2054 if (type == OBJ_REQUEST_BIO) {
2055 unsigned int clone_size;
2056
2057 rbd_assert(length <= (u64)UINT_MAX);
2058 clone_size = (unsigned int)length;
2059 obj_request->bio_list =
2060 bio_chain_clone_range(&bio_list,
2061 &bio_offset,
2062 clone_size,
2063 GFP_ATOMIC);
2064 if (!obj_request->bio_list)
2065 goto out_partial;
2066 } else {
2067 unsigned int page_count;
2068
2069 obj_request->pages = pages;
2070 page_count = (u32)calc_pages_for(offset, length);
2071 obj_request->page_count = page_count;
2072 if ((offset + length) & ~PAGE_MASK)
2073 page_count--; /* more on last page */
2074 pages += page_count;
2075 }
bf0d5f50 2076
2077 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2078 obj_request);
2079 if (!osd_req)
bf0d5f50 2080 goto out_partial;
2fa12320 2081 obj_request->osd_req = osd_req;
2169238d 2082 obj_request->callback = rbd_img_obj_callback;
430c28c3 2083
2084 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2085 0, 0);
2086 if (type == OBJ_REQUEST_BIO)
2087 osd_req_op_extent_osd_data_bio(osd_req, 0,
2088 obj_request->bio_list, length);
2089 else
2090 osd_req_op_extent_osd_data_pages(osd_req, 0,
2091 obj_request->pages, length,
2092 offset & ~PAGE_MASK, false, false);
2093
2094 if (write_request)
2095 rbd_osd_req_format_write(obj_request);
2096 else
2097 rbd_osd_req_format_read(obj_request);
430c28c3 2098
7da22d29 2099 obj_request->img_offset = img_offset;
2100 rbd_img_obj_request_add(img_request, obj_request);
2101
7da22d29 2102 img_offset += length;
2103 resid -= length;
2104 }
2105
2106 return 0;
2107
2108out_partial:
2109 rbd_obj_request_put(obj_request);
2110out_unwind:
2111 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2112 rbd_obj_request_put(obj_request);
2113
2114 return -ENOMEM;
2115}
2116
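/*
 * Editor's sketch: hypothetical helpers (not part of the driver)
 * showing the power-of-two arithmetic that rbd_segment_offset()
 * and rbd_segment_length(), used above, reduce to. For example,
 * with 4 MiB objects (obj_order 22), a 4 MiB request at image
 * offset 5 MiB starts 1 MiB into its first object and is split
 * 3 MiB + 1 MiB across two objects.
 */
static inline u64 example_segment_offset(u64 img_offset, u8 obj_order)
{
	/* Offset of the image offset within its containing object */
	return img_offset & (((u64)1 << obj_order) - 1);
}

static inline u64 example_segment_length(u64 img_offset, u64 resid,
					u8 obj_order)
{
	u64 obj_size = (u64)1 << obj_order;
	u64 seg_off = img_offset & (obj_size - 1);

	/* No object request may cross an object boundary */
	return min(resid, obj_size - seg_off);
}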
2117static void
2118rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2119{
2120 struct rbd_img_request *img_request;
2121 struct rbd_device *rbd_dev;
2122 u64 length;
2123 u32 page_count;
2124
2125 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2126 rbd_assert(obj_request_img_data_test(obj_request));
2127 img_request = obj_request->img_request;
2128 rbd_assert(img_request);
2129
2130 rbd_dev = img_request->rbd_dev;
2131 rbd_assert(rbd_dev);
2132 length = (u64)1 << rbd_dev->header.obj_order;
2133 page_count = (u32)calc_pages_for(0, length);
2134
2135 rbd_assert(obj_request->copyup_pages);
2136 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2137 obj_request->copyup_pages = NULL;
2138
2139 /*
2140 * We want the transfer count to reflect the size of the
2141 * original write request. There is no such thing as a
2142 * successful short write, so if the request was successful
2143 * we can just set it to the originally-requested length.
2144 */
2145 if (!obj_request->result)
2146 obj_request->xferred = obj_request->length;
2147
2148 /* Finish up with the normal image object callback */
2149
2150 rbd_img_obj_callback(obj_request);
2151}
2152
2153static void
2154rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2155{
2156 struct rbd_obj_request *orig_request;
2157 struct ceph_osd_request *osd_req;
2158 struct ceph_osd_client *osdc;
2159 struct rbd_device *rbd_dev;
3d7efd18 2160 struct page **pages;
2161 int result;
2162 u64 obj_size;
2163 u64 xferred;
2164
2165 rbd_assert(img_request_child_test(img_request));
2166
2167 /* First get what we need from the image request */
2168
2169 pages = img_request->copyup_pages;
2170 rbd_assert(pages != NULL);
2171 img_request->copyup_pages = NULL;
2172
2173 orig_request = img_request->obj_request;
2174 rbd_assert(orig_request != NULL);
0eefd470 2175 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2176 result = img_request->result;
2177 obj_size = img_request->length;
2178 xferred = img_request->xferred;
91c6febb 2179 rbd_img_request_put(img_request);
3d7efd18 2180
2181 rbd_assert(orig_request->img_request);
2182 rbd_dev = orig_request->img_request->rbd_dev;
2183 rbd_assert(rbd_dev);
2184 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2185
2186 if (result)
2187 goto out_err;
2188
2189 /* Allocate the new copyup osd request for the original request */
2190
2191 result = -ENOMEM;
2192 rbd_assert(!orig_request->osd_req);
2193 osd_req = rbd_osd_req_create_copyup(orig_request);
2194 if (!osd_req)
2195 goto out_err;
2196 orig_request->osd_req = osd_req;
2197 orig_request->copyup_pages = pages;
3d7efd18 2198
0eefd470 2199 /* Initialize the copyup op */
3d7efd18 2200
2201 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2202 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2203 false, false);
3d7efd18 2204
2205 /* Then the original write request op */
2206
2207 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2208 orig_request->offset,
2209 orig_request->length, 0, 0);
2210 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2211 orig_request->length);
2212
2213 rbd_osd_req_format_write(orig_request);
2214
2215 /* All set, send it off. */
2216
2217 orig_request->callback = rbd_img_obj_copyup_callback;
2218 osdc = &rbd_dev->rbd_client->client->osdc;
2219 result = rbd_obj_request_submit(osdc, orig_request);
2220 if (!result)
2221 return;
2222out_err:
2223 /* Record the error code and complete the request */
2224
2225 orig_request->result = result;
2226 orig_request->xferred = 0;
2227 obj_request_done_set(orig_request);
2228 rbd_obj_request_complete(orig_request);
2229}
2230
2231/*
2232 * Read from the parent image the range of data that covers the
2233 * entire target of the given object request. This is used for
2234 * satisfying a layered image write request when the target of an
2235 * object request from the image request does not exist.
2236 *
2237 * A page array big enough to hold the returned data is allocated
2238 * and supplied to rbd_img_request_fill() as the "data descriptor."
2239 * When the read completes, this page array will be transferred to
2240 * the original object request for the copyup operation.
2241 *
2242 * If an error occurs, record it as the result of the original
2243 * object request and mark it done so it gets completed.
2244 */
2245static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2246{
2247 struct rbd_img_request *img_request = NULL;
2248 struct rbd_img_request *parent_request = NULL;
2249 struct rbd_device *rbd_dev;
2250 u64 img_offset;
2251 u64 length;
2252 struct page **pages = NULL;
2253 u32 page_count;
2254 int result;
2255
2256 rbd_assert(obj_request_img_data_test(obj_request));
2257 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2258
2259 img_request = obj_request->img_request;
2260 rbd_assert(img_request != NULL);
2261 rbd_dev = img_request->rbd_dev;
2262 rbd_assert(rbd_dev->parent != NULL);
2263
2264 /*
2265 * First things first. The original osd request is of no
 2266 * use to us any more; we'll need a new one that can hold
2267 * the two ops in a copyup request. We'll get that later,
2268 * but for now we can release the old one.
2269 */
2270 rbd_osd_req_destroy(obj_request->osd_req);
2271 obj_request->osd_req = NULL;
2272
2273 /*
2274 * Determine the byte range covered by the object in the
2275 * child image to which the original request was to be sent.
2276 */
2277 img_offset = obj_request->img_offset - obj_request->offset;
2278 length = (u64)1 << rbd_dev->header.obj_order;
2279
2280 /*
2281 * There is no defined parent data beyond the parent
2282 * overlap, so limit what we read at that boundary if
2283 * necessary.
2284 */
2285 if (img_offset + length > rbd_dev->parent_overlap) {
2286 rbd_assert(img_offset < rbd_dev->parent_overlap);
2287 length = rbd_dev->parent_overlap - img_offset;
2288 }
2289
2290 /*
2291 * Allocate a page array big enough to receive the data read
2292 * from the parent.
2293 */
2294 page_count = (u32)calc_pages_for(0, length);
2295 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2296 if (IS_ERR(pages)) {
2297 result = PTR_ERR(pages);
2298 pages = NULL;
2299 goto out_err;
2300 }
2301
2302 result = -ENOMEM;
2303 parent_request = rbd_img_request_create(rbd_dev->parent,
2304 img_offset, length,
2305 false, true);
2306 if (!parent_request)
2307 goto out_err;
2308 rbd_obj_request_get(obj_request);
2309 parent_request->obj_request = obj_request;
2310
2311 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2312 if (result)
2313 goto out_err;
2314 parent_request->copyup_pages = pages;
2315
2316 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2317 result = rbd_img_request_submit(parent_request);
2318 if (!result)
2319 return 0;
2320
2321 parent_request->copyup_pages = NULL;
2322 parent_request->obj_request = NULL;
2323 rbd_obj_request_put(obj_request);
2324out_err:
2325 if (pages)
2326 ceph_release_page_vector(pages, page_count);
2327 if (parent_request)
2328 rbd_img_request_put(parent_request);
2329 obj_request->result = result;
2330 obj_request->xferred = 0;
2331 obj_request_done_set(obj_request);
2332
2333 return result;
2334}
2335
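/*
 * Editor's note (hypothetical numbers): an example of the overlap
 * clamp above. With 4 MiB objects and parent_overlap = 6 MiB, a
 * copyup for the object covering [4 MiB, 8 MiB) of the image reads
 * only [4 MiB, 6 MiB) from the parent; beyond the overlap there is
 * no defined parent data, so the read is truncated at the boundary.
 */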
2336static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2337{
2338 struct rbd_obj_request *orig_request;
2339 int result;
2340
2341 rbd_assert(!obj_request_img_data_test(obj_request));
2342
2343 /*
2344 * All we need from the object request is the original
2345 * request and the result of the STAT op. Grab those, then
2346 * we're done with the request.
2347 */
2348 orig_request = obj_request->obj_request;
2349 obj_request->obj_request = NULL;
2350 rbd_assert(orig_request);
2351 rbd_assert(orig_request->img_request);
2352
2353 result = obj_request->result;
2354 obj_request->result = 0;
2355
2356 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2357 obj_request, orig_request, result,
2358 obj_request->xferred, obj_request->length);
2359 rbd_obj_request_put(obj_request);
2360
2361 rbd_assert(orig_request);
2362 rbd_assert(orig_request->img_request);
2363
2364 /*
2365 * Our only purpose here is to determine whether the object
2366 * exists, and we don't want to treat the non-existence as
2367 * an error. If something else comes back, transfer the
2368 * error to the original request and complete it now.
2369 */
2370 if (!result) {
2371 obj_request_existence_set(orig_request, true);
2372 } else if (result == -ENOENT) {
2373 obj_request_existence_set(orig_request, false);
 2374 } else {
2375 orig_request->result = result;
3d7efd18 2376 goto out;
2377 }
2378
2379 /*
2380 * Resubmit the original request now that we have recorded
2381 * whether the target object exists.
2382 */
b454e36d 2383 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2384out:
2385 if (orig_request->result)
2386 rbd_obj_request_complete(orig_request);
2387 rbd_obj_request_put(orig_request);
2388}
2389
2390static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2391{
2392 struct rbd_obj_request *stat_request;
2393 struct rbd_device *rbd_dev;
2394 struct ceph_osd_client *osdc;
2395 struct page **pages = NULL;
2396 u32 page_count;
2397 size_t size;
2398 int ret;
2399
2400 /*
2401 * The response data for a STAT call consists of:
2402 * le64 length;
2403 * struct {
2404 * le32 tv_sec;
2405 * le32 tv_nsec;
2406 * } mtime;
2407 */
2408 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2409 page_count = (u32)calc_pages_for(0, size);
2410 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2411 if (IS_ERR(pages))
2412 return PTR_ERR(pages);
2413
2414 ret = -ENOMEM;
2415 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2416 OBJ_REQUEST_PAGES);
2417 if (!stat_request)
2418 goto out;
2419
2420 rbd_obj_request_get(obj_request);
2421 stat_request->obj_request = obj_request;
2422 stat_request->pages = pages;
2423 stat_request->page_count = page_count;
2424
2425 rbd_assert(obj_request->img_request);
2426 rbd_dev = obj_request->img_request->rbd_dev;
2427 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2428 stat_request);
2429 if (!stat_request->osd_req)
2430 goto out;
2431 stat_request->callback = rbd_img_obj_exists_callback;
2432
2433 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2434 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2435 false, false);
9d4df01f 2436 rbd_osd_req_format_read(stat_request);
2437
2438 osdc = &rbd_dev->rbd_client->client->osdc;
2439 ret = rbd_obj_request_submit(osdc, stat_request);
2440out:
2441 if (ret)
2442 rbd_obj_request_put(obj_request);
2443
2444 return ret;
2445}
2446
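/*
 * Editor's sketch (hypothetical declaration, not from the original
 * source): the STAT reply documented above, written out as the
 * packed structure the size computation corresponds to. It is
 * 8 + 4 + 4 = 16 bytes, so a single page always suffices.
 */
struct example_stat_reply {
	__le64	length;
	struct {
		__le32	tv_sec;
		__le32	tv_nsec;
	} mtime;
} __attribute__ ((packed));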
2447static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2448{
2449 struct rbd_img_request *img_request;
a9e8ba2c 2450 struct rbd_device *rbd_dev;
3d7efd18 2451 bool known;
2452
2453 rbd_assert(obj_request_img_data_test(obj_request));
2454
2455 img_request = obj_request->img_request;
2456 rbd_assert(img_request);
a9e8ba2c 2457 rbd_dev = img_request->rbd_dev;
b454e36d 2458
b454e36d 2459 /*
2460 * Only writes to layered images need special handling.
2461 * Reads and non-layered writes are simple object requests.
2462 * Layered writes that start beyond the end of the overlap
2463 * with the parent have no parent data, so they too are
2464 * simple object requests. Finally, if the target object is
2465 * known to already exist, its parent data has already been
2466 * copied, so a write to the object can also be handled as a
2467 * simple object request.
2468 */
2469 if (!img_request_write_test(img_request) ||
2470 !img_request_layered_test(img_request) ||
a9e8ba2c 2471 rbd_dev->parent_overlap <= obj_request->img_offset ||
2472 ((known = obj_request_known_test(obj_request)) &&
2473 obj_request_exists_test(obj_request))) {
2474
2475 struct rbd_device *rbd_dev;
2476 struct ceph_osd_client *osdc;
2477
2478 rbd_dev = obj_request->img_request->rbd_dev;
2479 osdc = &rbd_dev->rbd_client->client->osdc;
2480
2481 return rbd_obj_request_submit(osdc, obj_request);
2482 }
2483
2484 /*
2485 * It's a layered write. The target object might exist but
2486 * we may not know that yet. If we know it doesn't exist,
2487 * start by reading the data for the full target object from
2488 * the parent so we can use it for a copyup to the target.
b454e36d 2489 */
2490 if (known)
2491 return rbd_img_obj_parent_read_full(obj_request);
2492
2493 /* We don't know whether the target exists. Go find out. */
2494
2495 return rbd_img_obj_exists_submit(obj_request);
2496}
2497
2498static int rbd_img_request_submit(struct rbd_img_request *img_request)
2499{
bf0d5f50 2500 struct rbd_obj_request *obj_request;
46faeed4 2501 struct rbd_obj_request *next_obj_request;
bf0d5f50 2502
37206ee5 2503 dout("%s: img %p\n", __func__, img_request);
46faeed4 2504 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2505 int ret;
2506
b454e36d 2507 ret = rbd_img_obj_request_submit(obj_request);
2508 if (ret)
2509 return ret;
2510 }
2511
2512 return 0;
2513}
2514
2515static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2516{
2517 struct rbd_obj_request *obj_request;
2518 struct rbd_device *rbd_dev;
2519 u64 obj_end;
2520
2521 rbd_assert(img_request_child_test(img_request));
2522
2523 obj_request = img_request->obj_request;
2524 rbd_assert(obj_request);
2525 rbd_assert(obj_request->img_request);
2526
8b3e1a56 2527 obj_request->result = img_request->result;
2528 if (obj_request->result)
2529 goto out;
2530
2531 /*
2532 * We need to zero anything beyond the parent overlap
2533 * boundary. Since rbd_img_obj_request_read_callback()
2534 * will zero anything beyond the end of a short read, an
2535 * easy way to do this is to pretend the data from the
2536 * parent came up short--ending at the overlap boundary.
2537 */
2538 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2539 obj_end = obj_request->img_offset + obj_request->length;
2540 rbd_dev = obj_request->img_request->rbd_dev;
2541 if (obj_end > rbd_dev->parent_overlap) {
2542 u64 xferred = 0;
2543
2544 if (obj_request->img_offset < rbd_dev->parent_overlap)
2545 xferred = rbd_dev->parent_overlap -
2546 obj_request->img_offset;
8b3e1a56 2547
2548 obj_request->xferred = min(img_request->xferred, xferred);
2549 } else {
2550 obj_request->xferred = img_request->xferred;
2551 }
2552out:
b5b09be3 2553 rbd_img_request_put(img_request);
2554 rbd_img_obj_request_read_callback(obj_request);
2555 rbd_obj_request_complete(obj_request);
2556}
2557
2558static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2559{
2560 struct rbd_device *rbd_dev;
2561 struct rbd_img_request *img_request;
2562 int result;
2563
2564 rbd_assert(obj_request_img_data_test(obj_request));
2565 rbd_assert(obj_request->img_request != NULL);
2566 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 2567 rbd_assert(obj_request_type_valid(obj_request->type));
2568
2569 rbd_dev = obj_request->img_request->rbd_dev;
2570 rbd_assert(rbd_dev->parent != NULL);
2571 /* rbd_read_finish(obj_request, obj_request->length); */
2572 img_request = rbd_img_request_create(rbd_dev->parent,
2573 obj_request->img_offset,
2574 obj_request->length,
2575 false, true);
2576 result = -ENOMEM;
2577 if (!img_request)
2578 goto out_err;
2579
2580 rbd_obj_request_get(obj_request);
2581 img_request->obj_request = obj_request;
2582
2583 if (obj_request->type == OBJ_REQUEST_BIO)
2584 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2585 obj_request->bio_list);
2586 else
2587 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2588 obj_request->pages);
2589 if (result)
2590 goto out_err;
2591
2592 img_request->callback = rbd_img_parent_read_callback;
2593 result = rbd_img_request_submit(img_request);
2594 if (result)
2595 goto out_err;
2596
2597 return;
2598out_err:
2599 if (img_request)
2600 rbd_img_request_put(img_request);
2601 obj_request->result = result;
2602 obj_request->xferred = 0;
2603 obj_request_done_set(obj_request);
2604}
bf0d5f50 2605
cc4a38bd 2606static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2607{
2608 struct rbd_obj_request *obj_request;
2169238d 2609 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2610 int ret;
2611
2612 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2613 OBJ_REQUEST_NODATA);
2614 if (!obj_request)
2615 return -ENOMEM;
2616
2617 ret = -ENOMEM;
430c28c3 2618 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2619 if (!obj_request->osd_req)
2620 goto out;
2169238d 2621 obj_request->callback = rbd_obj_request_put;
b8d70035 2622
c99d2d4a 2623 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
cc4a38bd 2624 notify_id, 0, 0);
9d4df01f 2625 rbd_osd_req_format_read(obj_request);
430c28c3 2626
b8d70035 2627 ret = rbd_obj_request_submit(osdc, obj_request);
b8d70035 2628out:
2629 if (ret)
2630 rbd_obj_request_put(obj_request);
2631
2632 return ret;
2633}
2634
2635static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2636{
2637 struct rbd_device *rbd_dev = (struct rbd_device *)data;
e627db08 2638 int ret;
2639
2640 if (!rbd_dev)
2641 return;
2642
37206ee5 2643 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2644 rbd_dev->header_name, (unsigned long long)notify_id,
2645 (unsigned int)opcode);
2646 ret = rbd_dev_refresh(rbd_dev);
2647 if (ret)
2648 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
b8d70035 2649
cc4a38bd 2650 rbd_obj_notify_ack(rbd_dev, notify_id);
2651}
2652
2653/*
2654 * Request sync osd watch/unwatch. The value of "start" determines
2655 * whether a watch request is being initiated or torn down.
2656 */
1f3ef788 2657static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2658{
2659 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2660 struct rbd_obj_request *obj_request;
2661 int ret;
2662
2663 rbd_assert(start ^ !!rbd_dev->watch_event);
2664 rbd_assert(start ^ !!rbd_dev->watch_request);
2665
2666 if (start) {
3c663bbd 2667 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2668 &rbd_dev->watch_event);
2669 if (ret < 0)
2670 return ret;
8eb87565 2671 rbd_assert(rbd_dev->watch_event != NULL);
2672 }
2673
2674 ret = -ENOMEM;
2675 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2676 OBJ_REQUEST_NODATA);
2677 if (!obj_request)
2678 goto out_cancel;
2679
2680 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2681 if (!obj_request->osd_req)
2682 goto out_cancel;
2683
8eb87565 2684 if (start)
975241af 2685 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 2686 else
6977c3f9 2687 ceph_osdc_unregister_linger_request(osdc,
975241af 2688 rbd_dev->watch_request->osd_req);
2689
2690 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1f3ef788 2691 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
9d4df01f 2692 rbd_osd_req_format_write(obj_request);
2169238d 2693
2694 ret = rbd_obj_request_submit(osdc, obj_request);
2695 if (ret)
2696 goto out_cancel;
2697 ret = rbd_obj_request_wait(obj_request);
2698 if (ret)
2699 goto out_cancel;
2700 ret = obj_request->result;
2701 if (ret)
2702 goto out_cancel;
2703
2704 /*
2705 * A watch request is set to linger, so the underlying osd
2706 * request won't go away until we unregister it. We retain
2707 * a pointer to the object request during that time (in
2708 * rbd_dev->watch_request), so we'll keep a reference to
2709 * it. We'll drop that reference (below) after we've
2710 * unregistered it.
2711 */
2712 if (start) {
2713 rbd_dev->watch_request = obj_request;
2714
2715 return 0;
2716 }
2717
2718 /* We have successfully torn down the watch request */
2719
2720 rbd_obj_request_put(rbd_dev->watch_request);
2721 rbd_dev->watch_request = NULL;
2722out_cancel:
2723 /* Cancel the event if we're tearing down, or on error */
2724 ceph_osdc_cancel_event(rbd_dev->watch_event);
2725 rbd_dev->watch_event = NULL;
2726 if (obj_request)
2727 rbd_obj_request_put(obj_request);
2728
2729 return ret;
2730}
2731
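/*
 * Editor's note (hypothetical usage, not from the original source):
 * callers pair the two modes of rbd_dev_header_watch_sync(),
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, true);	 (establish)
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, false); (tear down)
 *
 * so that header-change notifications (rbd_watch_cb) keep arriving
 * for as long as the image is mapped.
 */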
36be9a76 2732/*
2733 * Synchronous osd object method call. Returns the number of bytes
 2734 * returned in the inbound buffer, or a negative error code.
2735 */
2736static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2737 const char *object_name,
2738 const char *class_name,
2739 const char *method_name,
4157976b 2740 const void *outbound,
36be9a76 2741 size_t outbound_size,
4157976b 2742 void *inbound,
e2a58ee5 2743 size_t inbound_size)
36be9a76 2744{
2169238d 2745 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
36be9a76 2746 struct rbd_obj_request *obj_request;
2747 struct page **pages;
2748 u32 page_count;
2749 int ret;
2750
2751 /*
2752 * Method calls are ultimately read operations. The result
 2753 * should be placed into the inbound buffer provided. They
2754 * also supply outbound data--parameters for the object
2755 * method. Currently if this is present it will be a
2756 * snapshot id.
36be9a76 2757 */
57385b51 2758 page_count = (u32)calc_pages_for(0, inbound_size);
2759 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2760 if (IS_ERR(pages))
2761 return PTR_ERR(pages);
2762
2763 ret = -ENOMEM;
6010a451 2764 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2765 OBJ_REQUEST_PAGES);
2766 if (!obj_request)
2767 goto out;
2768
2769 obj_request->pages = pages;
2770 obj_request->page_count = page_count;
2771
430c28c3 2772 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2773 if (!obj_request->osd_req)
2774 goto out;
2775
c99d2d4a 2776 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2777 class_name, method_name);
2778 if (outbound_size) {
2779 struct ceph_pagelist *pagelist;
2780
2781 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2782 if (!pagelist)
2783 goto out;
2784
2785 ceph_pagelist_init(pagelist);
2786 ceph_pagelist_append(pagelist, outbound, outbound_size);
2787 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2788 pagelist);
2789 }
2790 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2791 obj_request->pages, inbound_size,
44cd188d 2792 0, false, false);
9d4df01f 2793 rbd_osd_req_format_read(obj_request);
430c28c3 2794
2795 ret = rbd_obj_request_submit(osdc, obj_request);
2796 if (ret)
2797 goto out;
2798 ret = rbd_obj_request_wait(obj_request);
2799 if (ret)
2800 goto out;
2801
2802 ret = obj_request->result;
2803 if (ret < 0)
2804 goto out;
2805
2806 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2807 ret = (int)obj_request->xferred;
903bb32e 2808 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2809out:
2810 if (obj_request)
2811 rbd_obj_request_put(obj_request);
2812 else
2813 ceph_release_page_vector(pages, page_count);
2814
2815 return ret;
2816}
2817
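/*
 * Editor's note: a usage sketch of rbd_obj_method_sync(), mirroring
 * the "get_size" call made from _rbd_dev_v2_snap_size() later in
 * this file:
 *
 *	__le64 snapid = cpu_to_le64(snap_id);
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				  "rbd", "get_size",
 *				  &snapid, sizeof (snapid),
 *				  &size_buf, sizeof (size_buf));
 *
 * A non-negative return is the number of bytes placed in the
 * inbound buffer (&size_buf here), which the caller must check
 * before decoding.
 */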
bf0d5f50 2818static void rbd_request_fn(struct request_queue *q)
cc344fa1 2819 __releases(q->queue_lock) __acquires(q->queue_lock)
2820{
2821 struct rbd_device *rbd_dev = q->queuedata;
2822 bool read_only = rbd_dev->mapping.read_only;
2823 struct request *rq;
2824 int result;
2825
2826 while ((rq = blk_fetch_request(q))) {
2827 bool write_request = rq_data_dir(rq) == WRITE;
2828 struct rbd_img_request *img_request;
2829 u64 offset;
2830 u64 length;
2831
2832 /* Ignore any non-FS requests that filter through. */
2833
2834 if (rq->cmd_type != REQ_TYPE_FS) {
2835 dout("%s: non-fs request type %d\n", __func__,
2836 (int) rq->cmd_type);
2837 __blk_end_request_all(rq, 0);
2838 continue;
2839 }
2840
2841 /* Ignore/skip any zero-length requests */
2842
2843 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2844 length = (u64) blk_rq_bytes(rq);
2845
2846 if (!length) {
2847 dout("%s: zero-length request\n", __func__);
bf0d5f50
AE
2848 __blk_end_request_all(rq, 0);
2849 continue;
2850 }
2851
2852 spin_unlock_irq(q->queue_lock);
2853
2854 /* Disallow writes to a read-only device */
2855
2856 if (write_request) {
2857 result = -EROFS;
2858 if (read_only)
2859 goto end_request;
2860 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2861 }
2862
2863 /*
2864 * Quit early if the mapped snapshot no longer
2865 * exists. It's still possible the snapshot will
2866 * have disappeared by the time our request arrives
2867 * at the osd, but there's no sense in sending it if
2868 * we already know.
2869 */
2870 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2871 dout("request for non-existent snapshot");
2872 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2873 result = -ENXIO;
2874 goto end_request;
2875 }
2876
bf0d5f50 2877 result = -EINVAL;
2878 if (offset && length > U64_MAX - offset + 1) {
2879 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2880 offset, length);
bf0d5f50 2881 goto end_request; /* Shouldn't happen */
c0cd10db 2882 }
bf0d5f50 2883
2884 result = -EIO;
2885 if (offset + length > rbd_dev->mapping.size) {
2886 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2887 offset, length, rbd_dev->mapping.size);
2888 goto end_request;
2889 }
2890
2891 result = -ENOMEM;
2892 img_request = rbd_img_request_create(rbd_dev, offset, length,
9849e986 2893 write_request, false);
2894 if (!img_request)
2895 goto end_request;
2896
2897 img_request->rq = rq;
2898
2899 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2900 rq->bio);
2901 if (!result)
2902 result = rbd_img_request_submit(img_request);
2903 if (result)
2904 rbd_img_request_put(img_request);
2905end_request:
2906 spin_lock_irq(q->queue_lock);
2907 if (result < 0) {
2908 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2909 write_request ? "write" : "read",
2910 length, offset, result);
2911
2912 __blk_end_request_all(rq, result);
2913 }
2914 }
2915}
2916
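/*
 * Editor's note (hypothetical values): the wraparound check above
 * rejects, for example, offset 0xffffffffffffff00 with length
 * 0x200, since length > U64_MAX - offset + 1 (= 0x100) means the
 * byte range would wrap past the top of the 64-bit offset space.
 */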
2917/*
 2918 * A queue callback. Makes sure that we don't create a bio that spans
 2919 * multiple osd objects. One exception would be a single-page bio,
f7760dad 2920 * which we handle later in bio_chain_clone_range().
2921 */
2922static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2923 struct bio_vec *bvec)
2924{
2925 struct rbd_device *rbd_dev = q->queuedata;
2926 sector_t sector_offset;
2927 sector_t sectors_per_obj;
2928 sector_t obj_sector_offset;
2929 int ret;
2930
2931 /*
2932 * Find how far into its rbd object the partition-relative
2933 * bio start sector is to offset relative to the enclosing
2934 * device.
2935 */
2936 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2937 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2938 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2939
2940 /*
2941 * Compute the number of bytes from that offset to the end
2942 * of the object. Account for what's already used by the bio.
2943 */
2944 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2945 if (ret > bmd->bi_size)
2946 ret -= bmd->bi_size;
2947 else
2948 ret = 0;
2949
2950 /*
2951 * Don't send back more than was asked for. And if the bio
2952 * was empty, let the whole thing through because: "Note
2953 * that a block device *must* allow a single page to be
2954 * added to an empty bio."
2955 */
2956 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2957 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2958 ret = (int) bvec->bv_len;
2959
2960 return ret;
2961}
2962
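/*
 * Editor's note (hypothetical numbers): with 4 MiB objects there
 * are 8192 512-byte sectors per object, so a bio starting at
 * device sector 8000 may grow by at most (8192 - 8000) * 512 =
 * 98304 bytes, less whatever it already holds, before it would
 * cross into the next object.
 */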
2963static void rbd_free_disk(struct rbd_device *rbd_dev)
2964{
2965 struct gendisk *disk = rbd_dev->disk;
2966
2967 if (!disk)
2968 return;
2969
2970 rbd_dev->disk = NULL;
2971 if (disk->flags & GENHD_FL_UP) {
602adf40 2972 del_gendisk(disk);
2973 if (disk->queue)
2974 blk_cleanup_queue(disk->queue);
2975 }
2976 put_disk(disk);
2977}
2978
2979static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2980 const char *object_name,
7097f8df 2981 u64 offset, u64 length, void *buf)
2982
2983{
2169238d 2984 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 2985 struct rbd_obj_request *obj_request;
2986 struct page **pages = NULL;
2987 u32 page_count;
1ceae7ef 2988 size_t size;
2989 int ret;
2990
2991 page_count = (u32) calc_pages_for(offset, length);
2992 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
 2993 if (IS_ERR(pages))
 2994 return PTR_ERR(pages); /* don't fall through with an ERR_PTR */
 2995
 2996 ret = -ENOMEM;
2997 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 2998 OBJ_REQUEST_PAGES);
2999 if (!obj_request)
3000 goto out;
3001
3002 obj_request->pages = pages;
3003 obj_request->page_count = page_count;
3004
430c28c3 3005 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3006 if (!obj_request->osd_req)
3007 goto out;
3008
3009 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3010 offset, length, 0, 0);
406e2c9f 3011 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 3012 obj_request->pages,
3013 obj_request->length,
3014 obj_request->offset & ~PAGE_MASK,
3015 false, false);
9d4df01f 3016 rbd_osd_req_format_read(obj_request);
430c28c3 3017
3018 ret = rbd_obj_request_submit(osdc, obj_request);
3019 if (ret)
3020 goto out;
3021 ret = rbd_obj_request_wait(obj_request);
3022 if (ret)
3023 goto out;
3024
3025 ret = obj_request->result;
3026 if (ret < 0)
3027 goto out;
3028
3029 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3030 size = (size_t) obj_request->xferred;
903bb32e 3031 ceph_copy_from_page_vector(pages, buf, 0, size);
3032 rbd_assert(size <= (size_t)INT_MAX);
3033 ret = (int)size;
3034out:
3035 if (obj_request)
3036 rbd_obj_request_put(obj_request);
3037 else
3038 ceph_release_page_vector(pages, page_count);
3039
3040 return ret;
3041}
3042
602adf40 3043/*
3044 * Read the complete header for the given rbd device. On successful
3045 * return, the rbd_dev->header field will contain up-to-date
3046 * information about the image.
602adf40 3047 */
99a41ebc 3048static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 3049{
4156d998 3050 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 3051 u32 snap_count = 0;
3052 u64 names_size = 0;
3053 u32 want_count;
3054 int ret;
602adf40 3055
00f1f36f 3056 /*
3057 * The complete header will include an array of its 64-bit
3058 * snapshot ids, followed by the names of those snapshots as
3059 * a contiguous block of NUL-terminated strings. Note that
3060 * the number of snapshots could change by the time we read
3061 * it in, in which case we re-read it.
00f1f36f 3062 */
3063 do {
3064 size_t size;
3065
3066 kfree(ondisk);
3067
3068 size = sizeof (*ondisk);
3069 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3070 size += names_size;
3071 ondisk = kmalloc(size, GFP_KERNEL);
3072 if (!ondisk)
662518b1 3073 return -ENOMEM;
4156d998 3074
788e2df3 3075 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
7097f8df 3076 0, size, ondisk);
4156d998 3077 if (ret < 0)
662518b1 3078 goto out;
c0cd10db 3079 if ((size_t)ret < size) {
4156d998 3080 ret = -ENXIO;
3081 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3082 size, ret);
662518b1 3083 goto out;
3084 }
3085 if (!rbd_dev_ondisk_valid(ondisk)) {
3086 ret = -ENXIO;
06ecc6cb 3087 rbd_warn(rbd_dev, "invalid header");
662518b1 3088 goto out;
81e759fb 3089 }
602adf40 3090
3091 names_size = le64_to_cpu(ondisk->snap_names_len);
3092 want_count = snap_count;
3093 snap_count = le32_to_cpu(ondisk->snap_count);
3094 } while (snap_count != want_count);
00f1f36f 3095
3096 ret = rbd_header_from_disk(rbd_dev, ondisk);
3097out:
3098 kfree(ondisk);
3099
3100 return ret;
3101}
3102
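/*
 * Editor's sketch (informal, inferred from the size computation
 * above): the format 1 on-disk header read by
 * rbd_dev_v1_header_info() is laid out as
 *
 *	struct rbd_image_header_ondisk	hdr;
 *	struct rbd_image_snap_ondisk	snaps[snap_count];
 *	char				snap_names[names_size];
 *
 * which is why the buffer is re-sized and re-read until the
 * snapshot count it reports stops changing.
 */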
3103/*
3104 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3105 * has disappeared from the (just updated) snapshot context.
3106 */
3107static void rbd_exists_validate(struct rbd_device *rbd_dev)
3108{
3109 u64 snap_id;
3110
3111 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3112 return;
3113
3114 snap_id = rbd_dev->spec->snap_id;
3115 if (snap_id == CEPH_NOSNAP)
3116 return;
3117
3118 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3119 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3120}
3121
cc4a38bd 3122static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3123{
e627db08 3124 u64 mapping_size;
3125 int ret;
3126
117973fb 3127 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
e627db08 3128 mapping_size = rbd_dev->mapping.size;
1fe5e993 3129 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb 3130 if (rbd_dev->image_format == 1)
99a41ebc 3131 ret = rbd_dev_v1_header_info(rbd_dev);
117973fb 3132 else
2df3fac7 3133 ret = rbd_dev_v2_header_info(rbd_dev);
3134
3135 /* If it's a mapped snapshot, validate its EXISTS flag */
3136
3137 rbd_exists_validate(rbd_dev);
1fe5e993 3138 mutex_unlock(&ctl_mutex);
3139 if (mapping_size != rbd_dev->mapping.size) {
3140 sector_t size;
3141
3142 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3143 dout("setting size to %llu sectors", (unsigned long long)size);
3144 set_capacity(rbd_dev->disk, size);
a3fbe5d4 3145 revalidate_disk(rbd_dev->disk);
00a653e2 3146 }
3147
3148 return ret;
3149}
3150
3151static int rbd_init_disk(struct rbd_device *rbd_dev)
3152{
3153 struct gendisk *disk;
3154 struct request_queue *q;
593a9e7b 3155 u64 segment_size;
602adf40 3156
602adf40 3157 /* create gendisk info */
3158 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3159 if (!disk)
1fcdb8aa 3160 return -ENOMEM;
602adf40 3161
f0f8cef5 3162 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3163 rbd_dev->dev_id);
3164 disk->major = rbd_dev->major;
3165 disk->first_minor = 0;
3166 disk->fops = &rbd_bd_ops;
3167 disk->private_data = rbd_dev;
3168
bf0d5f50 3169 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3170 if (!q)
3171 goto out_disk;
029bcbd8 3172
3173 /* We use the default size, but let's be explicit about it. */
3174 blk_queue_physical_block_size(q, SECTOR_SIZE);
3175
029bcbd8 3176 /* set io sizes to object size */
3177 segment_size = rbd_obj_bytes(&rbd_dev->header);
3178 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3179 blk_queue_max_segment_size(q, segment_size);
3180 blk_queue_io_min(q, segment_size);
3181 blk_queue_io_opt(q, segment_size);
029bcbd8 3182
3183 blk_queue_merge_bvec(q, rbd_merge_bvec);
3184 disk->queue = q;
3185
3186 q->queuedata = rbd_dev;
3187
3188 rbd_dev->disk = disk;
602adf40 3189
602adf40 3190 return 0;
3191out_disk:
3192 put_disk(disk);
3193
3194 return -ENOMEM;
3195}
3196
3197/*
3198 sysfs
3199*/
3200
3201static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3202{
3203 return container_of(dev, struct rbd_device, dev);
3204}
3205
3206static ssize_t rbd_size_show(struct device *dev,
3207 struct device_attribute *attr, char *buf)
3208{
593a9e7b 3209 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3210
3211 return sprintf(buf, "%llu\n",
3212 (unsigned long long)rbd_dev->mapping.size);
3213}
3214
3215/*
3216 * Note this shows the features for whatever's mapped, which is not
3217 * necessarily the base image.
3218 */
3219static ssize_t rbd_features_show(struct device *dev,
3220 struct device_attribute *attr, char *buf)
3221{
3222 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3223
3224 return sprintf(buf, "0x%016llx\n",
fc71d833 3225 (unsigned long long)rbd_dev->mapping.features);
3226}
3227
3228static ssize_t rbd_major_show(struct device *dev,
3229 struct device_attribute *attr, char *buf)
3230{
593a9e7b 3231 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3232
3233 if (rbd_dev->major)
3234 return sprintf(buf, "%d\n", rbd_dev->major);
3235
3236 return sprintf(buf, "(none)\n");
3237
3238}
3239
3240static ssize_t rbd_client_id_show(struct device *dev,
3241 struct device_attribute *attr, char *buf)
602adf40 3242{
593a9e7b 3243 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3244
3245 return sprintf(buf, "client%lld\n",
3246 ceph_client_id(rbd_dev->rbd_client->client));
3247}
3248
3249static ssize_t rbd_pool_show(struct device *dev,
3250 struct device_attribute *attr, char *buf)
602adf40 3251{
593a9e7b 3252 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3253
0d7dbfce 3254 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3255}
3256
3257static ssize_t rbd_pool_id_show(struct device *dev,
3258 struct device_attribute *attr, char *buf)
3259{
3260 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3261
0d7dbfce 3262 return sprintf(buf, "%llu\n",
fc71d833 3263 (unsigned long long) rbd_dev->spec->pool_id);
3264}
3265
3266static ssize_t rbd_name_show(struct device *dev,
3267 struct device_attribute *attr, char *buf)
3268{
593a9e7b 3269 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3270
3271 if (rbd_dev->spec->image_name)
3272 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3273
3274 return sprintf(buf, "(unknown)\n");
3275}
3276
3277static ssize_t rbd_image_id_show(struct device *dev,
3278 struct device_attribute *attr, char *buf)
3279{
3280 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3281
0d7dbfce 3282 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3283}
3284
3285/*
3286 * Shows the name of the currently-mapped snapshot (or
3287 * RBD_SNAP_HEAD_NAME for the base image).
3288 */
3289static ssize_t rbd_snap_show(struct device *dev,
3290 struct device_attribute *attr,
3291 char *buf)
3292{
593a9e7b 3293 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3294
0d7dbfce 3295 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3296}
3297
3298/*
3299 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3300 * for the parent image. If there is no parent, simply shows
3301 * "(no parent image)".
3302 */
3303static ssize_t rbd_parent_show(struct device *dev,
3304 struct device_attribute *attr,
3305 char *buf)
3306{
3307 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3308 struct rbd_spec *spec = rbd_dev->parent_spec;
3309 int count;
3310 char *bufp = buf;
3311
3312 if (!spec)
3313 return sprintf(buf, "(no parent image)\n");
3314
3315 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3316 (unsigned long long) spec->pool_id, spec->pool_name);
3317 if (count < 0)
3318 return count;
3319 bufp += count;
3320
3321 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3322 spec->image_name ? spec->image_name : "(unknown)");
3323 if (count < 0)
3324 return count;
3325 bufp += count;
3326
3327 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3328 (unsigned long long) spec->snap_id, spec->snap_name);
3329 if (count < 0)
3330 return count;
3331 bufp += count;
3332
3333 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3334 if (count < 0)
3335 return count;
3336 bufp += count;
3337
3338 return (ssize_t) (bufp - buf);
3339}
3340
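/*
 * Editor's note (hypothetical values): sample output of the
 * "parent" attribute built above, for a mapped clone:
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1014b76b8b4567
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */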
3341static ssize_t rbd_image_refresh(struct device *dev,
3342 struct device_attribute *attr,
3343 const char *buf,
3344 size_t size)
3345{
593a9e7b 3346 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3347 int ret;
602adf40 3348
cc4a38bd 3349 ret = rbd_dev_refresh(rbd_dev);
3350 if (ret)
3351 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3352
3353 return ret < 0 ? ret : size;
dfc5606d 3354}
602adf40 3355
dfc5606d 3356static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3357static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3358static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3359static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3360static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3361static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3362static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3363static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3364static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3365static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3366static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3367
3368static struct attribute *rbd_attrs[] = {
3369 &dev_attr_size.attr,
34b13184 3370 &dev_attr_features.attr,
3371 &dev_attr_major.attr,
3372 &dev_attr_client_id.attr,
3373 &dev_attr_pool.attr,
9bb2f334 3374 &dev_attr_pool_id.attr,
dfc5606d 3375 &dev_attr_name.attr,
589d30e0 3376 &dev_attr_image_id.attr,
dfc5606d 3377 &dev_attr_current_snap.attr,
86b00e0d 3378 &dev_attr_parent.attr,
dfc5606d 3379 &dev_attr_refresh.attr,
3380 NULL
3381};
3382
3383static struct attribute_group rbd_attr_group = {
3384 .attrs = rbd_attrs,
3385};
3386
3387static const struct attribute_group *rbd_attr_groups[] = {
3388 &rbd_attr_group,
3389 NULL
3390};
3391
3392static void rbd_sysfs_dev_release(struct device *dev)
3393{
3394}
3395
3396static struct device_type rbd_device_type = {
3397 .name = "rbd",
3398 .groups = rbd_attr_groups,
3399 .release = rbd_sysfs_dev_release,
3400};
3401
3402static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3403{
3404 kref_get(&spec->kref);
3405
3406 return spec;
3407}
3408
3409static void rbd_spec_free(struct kref *kref);
3410static void rbd_spec_put(struct rbd_spec *spec)
3411{
3412 if (spec)
3413 kref_put(&spec->kref, rbd_spec_free);
3414}
3415
3416static struct rbd_spec *rbd_spec_alloc(void)
3417{
3418 struct rbd_spec *spec;
3419
3420 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3421 if (!spec)
3422 return NULL;
3423 kref_init(&spec->kref);
3424
3425 return spec;
3426}
3427
3428static void rbd_spec_free(struct kref *kref)
3429{
3430 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3431
3432 kfree(spec->pool_name);
3433 kfree(spec->image_id);
3434 kfree(spec->image_name);
3435 kfree(spec->snap_name);
3436 kfree(spec);
3437}
3438
cc344fa1 3439static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3440 struct rbd_spec *spec)
3441{
3442 struct rbd_device *rbd_dev;
3443
3444 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3445 if (!rbd_dev)
3446 return NULL;
3447
3448 spin_lock_init(&rbd_dev->lock);
6d292906 3449 rbd_dev->flags = 0;
c53d5893 3450 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3451 init_rwsem(&rbd_dev->header_rwsem);
3452
3453 rbd_dev->spec = spec;
3454 rbd_dev->rbd_client = rbdc;
3455
0903e875
AE
3456 /* Initialize the layout used for all rbd requests */
3457
3458 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3459 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3460 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3461 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3462
c53d5893
AE
3463 return rbd_dev;
3464}
3465
3466static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3467{
c53d5893
AE
3468 rbd_put_client(rbd_dev->rbd_client);
3469 rbd_spec_put(rbd_dev->spec);
3470 kfree(rbd_dev);
3471}
3472
9d475de5
AE
3473/*
3474 * Get the size and object order for an image snapshot, or if
3475 * snap_id is CEPH_NOSNAP, get this information for the base
3476 * image.
3477 */
3478static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3479 u8 *order, u64 *snap_size)
3480{
3481 __le64 snapid = cpu_to_le64(snap_id);
3482 int ret;
3483 struct {
3484 u8 order;
3485 __le64 size;
3486 } __attribute__ ((packed)) size_buf = { 0 };
3487
36be9a76 3488 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3489 "rbd", "get_size",
4157976b 3490 &snapid, sizeof (snapid),
e2a58ee5 3491 &size_buf, sizeof (size_buf));
36be9a76 3492 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3493 if (ret < 0)
3494 return ret;
57385b51
AE
3495 if (ret < sizeof (size_buf))
3496 return -ERANGE;
9d475de5 3497
c86f86e9
AE
3498 if (order)
3499 *order = size_buf.order;
9d475de5
AE
3500 *snap_size = le64_to_cpu(size_buf.size);
3501
3502 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
57385b51
AE
3503 (unsigned long long)snap_id, (unsigned int)*order,
3504 (unsigned long long)*snap_size);
9d475de5
AE
3505
3506 return 0;
3507}
3508
3509static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3510{
3511 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3512 &rbd_dev->header.obj_order,
3513 &rbd_dev->header.image_size);
3514}
3515
1e130199
AE
3516static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3517{
3518 void *reply_buf;
3519 int ret;
3520 void *p;
3521
3522 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3523 if (!reply_buf)
3524 return -ENOMEM;
3525
36be9a76 3526 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3527 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3528 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3529 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3530 if (ret < 0)
3531 goto out;
3532
3533 p = reply_buf;
3534 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3535 p + ret, NULL, GFP_NOIO);
3536 ret = 0;
1e130199
AE
3537
3538 if (IS_ERR(rbd_dev->header.object_prefix)) {
3539 ret = PTR_ERR(rbd_dev->header.object_prefix);
3540 rbd_dev->header.object_prefix = NULL;
3541 } else {
3542 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3543 }
1e130199
AE
3544out:
3545 kfree(reply_buf);
3546
3547 return ret;
3548}
3549
b1b5402a
AE
3550static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3551 u64 *snap_features)
3552{
3553 __le64 snapid = cpu_to_le64(snap_id);
3554 struct {
3555 __le64 features;
3556 __le64 incompat;
4157976b 3557 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3558 u64 incompat;
b1b5402a
AE
3559 int ret;
3560
36be9a76 3561 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3562 "rbd", "get_features",
4157976b 3563 &snapid, sizeof (snapid),
e2a58ee5 3564 &features_buf, sizeof (features_buf));
36be9a76 3565 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3566 if (ret < 0)
3567 return ret;
57385b51
AE
3568 if (ret < sizeof (features_buf))
3569 return -ERANGE;
d889140c
AE
3570
3571 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3572 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3573 return -ENXIO;
d889140c 3574
b1b5402a
AE
3575 *snap_features = le64_to_cpu(features_buf.features);
3576
3577 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3578 (unsigned long long)snap_id,
3579 (unsigned long long)*snap_features,
3580 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3581
3582 return 0;
3583}
3584
3585static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3586{
3587 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3588 &rbd_dev->header.features);
3589}
3590
86b00e0d
AE
3591static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3592{
3593 struct rbd_spec *parent_spec;
3594 size_t size;
3595 void *reply_buf = NULL;
3596 __le64 snapid;
3597 void *p;
3598 void *end;
3599 char *image_id;
3600 u64 overlap;
86b00e0d
AE
3601 int ret;
3602
3603 parent_spec = rbd_spec_alloc();
3604 if (!parent_spec)
3605 return -ENOMEM;
3606
3607 size = sizeof (__le64) + /* pool_id */
3608 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3609 sizeof (__le64) + /* snap_id */
3610 sizeof (__le64); /* overlap */
3611 reply_buf = kmalloc(size, GFP_KERNEL);
3612 if (!reply_buf) {
3613 ret = -ENOMEM;
3614 goto out_err;
3615 }
3616
3617 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3618 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3619 "rbd", "get_parent",
4157976b 3620 &snapid, sizeof (snapid),
e2a58ee5 3621 reply_buf, size);
36be9a76 3622 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3623 if (ret < 0)
3624 goto out_err;
3625
86b00e0d 3626 p = reply_buf;
57385b51
AE
3627 end = reply_buf + ret;
3628 ret = -ERANGE;
86b00e0d
AE
3629 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3630 if (parent_spec->pool_id == CEPH_NOPOOL)
3631 goto out; /* No parent? No problem. */
3632
0903e875
AE
3633 /* The ceph file layout needs to fit pool id in 32 bits */
3634
3635 ret = -EIO;
c0cd10db
AE
3636 if (parent_spec->pool_id > (u64)U32_MAX) {
3637 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3638 (unsigned long long)parent_spec->pool_id, U32_MAX);
57385b51 3639 goto out_err;
c0cd10db 3640 }
0903e875 3641
979ed480 3642 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3643 if (IS_ERR(image_id)) {
3644 ret = PTR_ERR(image_id);
3645 goto out_err;
3646 }
3647 parent_spec->image_id = image_id;
3648 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3649 ceph_decode_64_safe(&p, end, overlap, out_err);
3650
3651 rbd_dev->parent_overlap = overlap;
3652 rbd_dev->parent_spec = parent_spec;
3653 parent_spec = NULL; /* rbd_dev now owns this */
3654out:
3655 ret = 0;
3656out_err:
3657 kfree(reply_buf);
3658 rbd_spec_put(parent_spec);
3659
3660 return ret;
3661}
3662
cc070d59
AE
3663static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3664{
3665 struct {
3666 __le64 stripe_unit;
3667 __le64 stripe_count;
3668 } __attribute__ ((packed)) striping_info_buf = { 0 };
3669 size_t size = sizeof (striping_info_buf);
3670 void *p;
3671 u64 obj_size;
3672 u64 stripe_unit;
3673 u64 stripe_count;
3674 int ret;
3675
3676 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3677 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3678 (char *)&striping_info_buf, size);
cc070d59
AE
3679 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3680 if (ret < 0)
3681 return ret;
3682 if (ret < size)
3683 return -ERANGE;
3684
3685 /*
3686 * We don't actually support the "fancy striping" feature
3687 * (STRIPINGV2) yet, but if the striping sizes are the
3688 * defaults the behavior is the same as before. So find
3689 * out, and only fail if the image has non-default values.
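 *
 * For instance, with the default 4 MiB objects (obj_order 22),
 * the only values accepted here are stripe_unit == 4 MiB and
 * stripe_count == 1.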
3690 */
3692 obj_size = (u64)1 << rbd_dev->header.obj_order;
3693 p = &striping_info_buf;
3694 stripe_unit = ceph_decode_64(&p);
3695 if (stripe_unit != obj_size) {
3696 rbd_warn(rbd_dev, "unsupported stripe unit "
3697 "(got %llu want %llu)",
3698 stripe_unit, obj_size);
3699 return -EINVAL;
3700 }
3701 stripe_count = ceph_decode_64(&p);
3702 if (stripe_count != 1) {
3703 rbd_warn(rbd_dev, "unsupported stripe count "
3704 "(got %llu want 1)", stripe_count);
3705 return -EINVAL;
3706 }
500d0c0f
AE
3707 rbd_dev->header.stripe_unit = stripe_unit;
3708 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
3709
3710 return 0;
3711}
3712
9e15b77d
AE
3713static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3714{
3715 size_t image_id_size;
3716 char *image_id;
3717 void *p;
3718 void *end;
3719 size_t size;
3720 void *reply_buf = NULL;
3721 size_t len = 0;
3722 char *image_name = NULL;
3723 int ret;
3724
3725 rbd_assert(!rbd_dev->spec->image_name);
3726
69e7a02f
AE
3727 len = strlen(rbd_dev->spec->image_id);
3728 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
3729 image_id = kmalloc(image_id_size, GFP_KERNEL);
3730 if (!image_id)
3731 return NULL;
3732
3733 p = image_id;
4157976b 3734 end = image_id + image_id_size;
57385b51 3735 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
3736
3737 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3738 reply_buf = kmalloc(size, GFP_KERNEL);
3739 if (!reply_buf)
3740 goto out;
3741
36be9a76 3742 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
3743 "rbd", "dir_get_name",
3744 image_id, image_id_size,
e2a58ee5 3745 reply_buf, size);
9e15b77d
AE
3746 if (ret < 0)
3747 goto out;
3748 p = reply_buf;
f40eb349
AE
3749 end = reply_buf + ret;
3750
9e15b77d
AE
3751 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3752 if (IS_ERR(image_name))
3753 image_name = NULL;
3754 else
3755 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3756out:
3757 kfree(reply_buf);
3758 kfree(image_id);
3759
3760 return image_name;
3761}
3762
2ad3d716
AE
3763static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3764{
3765 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3766 const char *snap_name;
3767 u32 which = 0;
3768
3769 /* Skip over names until we find the one we are looking for */
3770
3771 snap_name = rbd_dev->header.snap_names;
3772 while (which < snapc->num_snaps) {
3773 if (!strcmp(name, snap_name))
3774 return snapc->snaps[which];
3775 snap_name += strlen(snap_name) + 1;
3776 which++;
3777 }
3778 return CEPH_NOSNAP;
3779}
3780
3781static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3782{
3783 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3784 u32 which;
3785 bool found = false;
3786 u64 snap_id;
3787
3788 for (which = 0; !found && which < snapc->num_snaps; which++) {
3789 const char *snap_name;
3790
3791 snap_id = snapc->snaps[which];
3792 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3793 if (IS_ERR(snap_name))
3794 break;
3795 found = !strcmp(name, snap_name);
3796 kfree(snap_name);
3797 }
3798 return found ? snap_id : CEPH_NOSNAP;
3799}
3800
3801/*
3802 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3803 * no snapshot by that name is found, or if an error occurs.
3804 */
3805static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3806{
3807 if (rbd_dev->image_format == 1)
3808 return rbd_v1_snap_id_by_name(rbd_dev, name);
3809
3810 return rbd_v2_snap_id_by_name(rbd_dev, name);
3811}
3812
9e15b77d 3813/*
2e9f7f1c
AE
3814 * When an rbd image has a parent image, it is identified by the
3815 * pool, image, and snapshot ids (not names). This function fills
3816 * in the names for those ids. (It's OK if we can't figure out the
3817 * name for an image id, but the pool and snapshot ids should always
3818 * exist and have names.) All names in an rbd spec are dynamically
3819 * allocated.
e1d4213f
AE
3820 *
3821 * When an image being mapped (not a parent) is probed, we have the
3822 * pool name and pool id, image name and image id, and the snapshot
3823 * name. The only thing we're missing is the snapshot id.
9e15b77d 3824 */
2e9f7f1c 3825static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 3826{
2e9f7f1c
AE
3827 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3828 struct rbd_spec *spec = rbd_dev->spec;
3829 const char *pool_name;
3830 const char *image_name;
3831 const char *snap_name;
9e15b77d
AE
3832 int ret;
3833
e1d4213f
AE
3834 /*
3835 * An image being mapped will have the pool name (etc.), but
3836 * we need to look up the snapshot id.
3837 */
2e9f7f1c
AE
3838 if (spec->pool_name) {
3839 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 3840 u64 snap_id;
e1d4213f 3841
2ad3d716
AE
3842 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3843 if (snap_id == CEPH_NOSNAP)
e1d4213f 3844 return -ENOENT;
2ad3d716 3845 spec->snap_id = snap_id;
e1d4213f 3846 } else {
2e9f7f1c 3847 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
3848 }
3849
3850 return 0;
3851 }
9e15b77d 3852
2e9f7f1c 3853 /* Get the pool name; we have to make our own copy of this */
9e15b77d 3854
2e9f7f1c
AE
3855 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3856 if (!pool_name) {
3857 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
3858 return -EIO;
3859 }
2e9f7f1c
AE
3860 pool_name = kstrdup(pool_name, GFP_KERNEL);
3861 if (!pool_name)
9e15b77d
AE
3862 return -ENOMEM;
3863
3864 /* Fetch the image name; tolerate failure here */
3865
2e9f7f1c
AE
3866 image_name = rbd_dev_image_name(rbd_dev);
3867 if (!image_name)
06ecc6cb 3868 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 3869
2e9f7f1c 3870 /* Look up the snapshot name, and make a copy */
9e15b77d 3871
2e9f7f1c 3872 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
2e9f7f1c
AE
3873 if (!snap_name) {
3874 ret = -ENOMEM;
9e15b77d 3875 goto out_err;
2e9f7f1c
AE
3876 }
3877
3878 spec->pool_name = pool_name;
3879 spec->image_name = image_name;
3880 spec->snap_name = snap_name;
9e15b77d
AE
3881
3882 return 0;
3883out_err:
2e9f7f1c
AE
3884 kfree(image_name);
3885 kfree(pool_name);
9e15b77d
AE
3886
3887 return ret;
3888}
3889
cc4a38bd 3890static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
3891{
3892 size_t size;
3893 int ret;
3894 void *reply_buf;
3895 void *p;
3896 void *end;
3897 u64 seq;
3898 u32 snap_count;
3899 struct ceph_snap_context *snapc;
3900 u32 i;
3901
3902 /*
3903 * We'll need room for the seq value (maximum snapshot id),
3904 * snapshot count, and array of that many snapshot ids.
3905 * For now we have a fixed upper limit on the number we're
3906 * prepared to receive.
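 *
 * With RBD_MAX_SNAP_COUNT of 510, the worst case works out to
 * 8 + 4 + 510 * 8 = 4092 bytes, so the reply always fits in a
 * single 4 KB page.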
3907 */
3908 size = sizeof (__le64) + sizeof (__le32) +
3909 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3910 reply_buf = kzalloc(size, GFP_KERNEL);
3911 if (!reply_buf)
3912 return -ENOMEM;
3913
36be9a76 3914 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3915 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 3916 reply_buf, size);
36be9a76 3917 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
3918 if (ret < 0)
3919 goto out;
3920
35d489f9 3921 p = reply_buf;
57385b51
AE
3922 end = reply_buf + ret;
3923 ret = -ERANGE;
35d489f9
AE
3924 ceph_decode_64_safe(&p, end, seq, out);
3925 ceph_decode_32_safe(&p, end, snap_count, out);
3926
3927 /*
3928 * Make sure the reported number of snapshot ids wouldn't go
3929 * beyond the end of our buffer. But before checking that,
3930 * make sure the computed size of the snapshot context we
3931 * allocate is representable in a size_t.
3932 */
3933 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3934 / sizeof (u64)) {
3935 ret = -EINVAL;
3936 goto out;
3937 }
3938 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3939 goto out;
468521c1 3940 ret = 0;
35d489f9 3941
812164f8 3942 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
3943 if (!snapc) {
3944 ret = -ENOMEM;
3945 goto out;
3946 }
35d489f9 3947 snapc->seq = seq;
35d489f9
AE
3948 for (i = 0; i < snap_count; i++)
3949 snapc->snaps[i] = ceph_decode_64(&p);
3950
49ece554 3951 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
3952 rbd_dev->header.snapc = snapc;
3953
3954 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 3955 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
3956out:
3957 kfree(reply_buf);
3958
57385b51 3959 return ret;
35d489f9
AE
3960}
3961
54cac61f
AE
3962static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3963 u64 snap_id)
b8b1e2db
AE
3964{
3965 size_t size;
3966 void *reply_buf;
54cac61f 3967 __le64 snapid;
b8b1e2db
AE
3968 int ret;
3969 void *p;
3970 void *end;
b8b1e2db
AE
3971 char *snap_name;
3972
3973 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3974 reply_buf = kmalloc(size, GFP_KERNEL);
3975 if (!reply_buf)
3976 return ERR_PTR(-ENOMEM);
3977
54cac61f 3978 snapid = cpu_to_le64(snap_id);
36be9a76 3979 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 3980 "rbd", "get_snapshot_name",
54cac61f 3981 &snapid, sizeof (snapid),
e2a58ee5 3982 reply_buf, size);
36be9a76 3983 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
3984 if (ret < 0) {
3985 snap_name = ERR_PTR(ret);
b8b1e2db 3986 goto out;
f40eb349 3987 }
b8b1e2db
AE
3988
3989 p = reply_buf;
f40eb349 3990 end = reply_buf + ret;
e5c35534 3991 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 3992 if (IS_ERR(snap_name))
b8b1e2db 3993 goto out;
b8b1e2db 3994
f40eb349 3995 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 3996 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
3997out:
3998 kfree(reply_buf);
3999
f40eb349 4000 return snap_name;
b8b1e2db
AE
4001}
4002
2df3fac7 4003static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4004{
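	/* A null object_prefix means the one-time header fields haven't been fetched yet */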
2df3fac7 4005 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4006 int ret;
117973fb
AE
4007
4008 down_write(&rbd_dev->header_rwsem);
4009
2df3fac7
AE
4010 if (first_time) {
4011 ret = rbd_dev_v2_header_onetime(rbd_dev);
4012 if (ret)
4013 goto out;
4014 }
4015
117973fb
AE
4016 ret = rbd_dev_v2_image_size(rbd_dev);
4017 if (ret)
4018 goto out;
29334ba4
AE
4019 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4020 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4021 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4022
cc4a38bd 4023 ret = rbd_dev_v2_snap_context(rbd_dev);
117973fb
AE
4024 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4025 if (ret)
4026 goto out;
117973fb
AE
4027out:
4028 up_write(&rbd_dev->header_rwsem);
4029
4030 return ret;
4031}
4032
dfc5606d
YS
4033static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4034{
dfc5606d 4035 struct device *dev;
cd789ab9 4036 int ret;
dfc5606d
YS
4037
4038 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 4039
cd789ab9 4040 dev = &rbd_dev->dev;
dfc5606d
YS
4041 dev->bus = &rbd_bus_type;
4042 dev->type = &rbd_device_type;
4043 dev->parent = &rbd_root_dev;
200a6a8b 4044 dev->release = rbd_dev_device_release;
de71a297 4045 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4046 ret = device_register(dev);
dfc5606d 4047
dfc5606d 4048 mutex_unlock(&ctl_mutex);
cd789ab9 4049
dfc5606d 4050 return ret;
602adf40
YS
4051}
4052
dfc5606d
YS
4053static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4054{
4055 device_unregister(&rbd_dev->dev);
4056}
4057
e2839308 4058static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
4059
4060/*
499afd5b
AE
4061 * Get a unique rbd identifier for the given new rbd_dev, and add
4062 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 4063 */
e2839308 4064static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4065{
e2839308 4066 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
4067
4068 spin_lock(&rbd_dev_list_lock);
4069 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4070 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
4071 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4072 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 4073}
b7f23c36 4074
1ddbe94e 4075/*
499afd5b
AE
4076 * Remove an rbd_dev from the global list, and record that its
4077 * identifier is no longer in use.
1ddbe94e 4078 */
e2839308 4079static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4080{
d184f6bf 4081 struct list_head *tmp;
de71a297 4082 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
4083 int max_id;
4084
aafb230e 4085 rbd_assert(rbd_id > 0);
499afd5b 4086
e2839308
AE
4087 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4088 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
4089 spin_lock(&rbd_dev_list_lock);
4090 list_del_init(&rbd_dev->node);
d184f6bf
AE
4091
4092 /*
4093 * If the id being "put" is not the current maximum, there
4094 * is nothing special we need to do.
4095 */
e2839308 4096 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
4097 spin_unlock(&rbd_dev_list_lock);
4098 return;
4099 }
4100
4101 /*
4102 * We need to update the current maximum id. Search the
4103 * list to find out what it is. We're more likely to find
4104 * the maximum at the end, so search the list backward.
4105 */
4106 max_id = 0;
4107 list_for_each_prev(tmp, &rbd_dev_list) {
4108 struct rbd_device *rbd_dev;
4109
4110 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
4111 if (rbd_dev->dev_id > max_id)
4112 max_id = rbd_dev->dev_id;
d184f6bf 4113 }
499afd5b 4114 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4115
1ddbe94e 4116 /*
e2839308 4117 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
4118 * which case it now accurately reflects the new maximum.
4119 * Be careful not to overwrite the maximum value in that
4120 * case.
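 *
 * For example, with ids 1..3 in use, putting id 3 finds max_id 2;
 * the cmpxchg below then succeeds unless a concurrent
 * rbd_dev_id_get() already raised the max (say, to 4), in which
 * case the compare fails and the newer maximum is preserved.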
1ddbe94e 4121 */
e2839308
AE
4122 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4123 dout(" max dev id has been reset\n");
b7f23c36
AE
4124}
4125
e28fff26
AE
4126/*
4127 * Skips over white space at *buf, and updates *buf to point to the
4128 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4129 * the token (string of non-white space characters) found. Note
4130 * that *buf must be terminated with '\0'.
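 *
 * For example, with *buf pointing at "  rbd foo", next_token()
 * returns 3 and leaves *buf pointing at "rbd foo".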
e28fff26
AE
4131 */
4132static inline size_t next_token(const char **buf)
4133{
4134 /*
4135 * These are the characters that produce nonzero for
4136 * isspace() in the "C" and "POSIX" locales.
4137 */
4138 const char *spaces = " \f\n\r\t\v";
4139
4140 *buf += strspn(*buf, spaces); /* Find start of token */
4141
4142 return strcspn(*buf, spaces); /* Return token length */
4143}
4144
4145/*
4146 * Finds the next token in *buf, and if the provided token buffer is
4147 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4148 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4149 * must be terminated with '\0' on entry.
e28fff26
AE
4150 *
4151 * Returns the length of the token found (not including the '\0').
4152 * Return value will be 0 if no token is found, and it will be >=
4153 * token_size if the token would not fit.
4154 *
593a9e7b 4155 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4156 * found token. Note that this occurs even if the token buffer is
4157 * too small to hold it.
4158 */
4159static inline size_t copy_token(const char **buf,
4160 char *token,
4161 size_t token_size)
4162{
4163 size_t len;
4164
4165 len = next_token(buf);
4166 if (len < token_size) {
4167 memcpy(token, *buf, len);
4168 *(token + len) = '\0';
4169 }
4170 *buf += len;
4171
4172 return len;
4173}
4174
ea3352f4
AE
4175/*
4176 * Finds the next token in *buf, dynamically allocates a buffer big
4177 * enough to hold a copy of it, and copies the token into the new
4178 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4179 * that a duplicate buffer is created even for a zero-length token.
4180 *
4181 * Returns a pointer to the newly-allocated duplicate, or a null
4182 * pointer if memory for the duplicate was not available. If
4183 * the lenp argument is a non-null pointer, the length of the token
4184 * (not including the '\0') is returned in *lenp.
4185 *
4186 * If successful, the *buf pointer will be updated to point beyond
4187 * the end of the found token.
4188 *
4189 * Note: uses GFP_KERNEL for allocation.
4190 */
4191static inline char *dup_token(const char **buf, size_t *lenp)
4192{
4193 char *dup;
4194 size_t len;
4195
4196 len = next_token(buf);
4caf35f9 4197 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4198 if (!dup)
4199 return NULL;
ea3352f4
AE
4200 *(dup + len) = '\0';
4201 *buf += len;
4202
4203 if (lenp)
4204 *lenp = len;
4205
4206 return dup;
4207}
4208
a725f65e 4209/*
859c31df
AE
4210 * Parse the options provided for an "rbd add" (i.e., rbd image
4211 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4212 * and the data written is passed here via a NUL-terminated buffer.
4213 * Returns 0 if successful or an error code otherwise.
d22f76e7 4214 *
859c31df
AE
4215 * The information extracted from these options is recorded in
4216 * the other parameters, through which dynamically-allocated
4217 * structures are returned:
4218 * ceph_opts
4219 * The address of a pointer that will refer to a ceph options
4220 * structure. Caller must release the returned pointer using
4221 * ceph_destroy_options() when it is no longer needed.
4222 * rbd_opts
4223 * Address of an rbd options pointer. Fully initialized by
4224 * this function; caller must release with kfree().
4225 * spec
4226 * Address of an rbd image specification pointer. Fully
4227 * initialized by this function based on parsed options.
4228 * Caller must release with rbd_spec_put().
4229 *
4230 * The options passed take this form:
4231 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4232 * where:
4233 * <mon_addrs>
4234 * A comma-separated list of one or more monitor addresses.
4235 * A monitor address is an ip address, optionally followed
4236 * by a port number (separated by a colon).
4237 * I.e.: ip1[:port1][,ip2[:port2]...]
4238 * <options>
4239 * A comma-separated list of ceph and/or rbd options.
4240 * <pool_name>
4241 * The name of the rados pool containing the rbd image.
4242 * <image_name>
4243 * The name of the image in that pool to map.
4244 * <snap_name>
4245 * An optional snapshot name. If provided, the mapping will
4246 * present data from the image at the time that snapshot was
4247 * created. The image head is used if no snapshot name is
4248 * provided. Snapshot mappings are always read-only.
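 *
 * For example (monitor address and auth details here are purely
 * illustrative), writing the following to /sys/bus/rbd/add maps
 * the head of image "foo" from pool "rbd":
 *
 *   1.2.3.4:6789 name=admin rbd foo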
a725f65e 4249 */
859c31df 4250static int rbd_add_parse_args(const char *buf,
dc79b113 4251 struct ceph_options **ceph_opts,
859c31df
AE
4252 struct rbd_options **opts,
4253 struct rbd_spec **rbd_spec)
e28fff26 4254{
d22f76e7 4255 size_t len;
859c31df 4256 char *options;
0ddebc0c 4257 const char *mon_addrs;
ecb4dc22 4258 char *snap_name;
0ddebc0c 4259 size_t mon_addrs_size;
859c31df 4260 struct rbd_spec *spec = NULL;
4e9afeba 4261 struct rbd_options *rbd_opts = NULL;
859c31df 4262 struct ceph_options *copts;
dc79b113 4263 int ret;
e28fff26
AE
4264
4265 /* The first four tokens are required */
4266
7ef3214a 4267 len = next_token(&buf);
4fb5d671
AE
4268 if (!len) {
4269 rbd_warn(NULL, "no monitor address(es) provided");
4270 return -EINVAL;
4271 }
0ddebc0c 4272 mon_addrs = buf;
f28e565a 4273 mon_addrs_size = len + 1;
7ef3214a 4274 buf += len;
a725f65e 4275
dc79b113 4276 ret = -EINVAL;
f28e565a
AE
4277 options = dup_token(&buf, NULL);
4278 if (!options)
dc79b113 4279 return -ENOMEM;
4fb5d671
AE
4280 if (!*options) {
4281 rbd_warn(NULL, "no options provided");
4282 goto out_err;
4283 }
e28fff26 4284
859c31df
AE
4285 spec = rbd_spec_alloc();
4286 if (!spec)
f28e565a 4287 goto out_mem;
859c31df
AE
4288
4289 spec->pool_name = dup_token(&buf, NULL);
4290 if (!spec->pool_name)
4291 goto out_mem;
4fb5d671
AE
4292 if (!*spec->pool_name) {
4293 rbd_warn(NULL, "no pool name provided");
4294 goto out_err;
4295 }
e28fff26 4296
69e7a02f 4297 spec->image_name = dup_token(&buf, NULL);
859c31df 4298 if (!spec->image_name)
f28e565a 4299 goto out_mem;
4fb5d671
AE
4300 if (!*spec->image_name) {
4301 rbd_warn(NULL, "no image name provided");
4302 goto out_err;
4303 }
d4b125e9 4304
f28e565a
AE
4305 /*
4306 * Snapshot name is optional; default is to use "-"
4307 * (indicating the head/no snapshot).
4308 */
3feeb894 4309 len = next_token(&buf);
820a5f3e 4310 if (!len) {
3feeb894
AE
4311 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4312 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4313 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4314 ret = -ENAMETOOLONG;
f28e565a 4315 goto out_err;
849b4260 4316 }
ecb4dc22
AE
4317 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4318 if (!snap_name)
f28e565a 4319 goto out_mem;
ecb4dc22
AE
4320 *(snap_name + len) = '\0';
4321 spec->snap_name = snap_name;
e5c35534 4322
0ddebc0c 4323 /* Initialize all rbd options to the defaults */
e28fff26 4324
4e9afeba
AE
4325 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4326 if (!rbd_opts)
4327 goto out_mem;
4328
4329 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4330
859c31df 4331 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4332 mon_addrs + mon_addrs_size - 1,
4e9afeba 4333 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4334 if (IS_ERR(copts)) {
4335 ret = PTR_ERR(copts);
dc79b113
AE
4336 goto out_err;
4337 }
859c31df
AE
4338 kfree(options);
4339
4340 *ceph_opts = copts;
4e9afeba 4341 *opts = rbd_opts;
859c31df 4342 *rbd_spec = spec;
0ddebc0c 4343
dc79b113 4344 return 0;
f28e565a 4345out_mem:
dc79b113 4346 ret = -ENOMEM;
d22f76e7 4347out_err:
859c31df
AE
4348 kfree(rbd_opts);
4349 rbd_spec_put(spec);
f28e565a 4350 kfree(options);
d22f76e7 4351
dc79b113 4352 return ret;
a725f65e
AE
4353}
4354
589d30e0
AE
4355/*
4356 * An rbd format 2 image has a unique identifier, distinct from the
4357 * name given to it by the user. Internally, that identifier is
4358 * what's used to specify the names of objects related to the image.
4359 *
4360 * A special "rbd id" object is used to map an rbd image name to its
4361 * id. If that object doesn't exist, then there is no v2 rbd image
4362 * with the supplied name.
4363 *
4364 * This function will record the given rbd_dev's image_id field if
4365 * it can be determined, and in that case will return 0. If any
4366 * errors occur a negative errno will be returned and the rbd_dev's
4367 * image_id field will be unchanged (and should be NULL).
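 *
 * (For an image named "foo", the id object is named
 * "<RBD_ID_PREFIX>foo", i.e. "rbd_id.foo" with the prefix as
 * defined in rbd_types.h.)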
4368 */
4369static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4370{
4371 int ret;
4372 size_t size;
4373 char *object_name;
4374 void *response;
c0fba368 4375 char *image_id;
2f82ee54 4376
2c0d0a10
AE
4377 /*
4378 * When probing a parent image, the image id is already
4379 * known (and the image name likely is not). There's no
c0fba368
AE
4380 * need to fetch the image id again in this case. We
4381 * do still need to set the image format though.
2c0d0a10 4382 */
c0fba368
AE
4383 if (rbd_dev->spec->image_id) {
4384 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4385
2c0d0a10 4386 return 0;
c0fba368 4387 }
2c0d0a10 4388
589d30e0
AE
4389 /*
4390 * First, see if the format 2 image id file exists, and if
4391 * so, get the image's persistent id from it.
4392 */
69e7a02f 4393 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4394 object_name = kmalloc(size, GFP_NOIO);
4395 if (!object_name)
4396 return -ENOMEM;
0d7dbfce 4397 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4398 dout("rbd id object name is %s\n", object_name);
4399
4400 /* Response will be an encoded string, which includes a length */
4401
4402 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4403 response = kzalloc(size, GFP_NOIO);
4404 if (!response) {
4405 ret = -ENOMEM;
4406 goto out;
4407 }
4408
c0fba368
AE
4409 /* If it doesn't exist we'll assume it's a format 1 image */
4410
36be9a76 4411 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4412 "rbd", "get_id", NULL, 0,
e2a58ee5 4413 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4414 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4415 if (ret == -ENOENT) {
4416 image_id = kstrdup("", GFP_KERNEL);
4417 ret = image_id ? 0 : -ENOMEM;
4418 if (!ret)
4419 rbd_dev->image_format = 1;
4420 } else if (ret > sizeof (__le32)) {
4421 void *p = response;
4422
4423 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4424 NULL, GFP_NOIO);
c0fba368
AE
4425 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4426 if (!ret)
4427 rbd_dev->image_format = 2;
589d30e0 4428 } else {
c0fba368
AE
4429 ret = -EINVAL;
4430 }
4431
4432 if (!ret) {
4433 rbd_dev->spec->image_id = image_id;
4434 dout("image_id is %s\n", image_id);
589d30e0
AE
4435 }
4436out:
4437 kfree(response);
4438 kfree(object_name);
4439
4440 return ret;
4441}
4442
6fd48b3b
AE
4443/* Undo whatever state changes are made by v1 or v2 image probe */
4444
4445static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4446{
4447 struct rbd_image_header *header;
4448
4449 rbd_dev_remove_parent(rbd_dev);
4450 rbd_spec_put(rbd_dev->parent_spec);
4451 rbd_dev->parent_spec = NULL;
4452 rbd_dev->parent_overlap = 0;
4453
4454 /* Free dynamic fields from the header, then zero it out */
4455
4456 header = &rbd_dev->header;
812164f8 4457 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4458 kfree(header->snap_sizes);
4459 kfree(header->snap_names);
4460 kfree(header->object_prefix);
4461 memset(header, 0, sizeof (*header));
4462}
4463
2df3fac7 4464static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9 4465{
9d475de5 4466 int ret;
a30b71b9 4467
1e130199 4468 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4469 if (ret)
b1b5402a
AE
4470 goto out_err;
4471
2df3fac7
AE
4472 /*
4473 * Get and check the features for the image. Currently the
4474 * features are assumed to never change.
4475 */
b1b5402a 4476 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4477 if (ret)
9d475de5 4478 goto out_err;
35d489f9 4479
86b00e0d
AE
4480 /* If the image supports layering, get the parent info */
4481
4482 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4483 ret = rbd_dev_v2_parent_info(rbd_dev);
57385b51 4484 if (ret)
86b00e0d 4485 goto out_err;
96882f55 4486 /*
c734b796
AE
4487 * Print a warning if this image has a parent.
4488 * Don't print it if the image now being probed
4489 * is itself a parent. We can tell at this point
4490 * because we won't know its pool name yet (just its
4491 * pool id).
96882f55 4492 */
c734b796 4493 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
96882f55
AE
4494 rbd_warn(rbd_dev, "WARNING: kernel layering "
4495 "is EXPERIMENTAL!");
86b00e0d
AE
4496 }
4497
cc070d59
AE
4498 /* If the image supports fancy striping, get its parameters */
4499
4500 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4501 ret = rbd_dev_v2_striping_info(rbd_dev);
4502 if (ret < 0)
4503 goto out_err;
4504 }
2df3fac7 4505 /* Format 2 images don't support crypto or compression types */
6e14b1a6 4506
35152979 4507 return 0;
9d475de5 4508out_err:
86b00e0d
AE
4509 rbd_dev->parent_overlap = 0;
4510 rbd_spec_put(rbd_dev->parent_spec);
4511 rbd_dev->parent_spec = NULL;
9d475de5
AE
4512 kfree(rbd_dev->header_name);
4513 rbd_dev->header_name = NULL;
1e130199
AE
4514 kfree(rbd_dev->header.object_prefix);
4515 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4516
4517 return ret;
a30b71b9
AE
4518}
4519
124afba2 4520static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4521{
2f82ee54 4522 struct rbd_device *parent = NULL;
124afba2
AE
4523 struct rbd_spec *parent_spec;
4524 struct rbd_client *rbdc;
4525 int ret;
4526
4527 if (!rbd_dev->parent_spec)
4528 return 0;
4529 /*
4530 * We need to pass a reference to the client and the parent
4531 * spec when creating the parent rbd_dev. Images related by
4532 * parent/child relationships always share both.
4533 */
4534 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4535 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4536
4537 ret = -ENOMEM;
4538 parent = rbd_dev_create(rbdc, parent_spec);
4539 if (!parent)
4540 goto out_err;
4541
1f3ef788 4542 ret = rbd_dev_image_probe(parent, false);
124afba2
AE
4543 if (ret < 0)
4544 goto out_err;
4545 rbd_dev->parent = parent;
4546
4547 return 0;
4548out_err:
4549 if (parent) {
4550 rbd_spec_put(rbd_dev->parent_spec);
4551 kfree(rbd_dev->header_name);
4552 rbd_dev_destroy(parent);
4553 } else {
4554 rbd_put_client(rbdc);
4555 rbd_spec_put(parent_spec);
4556 }
4557
4558 return ret;
4559}
4560
200a6a8b 4561static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4562{
83a06263 4563 int ret;
d1cf5788 4564
83a06263
AE
4565 /* generate unique id: find highest unique id, add one */
4566 rbd_dev_id_get(rbd_dev);
4567
4568 /* Fill in the device name, now that we have its id. */
4569 BUILD_BUG_ON(DEV_NAME_LEN
4570 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4571 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4572
4573 /* Get our block major device number. */
4574
4575 ret = register_blkdev(0, rbd_dev->name);
4576 if (ret < 0)
4577 goto err_out_id;
4578 rbd_dev->major = ret;
4579
4580 /* Set up the blkdev mapping. */
4581
4582 ret = rbd_init_disk(rbd_dev);
4583 if (ret)
4584 goto err_out_blkdev;
4585
f35a4dee 4586 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
4587 if (ret)
4588 goto err_out_disk;
f35a4dee
AE
4589 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4590
4591 ret = rbd_bus_add_dev(rbd_dev);
4592 if (ret)
4593 goto err_out_mapping;
83a06263 4594
83a06263
AE
4595 /* Everything's ready. Announce the disk to the world. */
4596
129b79d4 4597 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4598 add_disk(rbd_dev->disk);
4599
4600 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4601 (unsigned long long) rbd_dev->mapping.size);
4602
4603 return ret;
2f82ee54 4604
f35a4dee
AE
4605err_out_mapping:
4606 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4607err_out_disk:
4608 rbd_free_disk(rbd_dev);
4609err_out_blkdev:
4610 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4611err_out_id:
4612 rbd_dev_id_put(rbd_dev);
83a06263
AE
4614
4615 return ret;
4616}
4617
332bb12d
AE
4618static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4619{
4620 struct rbd_spec *spec = rbd_dev->spec;
4621 size_t size;
4622
4623 /* Record the header object name for this rbd image. */
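 /*
  * Format 1 images use "<image_name>" RBD_SUFFIX (".rbd");
  * format 2 images use RBD_HEADER_PREFIX "<image_id>"
  * ("rbd_header.<id>"), both per rbd_types.h.
  */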
4624
4625 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4626
4627 if (rbd_dev->image_format == 1)
4628 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4629 else
4630 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4631
4632 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4633 if (!rbd_dev->header_name)
4634 return -ENOMEM;
4635
4636 if (rbd_dev->image_format == 1)
4637 sprintf(rbd_dev->header_name, "%s%s",
4638 spec->image_name, RBD_SUFFIX);
4639 else
4640 sprintf(rbd_dev->header_name, "%s%s",
4641 RBD_HEADER_PREFIX, spec->image_id);
4642 return 0;
4643}
4644
200a6a8b
AE
4645static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4646{
6fd48b3b 4647 rbd_dev_unprobe(rbd_dev);
200a6a8b 4648 kfree(rbd_dev->header_name);
6fd48b3b
AE
4649 rbd_dev->header_name = NULL;
4650 rbd_dev->image_format = 0;
4651 kfree(rbd_dev->spec->image_id);
4652 rbd_dev->spec->image_id = NULL;
4653
200a6a8b
AE
4654 rbd_dev_destroy(rbd_dev);
4655}
4656
a30b71b9
AE
4657/*
4658 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
4659 * device. If this image is the one being mapped (i.e., not a
4660 * parent), initiate a watch on its header object before using that
4661 * object to get detailed information about the rbd image.
a30b71b9 4662 */
1f3ef788 4663static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
a30b71b9
AE
4664{
4665 int ret;
b644de2b 4666 int tmp;
a30b71b9
AE
4667
4668 /*
4669 * Get the id from the image id object. If it's not a
4670 * format 2 image, we'll get ENOENT back, and we'll assume
4671 * it's a format 1 image.
4672 */
4673 ret = rbd_dev_image_id(rbd_dev);
4674 if (ret)
c0fba368
AE
4675 return ret;
4676 rbd_assert(rbd_dev->spec->image_id);
4677 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4678
332bb12d
AE
4679 ret = rbd_dev_header_name(rbd_dev);
4680 if (ret)
4681 goto err_out_format;
4682
1f3ef788
AE
4683 if (mapping) {
4684 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4685 if (ret)
4686 goto out_header_name;
4687 }
b644de2b 4688
c0fba368 4689 if (rbd_dev->image_format == 1)
99a41ebc 4690 ret = rbd_dev_v1_header_info(rbd_dev);
a30b71b9 4691 else
2df3fac7 4692 ret = rbd_dev_v2_header_info(rbd_dev);
5655c4d9 4693 if (ret)
b644de2b 4694 goto err_out_watch;
83a06263 4695
9bb81c9b
AE
4696 ret = rbd_dev_spec_update(rbd_dev);
4697 if (ret)
33dca39f 4698 goto err_out_probe;
9bb81c9b
AE
4699
4700 ret = rbd_dev_probe_parent(rbd_dev);
30d60ba2
AE
4701 if (ret)
4702 goto err_out_probe;
4703
4704 dout("discovered format %u image, header name is %s\n",
4705 rbd_dev->image_format, rbd_dev->header_name);
83a06263 4706
30d60ba2 4707 return 0;
6fd48b3b
AE
4708err_out_probe:
4709 rbd_dev_unprobe(rbd_dev);
b644de2b 4710err_out_watch:
1f3ef788
AE
4711 if (mapping) {
4712 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4713 if (tmp)
4714 rbd_warn(rbd_dev, "unable to tear down "
4715 "watch request (%d)\n", tmp);
4716 }
332bb12d
AE
4717out_header_name:
4718 kfree(rbd_dev->header_name);
4719 rbd_dev->header_name = NULL;
4720err_out_format:
4721 rbd_dev->image_format = 0;
5655c4d9
AE
4722 kfree(rbd_dev->spec->image_id);
4723 rbd_dev->spec->image_id = NULL;
4724
4725 dout("probe failed, returning %d\n", ret);
4726
a30b71b9
AE
4727 return ret;
4728}
4729
59c2be1e
YS
4730static ssize_t rbd_add(struct bus_type *bus,
4731 const char *buf,
4732 size_t count)
602adf40 4733{
cb8627c7 4734 struct rbd_device *rbd_dev = NULL;
dc79b113 4735 struct ceph_options *ceph_opts = NULL;
4e9afeba 4736 struct rbd_options *rbd_opts = NULL;
859c31df 4737 struct rbd_spec *spec = NULL;
9d3997fd 4738 struct rbd_client *rbdc;
27cc2594 4739 struct ceph_osd_client *osdc;
51344a38 4740 bool read_only;
27cc2594 4741 int rc = -ENOMEM;
602adf40
YS
4742
4743 if (!try_module_get(THIS_MODULE))
4744 return -ENODEV;
4745
602adf40 4746 /* parse add command */
859c31df 4747 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 4748 if (rc < 0)
bd4ba655 4749 goto err_out_module;
51344a38
AE
4750 read_only = rbd_opts->read_only;
4751 kfree(rbd_opts);
4752 rbd_opts = NULL; /* done with this */
78cea76e 4753
9d3997fd
AE
4754 rbdc = rbd_get_client(ceph_opts);
4755 if (IS_ERR(rbdc)) {
4756 rc = PTR_ERR(rbdc);
0ddebc0c 4757 goto err_out_args;
9d3997fd 4758 }
c53d5893 4759 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4760
602adf40 4761 /* pick the pool */
9d3997fd 4762 osdc = &rbdc->client->osdc;
859c31df 4763 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
4764 if (rc < 0)
4765 goto err_out_client;
c0cd10db 4766 spec->pool_id = (u64)rc;
859c31df 4767
0903e875
AE
4768 /* The ceph file layout needs to fit pool id in 32 bits */
4769
c0cd10db
AE
4770 if (spec->pool_id > (u64)U32_MAX) {
4771 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4772 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
4773 rc = -EIO;
4774 goto err_out_client;
4775 }
4776
c53d5893 4777 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
4778 if (!rbd_dev)
4779 goto err_out_client;
c53d5893
AE
4780 rbdc = NULL; /* rbd_dev now owns this */
4781 spec = NULL; /* rbd_dev now owns this */
602adf40 4782
1f3ef788 4783 rc = rbd_dev_image_probe(rbd_dev, true);
a30b71b9 4784 if (rc < 0)
c53d5893 4785 goto err_out_rbd_dev;
05fd6f6f 4786
7ce4eef7
AE
4787 /* If we are mapping a snapshot it must be marked read-only */
4788
4789 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4790 read_only = true;
4791 rbd_dev->mapping.read_only = read_only;
4792
b536f69a
AE
4793 rc = rbd_dev_device_setup(rbd_dev);
4794 if (!rc)
4795 return count;
4796
4797 rbd_dev_image_release(rbd_dev);
c53d5893
AE
4798err_out_rbd_dev:
4799 rbd_dev_destroy(rbd_dev);
bd4ba655 4800err_out_client:
9d3997fd 4801 rbd_put_client(rbdc);
0ddebc0c 4802err_out_args:
78cea76e
AE
4803 if (ceph_opts)
4804 ceph_destroy_options(ceph_opts);
4e9afeba 4805 kfree(rbd_opts);
859c31df 4806 rbd_spec_put(spec);
bd4ba655
AE
4807err_out_module:
4808 module_put(THIS_MODULE);
27cc2594 4809
602adf40 4810 dout("Error adding device %s\n", buf);
27cc2594 4811
c0cd10db 4812 return (ssize_t)rc;
602adf40
YS
4813}
4814
de71a297 4815static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4816{
4817 struct list_head *tmp;
4818 struct rbd_device *rbd_dev;
4819
e124a82f 4820 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4821 list_for_each(tmp, &rbd_dev_list) {
4822 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4823 if (rbd_dev->dev_id == dev_id) {
e124a82f 4824 spin_unlock(&rbd_dev_list_lock);
602adf40 4825 return rbd_dev;
e124a82f 4826 }
602adf40 4827 }
e124a82f 4828 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4829 return NULL;
4830}
4831
200a6a8b 4832static void rbd_dev_device_release(struct device *dev)
602adf40 4833{
593a9e7b 4834 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4835
602adf40 4836 rbd_free_disk(rbd_dev);
200a6a8b 4837 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6d80b130 4838 rbd_dev_mapping_clear(rbd_dev);
602adf40 4839 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 4840 rbd_dev->major = 0;
e2839308 4841 rbd_dev_id_put(rbd_dev);
602adf40
YS
4843}
4844
05a46afd
AE
4845static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4846{
ad945fc1 4847 while (rbd_dev->parent) {
05a46afd
AE
4848 struct rbd_device *first = rbd_dev;
4849 struct rbd_device *second = first->parent;
4850 struct rbd_device *third;
4851
4852 /*
4853 * Follow to the parent with no grandparent and
4854 * remove it.
4855 */
4856 while (second && (third = second->parent)) {
4857 first = second;
4858 second = third;
4859 }
ad945fc1 4860 rbd_assert(second);
8ad42cd0 4861 rbd_dev_image_release(second);
ad945fc1
AE
4862 first->parent = NULL;
4863 first->parent_overlap = 0;
4864
4865 rbd_assert(first->parent_spec);
05a46afd
AE
4866 rbd_spec_put(first->parent_spec);
4867 first->parent_spec = NULL;
05a46afd
AE
4868 }
4869}
4870
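/*
 * Handle a write to /sys/bus/rbd/remove.  The value written is the
 * rbd device id to unmap, e.g. (id illustrative):
 *
 *   echo 2 > /sys/bus/rbd/remove
 */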
dfc5606d
YS
4871static ssize_t rbd_remove(struct bus_type *bus,
4872 const char *buf,
4873 size_t count)
602adf40
YS
4874{
4875 struct rbd_device *rbd_dev = NULL;
0d8189e1 4876 int target_id;
602adf40 4877 unsigned long ul;
0d8189e1 4878 int ret;
602adf40 4879
0d8189e1
AE
4880 ret = strict_strtoul(buf, 10, &ul);
4881 if (ret)
4882 return ret;
602adf40
YS
4883
4884 /* convert to int; abort if we lost anything in the conversion */
4885 target_id = (int) ul;
4886 if (target_id != ul)
4887 return -EINVAL;
4888
4889 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4890
4891 rbd_dev = __rbd_get_dev(target_id);
4892 if (!rbd_dev) {
4893 ret = -ENOENT;
4894 goto done;
42382b70
AE
4895 }
4896
a14ea269 4897 spin_lock_irq(&rbd_dev->lock);
b82d167b 4898 if (rbd_dev->open_count)
42382b70 4899 ret = -EBUSY;
b82d167b
AE
4900 else
4901 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
a14ea269 4902 spin_unlock_irq(&rbd_dev->lock);
b82d167b 4903 if (ret < 0)
42382b70 4904 goto done;
b480815a 4905 rbd_bus_del_dev(rbd_dev);
1f3ef788
AE
4906 ret = rbd_dev_header_watch_sync(rbd_dev, false);
4907 if (ret)
4908 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
8ad42cd0 4909 rbd_dev_image_release(rbd_dev);
79ab7558 4910 module_put(THIS_MODULE);
1f3ef788 4911 ret = count;
602adf40
YS
4912done:
4913 mutex_unlock(&ctl_mutex);
aafb230e 4914
602adf40
YS
4915 return ret;
4916}
4917
602adf40
YS
4918/*
4919 * create control files in sysfs
dfc5606d 4920 * /sys/bus/rbd/...
602adf40
YS
4921 */
4922static int rbd_sysfs_init(void)
4923{
dfc5606d 4924 int ret;
602adf40 4925
fed4c143 4926 ret = device_register(&rbd_root_dev);
21079786 4927 if (ret < 0)
dfc5606d 4928 return ret;
602adf40 4929
fed4c143
AE
4930 ret = bus_register(&rbd_bus_type);
4931 if (ret < 0)
4932 device_unregister(&rbd_root_dev);
602adf40 4933
602adf40
YS
4934 return ret;
4935}
4936
4937static void rbd_sysfs_cleanup(void)
4938{
dfc5606d 4939 bus_unregister(&rbd_bus_type);
fed4c143 4940 device_unregister(&rbd_root_dev);
602adf40
YS
4941}
4942
1c2a9dfe
AE
4943static int rbd_slab_init(void)
4944{
4945 rbd_assert(!rbd_img_request_cache);
4946 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
4947 sizeof (struct rbd_img_request),
4948 __alignof__(struct rbd_img_request),
4949 0, NULL);
868311b1
AE
4950 if (!rbd_img_request_cache)
4951 return -ENOMEM;
4952
4953 rbd_assert(!rbd_obj_request_cache);
4954 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
4955 sizeof (struct rbd_obj_request),
4956 __alignof__(struct rbd_obj_request),
4957 0, NULL);
78c2a44a
AE
4958 if (!rbd_obj_request_cache)
4959 goto out_err;
4960
4961 rbd_assert(!rbd_segment_name_cache);
4962 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
4963 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
4964 if (rbd_segment_name_cache)
1c2a9dfe 4965 return 0;
78c2a44a
AE
4966out_err:
4967 if (rbd_obj_request_cache) {
4968 kmem_cache_destroy(rbd_obj_request_cache);
4969 rbd_obj_request_cache = NULL;
4970 }
1c2a9dfe 4971
868311b1
AE
4972 kmem_cache_destroy(rbd_img_request_cache);
4973 rbd_img_request_cache = NULL;
4974
1c2a9dfe
AE
4975 return -ENOMEM;
4976}
4977
4978static void rbd_slab_exit(void)
4979{
78c2a44a
AE
4980 rbd_assert(rbd_segment_name_cache);
4981 kmem_cache_destroy(rbd_segment_name_cache);
4982 rbd_segment_name_cache = NULL;
4983
868311b1
AE
4984 rbd_assert(rbd_obj_request_cache);
4985 kmem_cache_destroy(rbd_obj_request_cache);
4986 rbd_obj_request_cache = NULL;
4987
1c2a9dfe
AE
4988 rbd_assert(rbd_img_request_cache);
4989 kmem_cache_destroy(rbd_img_request_cache);
4990 rbd_img_request_cache = NULL;
4991}
4992
cc344fa1 4993static int __init rbd_init(void)
602adf40
YS
4994{
4995 int rc;
4996
1e32d34c
AE
4997 if (!libceph_compatible(NULL)) {
4998 rbd_warn(NULL, "libceph incompatibility (quitting)");
4999
5000 return -EINVAL;
5001 }
1c2a9dfe 5002 rc = rbd_slab_init();
602adf40
YS
5003 if (rc)
5004 return rc;
1c2a9dfe
AE
5005 rc = rbd_sysfs_init();
5006 if (rc)
5007 rbd_slab_exit();
5008 else
5009 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5010
5011 return rc;
602adf40
YS
5012}
5013
cc344fa1 5014static void __exit rbd_exit(void)
602adf40
YS
5015{
5016 rbd_sysfs_cleanup();
1c2a9dfe 5017 rbd_slab_exit();
602adf40
YS
5018}
5019
5020module_init(rbd_init);
5021module_exit(rbd_exit);
5022
5023MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5024MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5025MODULE_DESCRIPTION("rados block device");
5026
5027/* following authorship retained from original osdblk.c */
5028MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5029
5030MODULE_LICENSE("GPL");