rbd: update in-core header directly
drivers/block/rbd.c

/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
		(RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
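/*
 * (The expression above bounds the number of decimal digits an int
 * can need: each byte contributes log10(256), about 2.41 digits,
 * which 5/2 rounds up, and the +1 leaves room for a sign.)
 */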

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

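/*
 * Iterators over an image request's list of object requests.  Note
 * that the _safe variant walks the list in reverse and tolerates
 * removal of the current entry while iterating.
 */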
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

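/*
 * These options come from the options field of the string written to
 * the sysfs "add" interface (see Documentation/ABI/testing/sysfs-bus-rbd),
 * along the lines of:
 *
 *	echo "1.2.3.4:6789 name=admin,read_only mypool myimage" \
 *		> /sys/bus/rbd/add
 *
 * (The monitor address, pool, and image names above are examples.)
 */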
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to remove the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_read() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	down_write(&rbd_dev->header_rwsem);
	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	/* Make sure mapping size is consistent with header info */

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
		if (rbd_dev->mapping.size != header->image_size)
			rbd_dev->mapping.size = header->image_size;

	up_write(&rbd_dev->header_rwsem);

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return NULL;

	return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		/* Free back to the slab cache the name came from */
		kmem_cache_free(rbd_segment_name_cache, name);
		name = NULL;
	}

	return name;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

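/*
 * Segments (the backing RADOS objects) are a power of two bytes in
 * size, so masking an image offset with (segment_size - 1) yields
 * the offset of that byte within its segment.
 */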
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

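/*
 * Note: obj_order is the log2 of the image's object size; rbd's
 * default order of 22 corresponds to 4 MiB objects.
 */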
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

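/*
 * Fill in the remaining osd request fields before submission: a
 * read is executed at the image request's snapshot id, while a
 * write carries the snapshot context and a modification time.
 */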
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}

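/*
 * Create an osd request for an ordinary, single-op read or write.
 * For object requests that carry image data, a write gets the image
 * request's snapshot context.
 */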
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

1766static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1767{
1768 ceph_osdc_put_request(osd_req);
1769}
1770
1771/* object_name is assumed to be a non-null pointer and NUL-terminated */
1772
1773static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1774 u64 offset, u64 length,
1775 enum obj_request_type type)
1776{
1777 struct rbd_obj_request *obj_request;
1778 size_t size;
1779 char *name;
1780
1781 rbd_assert(obj_request_type_valid(type));
1782
1783 size = strlen(object_name) + 1;
1784 name = kmalloc(size, GFP_KERNEL);
1785 if (!name)
1786 return NULL;
1787
868311b1 1788 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1789 if (!obj_request) {
1790 kfree(name);
1791 return NULL;
1792 }
1793
1794 obj_request->object_name = memcpy(name, object_name, size);
1795 obj_request->offset = offset;
1796 obj_request->length = length;
926f9b3f 1797 obj_request->flags = 0;
1798 obj_request->which = BAD_WHICH;
1799 obj_request->type = type;
1800 INIT_LIST_HEAD(&obj_request->links);
788e2df3 1801 init_completion(&obj_request->completion);
1802 kref_init(&obj_request->kref);
1803
1804 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1805 offset, length, (int)type, obj_request);
1806
1807 return obj_request;
1808}
1809
1810static void rbd_obj_request_destroy(struct kref *kref)
1811{
1812 struct rbd_obj_request *obj_request;
1813
1814 obj_request = container_of(kref, struct rbd_obj_request, kref);
1815
1816 dout("%s: obj %p\n", __func__, obj_request);
1817
1818 rbd_assert(obj_request->img_request == NULL);
1819 rbd_assert(obj_request->which == BAD_WHICH);
1820
1821 if (obj_request->osd_req)
1822 rbd_osd_req_destroy(obj_request->osd_req);
1823
1824 rbd_assert(obj_request_type_valid(obj_request->type));
1825 switch (obj_request->type) {
1826 case OBJ_REQUEST_NODATA:
1827 break; /* Nothing to do */
1828 case OBJ_REQUEST_BIO:
1829 if (obj_request->bio_list)
1830 bio_chain_put(obj_request->bio_list);
1831 break;
1832 case OBJ_REQUEST_PAGES:
1833 if (obj_request->pages)
1834 ceph_release_page_vector(obj_request->pages,
1835 obj_request->page_count);
1836 break;
1837 }
1838
f907ad55 1839 kfree(obj_request->object_name);
1840 obj_request->object_name = NULL;
1841 kmem_cache_free(rbd_obj_request_cache, obj_request);
1842}
1843
1844/*
1845 * Caller is responsible for filling in the list of object requests
1846 * that comprises the image request, and the Linux request pointer
1847 * (if there is one).
1848 */
1849static struct rbd_img_request *rbd_img_request_create(
1850 struct rbd_device *rbd_dev,
bf0d5f50 1851 u64 offset, u64 length,
1852 bool write_request,
1853 bool child_request)
1854{
1855 struct rbd_img_request *img_request;
bf0d5f50 1856
1c2a9dfe 1857 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1858 if (!img_request)
1859 return NULL;
1860
1861 if (write_request) {
1862 down_read(&rbd_dev->header_rwsem);
812164f8 1863 ceph_get_snap_context(rbd_dev->header.snapc);
bf0d5f50 1864 up_read(&rbd_dev->header_rwsem);
1865 }
1866
1867 img_request->rq = NULL;
1868 img_request->rbd_dev = rbd_dev;
1869 img_request->offset = offset;
1870 img_request->length = length;
1871 img_request->flags = 0;
1872 if (write_request) {
1873 img_request_write_set(img_request);
468521c1 1874 img_request->snapc = rbd_dev->header.snapc;
0c425248 1875 } else {
bf0d5f50 1876 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 1877 }
1878 if (child_request)
1879 img_request_child_set(img_request);
1880 if (rbd_dev->parent_spec)
1881 img_request_layered_set(img_request);
1882 spin_lock_init(&img_request->completion_lock);
1883 img_request->next_completion = 0;
1884 img_request->callback = NULL;
a5a337d4 1885 img_request->result = 0;
1886 img_request->obj_request_count = 0;
1887 INIT_LIST_HEAD(&img_request->obj_requests);
1888 kref_init(&img_request->kref);
1889
1890 rbd_img_request_get(img_request); /* Avoid a warning */
1891 rbd_img_request_put(img_request); /* TEMPORARY */
1892
1893 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1894 write_request ? "write" : "read", offset, length,
1895 img_request);
1896
1897 return img_request;
1898}
1899
1900static void rbd_img_request_destroy(struct kref *kref)
1901{
1902 struct rbd_img_request *img_request;
1903 struct rbd_obj_request *obj_request;
1904 struct rbd_obj_request *next_obj_request;
1905
1906 img_request = container_of(kref, struct rbd_img_request, kref);
1907
1908 dout("%s: img %p\n", __func__, img_request);
1909
1910 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1911 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 1912 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 1913
0c425248 1914 if (img_request_write_test(img_request))
812164f8 1915 ceph_put_snap_context(img_request->snapc);
bf0d5f50 1916
1917 if (img_request_child_test(img_request))
1918 rbd_obj_request_put(img_request->obj_request);
1919
1c2a9dfe 1920 kmem_cache_free(rbd_img_request_cache, img_request);
1921}
1922
1923static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1924{
6365d33a 1925 struct rbd_img_request *img_request;
1926 unsigned int xferred;
1927 int result;
8b3e1a56 1928 bool more;
1217857f 1929
1930 rbd_assert(obj_request_img_data_test(obj_request));
1931 img_request = obj_request->img_request;
1932
1933 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1934 xferred = (unsigned int)obj_request->xferred;
1935 result = obj_request->result;
1936 if (result) {
1937 struct rbd_device *rbd_dev = img_request->rbd_dev;
1938
1939 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1940 img_request_write_test(img_request) ? "write" : "read",
1941 obj_request->length, obj_request->img_offset,
1942 obj_request->offset);
1943 rbd_warn(rbd_dev, " result %d xferred %x\n",
1944 result, xferred);
1945 if (!img_request->result)
1946 img_request->result = result;
1947 }
1948
1949 /* Image object requests don't own their page array */
1950
1951 if (obj_request->type == OBJ_REQUEST_PAGES) {
1952 obj_request->pages = NULL;
1953 obj_request->page_count = 0;
1954 }
1955
1956 if (img_request_child_test(img_request)) {
1957 rbd_assert(img_request->obj_request != NULL);
1958 more = obj_request->which < img_request->obj_request_count - 1;
1959 } else {
1960 rbd_assert(img_request->rq != NULL);
1961 more = blk_end_request(img_request->rq, result, xferred);
1962 }
1963
1964 return more;
1965}
1966
1967static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1968{
1969 struct rbd_img_request *img_request;
1970 u32 which = obj_request->which;
1971 bool more = true;
1972
6365d33a 1973 rbd_assert(obj_request_img_data_test(obj_request));
1974 img_request = obj_request->img_request;
1975
1976 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1977 rbd_assert(img_request != NULL);
1978 rbd_assert(img_request->obj_request_count > 0);
1979 rbd_assert(which != BAD_WHICH);
1980 rbd_assert(which < img_request->obj_request_count);
1981 rbd_assert(which >= img_request->next_completion);
1982
1983 spin_lock_irq(&img_request->completion_lock);
1984 if (which != img_request->next_completion)
1985 goto out;
1986
1987 for_each_obj_request_from(img_request, obj_request) {
1988 rbd_assert(more);
1989 rbd_assert(which < img_request->obj_request_count);
1990
1991 if (!obj_request_done_test(obj_request))
1992 break;
1217857f 1993 more = rbd_img_obj_end_request(obj_request);
1994 which++;
1995 }
1996
1997 rbd_assert(more ^ (which == img_request->obj_request_count));
1998 img_request->next_completion = which;
1999out:
2000 spin_unlock_irq(&img_request->completion_lock);
2001
2002 if (!more)
2003 rbd_img_request_complete(img_request);
2004}
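/*
 * Editor's note, a worked example of the in-order completion above
 * (not from the original source): suppose an image request has object
 * requests 0, 1 and 2, and 2 completes first.  Its callback sees
 * which (2) != next_completion (0) and bails out at "out".  When 0
 * completes, the loop ends request 0 only (1 is not yet done) and
 * sets next_completion to 1.  When 1 finally completes, the loop
 * sweeps forward and ends both 1 and the already-done 2, after which
 * the whole image request is completed.
 */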
2005
2006/*
2007 * Split up an image request into one or more object requests, each
2008 * to a different object. The "type" parameter indicates whether
2009 * "data_desc" is the pointer to the head of a list of bio
2010 * structures, or the base of a page array. In either case this
2011 * function assumes data_desc describes memory sufficient to hold
2012 * all data described by the image request.
2013 */
2014static int rbd_img_request_fill(struct rbd_img_request *img_request,
2015 enum obj_request_type type,
2016 void *data_desc)
2017{
2018 struct rbd_device *rbd_dev = img_request->rbd_dev;
2019 struct rbd_obj_request *obj_request = NULL;
2020 struct rbd_obj_request *next_obj_request;
0c425248 2021 bool write_request = img_request_write_test(img_request);
2022 struct bio *bio_list;
2023 unsigned int bio_offset = 0;
2024 struct page **pages;
7da22d29 2025 u64 img_offset;
2026 u64 resid;
2027 u16 opcode;
2028
2029 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2030 (int)type, data_desc);
37206ee5 2031
430c28c3 2032 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
7da22d29 2033 img_offset = img_request->offset;
bf0d5f50 2034 resid = img_request->length;
4dda41d3 2035 rbd_assert(resid > 0);
2036
2037 if (type == OBJ_REQUEST_BIO) {
2038 bio_list = data_desc;
2039 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2040 } else {
2041 rbd_assert(type == OBJ_REQUEST_PAGES);
2042 pages = data_desc;
2043 }
2044
bf0d5f50 2045 while (resid) {
2fa12320 2046 struct ceph_osd_request *osd_req;
bf0d5f50 2047 const char *object_name;
2048 u64 offset;
2049 u64 length;
2050
7da22d29 2051 object_name = rbd_segment_name(rbd_dev, img_offset);
2052 if (!object_name)
2053 goto out_unwind;
2054 offset = rbd_segment_offset(rbd_dev, img_offset);
2055 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2056 obj_request = rbd_obj_request_create(object_name,
f1a4739f 2057 offset, length, type);
2058 /* object request has its own copy of the object name */
2059 rbd_segment_name_free(object_name);
2060 if (!obj_request)
2061 goto out_unwind;
2062
2063 if (type == OBJ_REQUEST_BIO) {
2064 unsigned int clone_size;
2065
2066 rbd_assert(length <= (u64)UINT_MAX);
2067 clone_size = (unsigned int)length;
2068 obj_request->bio_list =
2069 bio_chain_clone_range(&bio_list,
2070 &bio_offset,
2071 clone_size,
2072 GFP_ATOMIC);
2073 if (!obj_request->bio_list)
2074 goto out_partial;
2075 } else {
2076 unsigned int page_count;
2077
2078 obj_request->pages = pages;
2079 page_count = (u32)calc_pages_for(offset, length);
2080 obj_request->page_count = page_count;
2081 if ((offset + length) & ~PAGE_MASK)
2082 page_count--; /* more on last page */
2083 pages += page_count;
2084 }
bf0d5f50 2085
2086 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2087 obj_request);
2088 if (!osd_req)
bf0d5f50 2089 goto out_partial;
2fa12320 2090 obj_request->osd_req = osd_req;
2169238d 2091 obj_request->callback = rbd_img_obj_callback;
430c28c3 2092
2093 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2094 0, 0);
2095 if (type == OBJ_REQUEST_BIO)
2096 osd_req_op_extent_osd_data_bio(osd_req, 0,
2097 obj_request->bio_list, length);
2098 else
2099 osd_req_op_extent_osd_data_pages(osd_req, 0,
2100 obj_request->pages, length,
2101 offset & ~PAGE_MASK, false, false);
2102
2103 if (write_request)
2104 rbd_osd_req_format_write(obj_request);
2105 else
2106 rbd_osd_req_format_read(obj_request);
430c28c3 2107
7da22d29 2108 obj_request->img_offset = img_offset;
2109 rbd_img_obj_request_add(img_request, obj_request);
2110
7da22d29 2111 img_offset += length;
2112 resid -= length;
2113 }
2114
2115 return 0;
2116
2117out_partial:
2118 rbd_obj_request_put(obj_request);
2119out_unwind:
2120 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2121 rbd_obj_request_put(obj_request);
2122
2123 return -ENOMEM;
2124}
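/*
 * Editor's example (a minimal sketch, not part of the original
 * source; the function name is hypothetical): the create/fill/submit
 * sequence that rbd_request_fn() uses for a bio-backed block request.
 * Per the comment above rbd_img_request_create(), the caller must
 * also fill in the Linux request pointer before completion runs.
 */
static int example_submit_rq(struct rbd_device *rbd_dev, struct request *rq,
			     u64 offset, u64 length, bool write_request)
{
	struct rbd_img_request *img_request;
	int result;

	img_request = rbd_img_request_create(rbd_dev, offset, length,
					     write_request, false);
	if (!img_request)
		return -ENOMEM;
	img_request->rq = rq;	/* completion will end this request */

	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
	if (!result)
		result = rbd_img_request_submit(img_request);
	if (result)
		rbd_img_request_put(img_request);	/* drop on error */

	return result;
}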
2125
2126static void
2127rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2128{
2129 struct rbd_img_request *img_request;
2130 struct rbd_device *rbd_dev;
2131 u64 length;
2132 u32 page_count;
2133
2134 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2135 rbd_assert(obj_request_img_data_test(obj_request));
2136 img_request = obj_request->img_request;
2137 rbd_assert(img_request);
2138
2139 rbd_dev = img_request->rbd_dev;
2140 rbd_assert(rbd_dev);
2141 length = (u64)1 << rbd_dev->header.obj_order;
2142 page_count = (u32)calc_pages_for(0, length);
2143
2144 rbd_assert(obj_request->copyup_pages);
2145 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2146 obj_request->copyup_pages = NULL;
2147
2148 /*
2149 * We want the transfer count to reflect the size of the
2150 * original write request. There is no such thing as a
2151 * successful short write, so if the request was successful
2152 * we can just set it to the originally-requested length.
2153 */
2154 if (!obj_request->result)
2155 obj_request->xferred = obj_request->length;
2156
2157 /* Finish up with the normal image object callback */
2158
2159 rbd_img_obj_callback(obj_request);
2160}
2161
2162static void
2163rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2164{
2165 struct rbd_obj_request *orig_request;
2166 struct ceph_osd_request *osd_req;
2167 struct ceph_osd_client *osdc;
2168 struct rbd_device *rbd_dev;
3d7efd18 2169 struct page **pages;
2170 int result;
2171 u64 obj_size;
2172 u64 xferred;
2173
2174 rbd_assert(img_request_child_test(img_request));
2175
2176 /* First get what we need from the image request */
2177
2178 pages = img_request->copyup_pages;
2179 rbd_assert(pages != NULL);
2180 img_request->copyup_pages = NULL;
2181
2182 orig_request = img_request->obj_request;
2183 rbd_assert(orig_request != NULL);
0eefd470 2184 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2185 result = img_request->result;
2186 obj_size = img_request->length;
2187 xferred = img_request->xferred;
2188
2189 rbd_dev = img_request->rbd_dev;
2190 rbd_assert(rbd_dev);
2191 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2192
2193 rbd_img_request_put(img_request);
2194
2195 if (result)
2196 goto out_err;
2197
2198 /* Allocate the new copyup osd request for the original request */
2199
2200 result = -ENOMEM;
2201 rbd_assert(!orig_request->osd_req);
2202 osd_req = rbd_osd_req_create_copyup(orig_request);
2203 if (!osd_req)
2204 goto out_err;
2205 orig_request->osd_req = osd_req;
2206 orig_request->copyup_pages = pages;
3d7efd18 2207
0eefd470 2208 /* Initialize the copyup op */
3d7efd18 2209
2210 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2211 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2212 false, false);
3d7efd18 2213
2214 /* Then the original write request op */
2215
2216 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2217 orig_request->offset,
2218 orig_request->length, 0, 0);
2219 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2220 orig_request->length);
2221
2222 rbd_osd_req_format_write(orig_request);
2223
2224 /* All set, send it off. */
2225
2226 orig_request->callback = rbd_img_obj_copyup_callback;
2227 osdc = &rbd_dev->rbd_client->client->osdc;
2228 result = rbd_obj_request_submit(osdc, orig_request);
2229 if (!result)
2230 return;
2231out_err:
2232 /* Record the error code and complete the request */
2233
2234 orig_request->result = result;
2235 orig_request->xferred = 0;
2236 obj_request_done_set(orig_request);
2237 rbd_obj_request_complete(orig_request);
2238}
2239
2240/*
2241 * Read from the parent image the range of data that covers the
2242 * entire target of the given object request. This is used for
2243 * satisfying a layered image write request when the target of an
2244 * object request from the image request does not exist.
2245 *
2246 * A page array big enough to hold the returned data is allocated
2247 * and supplied to rbd_img_request_fill() as the "data descriptor."
2248 * When the read completes, this page array will be transferred to
2249 * the original object request for the copyup operation.
2250 *
2251 * If an error occurs, record it as the result of the original
2252 * object request and mark it done so it gets completed.
2253 */
2254static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2255{
2256 struct rbd_img_request *img_request = NULL;
2257 struct rbd_img_request *parent_request = NULL;
2258 struct rbd_device *rbd_dev;
2259 u64 img_offset;
2260 u64 length;
2261 struct page **pages = NULL;
2262 u32 page_count;
2263 int result;
2264
2265 rbd_assert(obj_request_img_data_test(obj_request));
2266 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2267
2268 img_request = obj_request->img_request;
2269 rbd_assert(img_request != NULL);
2270 rbd_dev = img_request->rbd_dev;
2271 rbd_assert(rbd_dev->parent != NULL);
2272
2273 /*
2274 * First things first. The original osd request is of no
2275 * use to us any more; we'll need a new one that can hold
2276 * the two ops in a copyup request. We'll get that later,
2277 * but for now we can release the old one.
2278 */
2279 rbd_osd_req_destroy(obj_request->osd_req);
2280 obj_request->osd_req = NULL;
2281
2282 /*
2283 * Determine the byte range covered by the object in the
2284 * child image to which the original request was to be sent.
2285 */
2286 img_offset = obj_request->img_offset - obj_request->offset;
2287 length = (u64)1 << rbd_dev->header.obj_order;
2288
2289 /*
2290 * There is no defined parent data beyond the parent
2291 * overlap, so limit what we read at that boundary if
2292 * necessary.
2293 */
2294 if (img_offset + length > rbd_dev->parent_overlap) {
2295 rbd_assert(img_offset < rbd_dev->parent_overlap);
2296 length = rbd_dev->parent_overlap - img_offset;
2297 }
2298
2299 /*
2300 * Allocate a page array big enough to receive the data read
2301 * from the parent.
2302 */
2303 page_count = (u32)calc_pages_for(0, length);
2304 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2305 if (IS_ERR(pages)) {
2306 result = PTR_ERR(pages);
2307 pages = NULL;
2308 goto out_err;
2309 }
2310
2311 result = -ENOMEM;
2312 parent_request = rbd_img_request_create(rbd_dev->parent,
2313 img_offset, length,
2314 false, true);
2315 if (!parent_request)
2316 goto out_err;
2317 rbd_obj_request_get(obj_request);
2318 parent_request->obj_request = obj_request;
2319
2320 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2321 if (result)
2322 goto out_err;
2323 parent_request->copyup_pages = pages;
2324
2325 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2326 result = rbd_img_request_submit(parent_request);
2327 if (!result)
2328 return 0;
2329
2330 parent_request->copyup_pages = NULL;
2331 parent_request->obj_request = NULL;
2332 rbd_obj_request_put(obj_request);
2333out_err:
2334 if (pages)
2335 ceph_release_page_vector(pages, page_count);
2336 if (parent_request)
2337 rbd_img_request_put(parent_request);
2338 obj_request->result = result;
2339 obj_request->xferred = 0;
2340 obj_request_done_set(obj_request);
2341
2342 return result;
2343}
2344
2345static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2346{
2347 struct rbd_obj_request *orig_request;
2348 int result;
2349
2350 rbd_assert(!obj_request_img_data_test(obj_request));
2351
2352 /*
2353 * All we need from the object request is the original
2354 * request and the result of the STAT op. Grab those, then
2355 * we're done with the request.
2356 */
2357 orig_request = obj_request->obj_request;
2358 obj_request->obj_request = NULL;
2359 rbd_assert(orig_request);
2360 rbd_assert(orig_request->img_request);
2361
2362 result = obj_request->result;
2363 obj_request->result = 0;
2364
2365 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2366 obj_request, orig_request, result,
2367 obj_request->xferred, obj_request->length);
2368 rbd_obj_request_put(obj_request);
2369
2370 rbd_assert(orig_request);
2371 rbd_assert(orig_request->img_request);
2372
2373 /*
2374 * Our only purpose here is to determine whether the object
2375 * exists, and we don't want to treat the non-existence as
2376 * an error. If something else comes back, transfer the
2377 * error to the original request and complete it now.
2378 */
2379 if (!result) {
2380 obj_request_existence_set(orig_request, true);
2381 } else if (result == -ENOENT) {
2382 obj_request_existence_set(orig_request, false);
2383 } else if (result) {
2384 orig_request->result = result;
3d7efd18 2385 goto out;
2386 }
2387
2388 /*
2389 * Resubmit the original request now that we have recorded
2390 * whether the target object exists.
2391 */
b454e36d 2392 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2393out:
2394 if (orig_request->result)
2395 rbd_obj_request_complete(orig_request);
2396 rbd_obj_request_put(orig_request);
2397}
2398
2399static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2400{
2401 struct rbd_obj_request *stat_request;
2402 struct rbd_device *rbd_dev;
2403 struct ceph_osd_client *osdc;
2404 struct page **pages = NULL;
2405 u32 page_count;
2406 size_t size;
2407 int ret;
2408
2409 /*
2410 * The response data for a STAT call consists of:
2411 * le64 length;
2412 * struct {
2413 * le32 tv_sec;
2414 * le32 tv_nsec;
2415 * } mtime;
2416 */
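	/*
	 * Editor's sketch of that layout as a C type (the struct name
	 * is hypothetical; the driver never declares it and instead
	 * just sizes a raw page buffer below):
	 *
	 *	struct stat_reply {
	 *		__le64 length;
	 *		struct {
	 *			__le32 tv_sec;
	 *			__le32 tv_nsec;
	 *		} mtime;
	 *	} __attribute__ ((packed));
	 *
	 * sizeof (struct stat_reply) is 16 bytes, matching "size".
	 */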
2417 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2418 page_count = (u32)calc_pages_for(0, size);
2419 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2420 if (IS_ERR(pages))
2421 return PTR_ERR(pages);
2422
2423 ret = -ENOMEM;
2424 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2425 OBJ_REQUEST_PAGES);
2426 if (!stat_request)
2427 goto out;
2428
2429 rbd_obj_request_get(obj_request);
2430 stat_request->obj_request = obj_request;
2431 stat_request->pages = pages;
2432 stat_request->page_count = page_count;
2433
2434 rbd_assert(obj_request->img_request);
2435 rbd_dev = obj_request->img_request->rbd_dev;
2436 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2437 stat_request);
2438 if (!stat_request->osd_req)
2439 goto out;
2440 stat_request->callback = rbd_img_obj_exists_callback;
2441
2442 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2443 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2444 false, false);
9d4df01f 2445 rbd_osd_req_format_read(stat_request);
c5b5ef6c
AE
2446
2447 osdc = &rbd_dev->rbd_client->client->osdc;
2448 ret = rbd_obj_request_submit(osdc, stat_request);
2449out:
2450 if (ret)
2451 rbd_obj_request_put(obj_request);
2452
2453 return ret;
2454}
2455
2456static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2457{
2458 struct rbd_img_request *img_request;
a9e8ba2c 2459 struct rbd_device *rbd_dev;
3d7efd18 2460 bool known;
2461
2462 rbd_assert(obj_request_img_data_test(obj_request));
2463
2464 img_request = obj_request->img_request;
2465 rbd_assert(img_request);
a9e8ba2c 2466 rbd_dev = img_request->rbd_dev;
b454e36d 2467
b454e36d 2468 /*
2469 * Only writes to layered images need special handling.
2470 * Reads and non-layered writes are simple object requests.
2471 * Layered writes that start beyond the end of the overlap
2472 * with the parent have no parent data, so they too are
2473 * simple object requests. Finally, if the target object is
2474 * known to already exist, its parent data has already been
2475 * copied, so a write to the object can also be handled as a
2476 * simple object request.
2477 */
2478 if (!img_request_write_test(img_request) ||
2479 !img_request_layered_test(img_request) ||
a9e8ba2c 2480 rbd_dev->parent_overlap <= obj_request->img_offset ||
2481 ((known = obj_request_known_test(obj_request)) &&
2482 obj_request_exists_test(obj_request))) {
2483
2484 struct rbd_device *rbd_dev;
2485 struct ceph_osd_client *osdc;
2486
2487 rbd_dev = obj_request->img_request->rbd_dev;
2488 osdc = &rbd_dev->rbd_client->client->osdc;
2489
2490 return rbd_obj_request_submit(osdc, obj_request);
2491 }
2492
2493 /*
2494 * It's a layered write. The target object might exist but
2495 * we may not know that yet. If we know it doesn't exist,
2496 * start by reading the data for the full target object from
2497 * the parent so we can use it for a copyup to the target.
b454e36d 2498 */
2499 if (known)
2500 return rbd_img_obj_parent_read_full(obj_request);
2501
2502 /* We don't know whether the target exists. Go find out. */
2503
2504 return rbd_img_obj_exists_submit(obj_request);
2505}
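/*
 * Editor's summary of the dispatch above (a sketch of the same logic):
 *
 *	read, or non-layered write ............. submit directly
 *	layered write beyond parent overlap .... submit directly
 *	layered write, target known to exist ... submit directly
 *	layered write, known not to exist ...... full parent read, then copyup
 *	layered write, existence unknown ....... STAT first, then resubmit
 */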
2506
2507static int rbd_img_request_submit(struct rbd_img_request *img_request)
2508{
bf0d5f50 2509 struct rbd_obj_request *obj_request;
46faeed4 2510 struct rbd_obj_request *next_obj_request;
bf0d5f50 2511
37206ee5 2512 dout("%s: img %p\n", __func__, img_request);
46faeed4 2513 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2514 int ret;
2515
b454e36d 2516 ret = rbd_img_obj_request_submit(obj_request);
2517 if (ret)
2518 return ret;
2519 }
2520
2521 return 0;
2522}
2523
2524static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2525{
2526 struct rbd_obj_request *obj_request;
2527 struct rbd_device *rbd_dev;
2528 u64 obj_end;
2529
2530 rbd_assert(img_request_child_test(img_request));
2531
2532 obj_request = img_request->obj_request;
2533 rbd_assert(obj_request);
2534 rbd_assert(obj_request->img_request);
2535
8b3e1a56 2536 obj_request->result = img_request->result;
2537 if (obj_request->result)
2538 goto out;
2539
2540 /*
2541 * We need to zero anything beyond the parent overlap
2542 * boundary. Since rbd_img_obj_request_read_callback()
2543 * will zero anything beyond the end of a short read, an
2544 * easy way to do this is to pretend the data from the
2545 * parent came up short--ending at the overlap boundary.
2546 */
2547 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2548 obj_end = obj_request->img_offset + obj_request->length;
2549 rbd_dev = obj_request->img_request->rbd_dev;
2550 if (obj_end > rbd_dev->parent_overlap) {
2551 u64 xferred = 0;
2552
2553 if (obj_request->img_offset < rbd_dev->parent_overlap)
2554 xferred = rbd_dev->parent_overlap -
2555 obj_request->img_offset;
8b3e1a56 2556
2557 obj_request->xferred = min(img_request->xferred, xferred);
2558 } else {
2559 obj_request->xferred = img_request->xferred;
2560 }
2561out:
b5b09be3 2562 rbd_img_request_put(img_request);
2563 rbd_img_obj_request_read_callback(obj_request);
2564 rbd_obj_request_complete(obj_request);
2565}
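/*
 * Worked example (editor's sketch): with parent_overlap = 4 MiB, an
 * object request covering img_offset 3 MiB with length 2 MiB has
 * obj_end = 5 MiB, which is past the overlap.  xferred is therefore
 * clamped to 4 MiB - 3 MiB = 1 MiB, and the read callback above
 * zeroes the final 1 MiB exactly as it would for a short read.
 */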
2566
2567static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2568{
2569 struct rbd_device *rbd_dev;
2570 struct rbd_img_request *img_request;
2571 int result;
2572
2573 rbd_assert(obj_request_img_data_test(obj_request));
2574 rbd_assert(obj_request->img_request != NULL);
2575 rbd_assert(obj_request->result == (s32) -ENOENT);
2576 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2577
2578 rbd_dev = obj_request->img_request->rbd_dev;
2579 rbd_assert(rbd_dev->parent != NULL);
2580 /* rbd_read_finish(obj_request, obj_request->length); */
2581 img_request = rbd_img_request_create(rbd_dev->parent,
2582 obj_request->img_offset,
2583 obj_request->length,
2584 false, true);
2585 result = -ENOMEM;
2586 if (!img_request)
2587 goto out_err;
2588
2589 rbd_obj_request_get(obj_request);
2590 img_request->obj_request = obj_request;
2591
2592 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2593 obj_request->bio_list);
2594 if (result)
2595 goto out_err;
2596
2597 img_request->callback = rbd_img_parent_read_callback;
2598 result = rbd_img_request_submit(img_request);
2599 if (result)
2600 goto out_err;
2601
2602 return;
2603out_err:
2604 if (img_request)
2605 rbd_img_request_put(img_request);
2606 obj_request->result = result;
2607 obj_request->xferred = 0;
2608 obj_request_done_set(obj_request);
2609}
bf0d5f50 2610
cc4a38bd 2611static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2612{
2613 struct rbd_obj_request *obj_request;
2169238d 2614 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2615 int ret;
2616
2617 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2618 OBJ_REQUEST_NODATA);
2619 if (!obj_request)
2620 return -ENOMEM;
2621
2622 ret = -ENOMEM;
430c28c3 2623 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2624 if (!obj_request->osd_req)
2625 goto out;
2169238d 2626 obj_request->callback = rbd_obj_request_put;
b8d70035 2627
c99d2d4a 2628 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
cc4a38bd 2629 notify_id, 0, 0);
9d4df01f 2630 rbd_osd_req_format_read(obj_request);
430c28c3 2631
b8d70035 2632 ret = rbd_obj_request_submit(osdc, obj_request);
b8d70035 2633out:
2634 if (ret)
2635 rbd_obj_request_put(obj_request);
2636
2637 return ret;
2638}
2639
2640static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2641{
2642 struct rbd_device *rbd_dev = (struct rbd_device *)data;
e627db08 2643 int ret;
2644
2645 if (!rbd_dev)
2646 return;
2647
37206ee5 2648 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2649 rbd_dev->header_name, (unsigned long long)notify_id,
2650 (unsigned int)opcode);
2651 ret = rbd_dev_refresh(rbd_dev);
2652 if (ret)
2653 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
b8d70035 2654
cc4a38bd 2655 rbd_obj_notify_ack(rbd_dev, notify_id);
2656}
2657
2658/*
2659 * Request sync osd watch/unwatch. The value of "start" determines
2660 * whether a watch request is being initiated or torn down.
2661 */
2662static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2663{
2664 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2665 struct rbd_obj_request *obj_request;
2666 int ret;
2667
2668 rbd_assert(start ^ !!rbd_dev->watch_event);
2669 rbd_assert(start ^ !!rbd_dev->watch_request);
2670
2671 if (start) {
3c663bbd 2672 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2673 &rbd_dev->watch_event);
2674 if (ret < 0)
2675 return ret;
8eb87565 2676 rbd_assert(rbd_dev->watch_event != NULL);
2677 }
2678
2679 ret = -ENOMEM;
2680 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2681 OBJ_REQUEST_NODATA);
2682 if (!obj_request)
2683 goto out_cancel;
2684
2685 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2686 if (!obj_request->osd_req)
2687 goto out_cancel;
2688
8eb87565 2689 if (start)
975241af 2690 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 2691 else
6977c3f9 2692 ceph_osdc_unregister_linger_request(osdc,
975241af 2693 rbd_dev->watch_request->osd_req);
2694
2695 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
b21ebddd 2696 rbd_dev->watch_event->cookie, 0, start);
9d4df01f 2697 rbd_osd_req_format_write(obj_request);
2169238d 2698
2699 ret = rbd_obj_request_submit(osdc, obj_request);
2700 if (ret)
2701 goto out_cancel;
2702 ret = rbd_obj_request_wait(obj_request);
2703 if (ret)
2704 goto out_cancel;
2705 ret = obj_request->result;
2706 if (ret)
2707 goto out_cancel;
2708
2709 /*
2710 * A watch request is set to linger, so the underlying osd
2711 * request won't go away until we unregister it. We retain
2712 * a pointer to the object request during that time (in
2713 * rbd_dev->watch_request), so we'll keep a reference to
2714 * it. We'll drop that reference (below) after we've
2715 * unregistered it.
2716 */
2717 if (start) {
2718 rbd_dev->watch_request = obj_request;
2719
2720 return 0;
2721 }
2722
2723 /* We have successfully torn down the watch request */
2724
2725 rbd_obj_request_put(rbd_dev->watch_request);
2726 rbd_dev->watch_request = NULL;
2727out_cancel:
2728 /* Cancel the event if we're tearing down, or on error */
2729 ceph_osdc_cancel_event(rbd_dev->watch_event);
2730 rbd_dev->watch_event = NULL;
2731 if (obj_request)
2732 rbd_obj_request_put(obj_request);
2733
2734 return ret;
2735}
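/*
 * Example usage (editor's sketch): callers pass a nonzero "start" to
 * register the watch when an image is mapped, and zero to tear it
 * down again when the image is unmapped:
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 1);	(register)
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 0);	(tear down)
 */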
2736
36be9a76 2737/*
2738 * Synchronous osd object method call. Returns the number of bytes
2739 * returned in the outbound buffer, or a negative error code.
2740 */
2741static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2742 const char *object_name,
2743 const char *class_name,
2744 const char *method_name,
4157976b 2745 const void *outbound,
36be9a76 2746 size_t outbound_size,
4157976b 2747 void *inbound,
e2a58ee5 2748 size_t inbound_size)
36be9a76 2749{
2169238d 2750 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
36be9a76 2751 struct rbd_obj_request *obj_request;
2752 struct page **pages;
2753 u32 page_count;
2754 int ret;
2755
2756 /*
2757 * Method calls are ultimately read operations. The result
2758 * should be placed into the inbound buffer provided. They
2759 * also supply outbound data--parameters for the object
2760 * method. Currently if this is present it will be a
2761 * snapshot id.
36be9a76 2762 */
57385b51 2763 page_count = (u32)calc_pages_for(0, inbound_size);
2764 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2765 if (IS_ERR(pages))
2766 return PTR_ERR(pages);
2767
2768 ret = -ENOMEM;
6010a451 2769 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2770 OBJ_REQUEST_PAGES);
2771 if (!obj_request)
2772 goto out;
2773
2774 obj_request->pages = pages;
2775 obj_request->page_count = page_count;
2776
430c28c3 2777 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2778 if (!obj_request->osd_req)
2779 goto out;
2780
c99d2d4a 2781 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2782 class_name, method_name);
2783 if (outbound_size) {
2784 struct ceph_pagelist *pagelist;
2785
2786 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2787 if (!pagelist)
2788 goto out;
2789
2790 ceph_pagelist_init(pagelist);
2791 ceph_pagelist_append(pagelist, outbound, outbound_size);
2792 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2793 pagelist);
2794 }
2795 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2796 obj_request->pages, inbound_size,
44cd188d 2797 0, false, false);
9d4df01f 2798 rbd_osd_req_format_read(obj_request);
430c28c3 2799
2800 ret = rbd_obj_request_submit(osdc, obj_request);
2801 if (ret)
2802 goto out;
2803 ret = rbd_obj_request_wait(obj_request);
2804 if (ret)
2805 goto out;
2806
2807 ret = obj_request->result;
2808 if (ret < 0)
2809 goto out;
2810
2811 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2812 ret = (int)obj_request->xferred;
903bb32e 2813 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2814out:
2815 if (obj_request)
2816 rbd_obj_request_put(obj_request);
2817 else
2818 ceph_release_page_vector(pages, page_count);
2819
2820 return ret;
2821}
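/*
 * Example call (editor's sketch, mirroring _rbd_dev_v2_snap_size()
 * further down): class method parameters and results are both
 * little-endian buffers.
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				  "rbd", "get_size",
 *				  &snapid, sizeof (snapid),
 *				  &size_buf, sizeof (size_buf));
 */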
2822
bf0d5f50 2823static void rbd_request_fn(struct request_queue *q)
cc344fa1 2824 __releases(q->queue_lock) __acquires(q->queue_lock)
2825{
2826 struct rbd_device *rbd_dev = q->queuedata;
2827 bool read_only = rbd_dev->mapping.read_only;
2828 struct request *rq;
2829 int result;
2830
2831 while ((rq = blk_fetch_request(q))) {
2832 bool write_request = rq_data_dir(rq) == WRITE;
2833 struct rbd_img_request *img_request;
2834 u64 offset;
2835 u64 length;
2836
2837 /* Ignore any non-FS requests that filter through. */
2838
2839 if (rq->cmd_type != REQ_TYPE_FS) {
2840 dout("%s: non-fs request type %d\n", __func__,
2841 (int) rq->cmd_type);
2842 __blk_end_request_all(rq, 0);
2843 continue;
2844 }
2845
2846 /* Ignore/skip any zero-length requests */
2847
2848 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2849 length = (u64) blk_rq_bytes(rq);
2850
2851 if (!length) {
2852 dout("%s: zero-length request\n", __func__);
2853 __blk_end_request_all(rq, 0);
2854 continue;
2855 }
2856
2857 spin_unlock_irq(q->queue_lock);
2858
2859 /* Disallow writes to a read-only device */
2860
2861 if (write_request) {
2862 result = -EROFS;
2863 if (read_only)
2864 goto end_request;
2865 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2866 }
2867
2868 /*
2869 * Quit early if the mapped snapshot no longer
2870 * exists. It's still possible the snapshot will
2871 * have disappeared by the time our request arrives
2872 * at the osd, but there's no sense in sending it if
2873 * we already know.
2874 */
2875 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2876 dout("request for non-existent snapshot");
2877 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2878 result = -ENXIO;
2879 goto end_request;
2880 }
2881
bf0d5f50 2882 result = -EINVAL;
2883 if (offset && length > U64_MAX - offset + 1) {
2884 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2885 offset, length);
bf0d5f50 2886 goto end_request; /* Shouldn't happen */
c0cd10db 2887 }
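		/*
		 * Editor's note (worked example): the test above catches
		 * u64 wraparound.  E.g. offset = U64_MAX - 1 with
		 * length = 3 gives U64_MAX - offset + 1 = 2, and since
		 * 3 > 2 the range is rejected before offset + length
		 * can silently wrap.
		 */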
bf0d5f50 2888
2889 result = -EIO;
2890 if (offset + length > rbd_dev->mapping.size) {
2891 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2892 offset, length, rbd_dev->mapping.size);
2893 goto end_request;
2894 }
2895
2896 result = -ENOMEM;
2897 img_request = rbd_img_request_create(rbd_dev, offset, length,
9849e986 2898 write_request, false);
2899 if (!img_request)
2900 goto end_request;
2901
2902 img_request->rq = rq;
2903
2904 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2905 rq->bio);
2906 if (!result)
2907 result = rbd_img_request_submit(img_request);
2908 if (result)
2909 rbd_img_request_put(img_request);
2910end_request:
2911 spin_lock_irq(q->queue_lock);
2912 if (result < 0) {
2913 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2914 write_request ? "write" : "read",
2915 length, offset, result);
2916
2917 __blk_end_request_all(rq, result);
2918 }
2919 }
2920}
2921
2922/*
2923 * A queue callback. Makes sure that we don't create a bio that spans
2924 * multiple osd objects. One exception is single-page bios, which we
f7760dad 2925 * handle later in bio_chain_clone_range().
2926 */
2927static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2928 struct bio_vec *bvec)
2929{
2930 struct rbd_device *rbd_dev = q->queuedata;
2931 sector_t sector_offset;
2932 sector_t sectors_per_obj;
2933 sector_t obj_sector_offset;
2934 int ret;
2935
2936 /*
2937 * Find how far into its rbd object the bio's start sector falls,
2938 * after offsetting the partition-relative sector to be relative
2939 * to the enclosing device.
2940 */
2941 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2942 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2943 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2944
2945 /*
2946 * Compute the number of bytes from that offset to the end
2947 * of the object. Account for what's already used by the bio.
2948 */
2949 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2950 if (ret > bmd->bi_size)
2951 ret -= bmd->bi_size;
2952 else
2953 ret = 0;
2954
2955 /*
2956 * Don't send back more than was asked for. And if the bio
2957 * was empty, let the whole thing through because: "Note
2958 * that a block device *must* allow a single page to be
2959 * added to an empty bio."
2960 */
2961 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2962 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2963 ret = (int) bvec->bv_len;
2964
2965 return ret;
2966}
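/*
 * Worked example for rbd_merge_bvec() (editor's sketch): with the
 * default 4 MiB (order 22) objects, sectors_per_obj is
 * 1 << (22 - 9) = 8192.  A bio starting at device sector 12288 sits
 * 12288 & 8191 = 4096 sectors into its object, so at most
 * (8192 - 4096) << 9 = 2 MiB, less whatever the bio already holds,
 * may be added before the bio would cross into the next object.
 */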
2967
2968static void rbd_free_disk(struct rbd_device *rbd_dev)
2969{
2970 struct gendisk *disk = rbd_dev->disk;
2971
2972 if (!disk)
2973 return;
2974
2975 rbd_dev->disk = NULL;
2976 if (disk->flags & GENHD_FL_UP) {
602adf40 2977 del_gendisk(disk);
2978 if (disk->queue)
2979 blk_cleanup_queue(disk->queue);
2980 }
2981 put_disk(disk);
2982}
2983
2984static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2985 const char *object_name,
7097f8df 2986 u64 offset, u64 length, void *buf)
2987
2988{
2169238d 2989 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 2990 struct rbd_obj_request *obj_request;
2991 struct page **pages = NULL;
2992 u32 page_count;
1ceae7ef 2993 size_t size;
2994 int ret;
2995
2996 page_count = (u32) calc_pages_for(offset, length);
2997 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2998 if (IS_ERR(pages))
2999 return PTR_ERR(pages);
3000
3001 ret = -ENOMEM;
3002 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 3003 OBJ_REQUEST_PAGES);
3004 if (!obj_request)
3005 goto out;
3006
3007 obj_request->pages = pages;
3008 obj_request->page_count = page_count;
3009
430c28c3 3010 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3011 if (!obj_request->osd_req)
3012 goto out;
3013
3014 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3015 offset, length, 0, 0);
406e2c9f 3016 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 3017 obj_request->pages,
3018 obj_request->length,
3019 obj_request->offset & ~PAGE_MASK,
3020 false, false);
9d4df01f 3021 rbd_osd_req_format_read(obj_request);
430c28c3 3022
3023 ret = rbd_obj_request_submit(osdc, obj_request);
3024 if (ret)
3025 goto out;
3026 ret = rbd_obj_request_wait(obj_request);
3027 if (ret)
3028 goto out;
3029
3030 ret = obj_request->result;
3031 if (ret < 0)
3032 goto out;
3033
3034 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3035 size = (size_t) obj_request->xferred;
903bb32e 3036 ceph_copy_from_page_vector(pages, buf, 0, size);
3037 rbd_assert(size <= (size_t)INT_MAX);
3038 ret = (int)size;
3039out:
3040 if (obj_request)
3041 rbd_obj_request_put(obj_request);
3042 else
3043 ceph_release_page_vector(pages, page_count);
3044
3045 return ret;
3046}
3047
602adf40 3048/*
3049 * Read the complete header for the given rbd device. On successful
3050 * return, the rbd_dev->header field will contain up-to-date
3051 * information about the image.
602adf40 3052 */
662518b1 3053static int rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
602adf40 3054{
4156d998 3055 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 3056 u32 snap_count = 0;
3057 u64 names_size = 0;
3058 u32 want_count;
3059 int ret;
602adf40 3060
00f1f36f 3061 /*
3062 * The complete header will include an array of its 64-bit
3063 * snapshot ids, followed by the names of those snapshots as
3064 * a contiguous block of NUL-terminated strings. Note that
3065 * the number of snapshots could change by the time we read
3066 * it in, in which case we re-read it.
00f1f36f 3067 */
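	/*
	 * Editor's note (worked example): the first pass below runs
	 * with snap_count == 0 and names_size == 0, so only the
	 * fixed-size header is read; that read supplies the real
	 * snap_count and snap_names_len, and the loop repeats with a
	 * buffer big enough for everything.  E.g. two snapshots named
	 * "s1" and "s2" make the second pass allocate sizeof (*ondisk)
	 * plus 2 * sizeof (struct rbd_image_snap_ondisk) plus 6 bytes
	 * for the NUL-terminated names.
	 */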
3068 do {
3069 size_t size;
3070
3071 kfree(ondisk);
3072
3073 size = sizeof (*ondisk);
3074 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3075 size += names_size;
3076 ondisk = kmalloc(size, GFP_KERNEL);
3077 if (!ondisk)
662518b1 3078 return -ENOMEM;
4156d998 3079
788e2df3 3080 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
7097f8df 3081 0, size, ondisk);
4156d998 3082 if (ret < 0)
662518b1 3083 goto out;
c0cd10db 3084 if ((size_t)ret < size) {
4156d998 3085 ret = -ENXIO;
3086 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3087 size, ret);
662518b1 3088 goto out;
3089 }
3090 if (!rbd_dev_ondisk_valid(ondisk)) {
3091 ret = -ENXIO;
06ecc6cb 3092 rbd_warn(rbd_dev, "invalid header");
662518b1 3093 goto out;
81e759fb 3094 }
602adf40 3095
3096 names_size = le64_to_cpu(ondisk->snap_names_len);
3097 want_count = snap_count;
3098 snap_count = le32_to_cpu(ondisk->snap_count);
3099 } while (snap_count != want_count);
00f1f36f 3100
3101 ret = rbd_header_from_disk(rbd_dev, ondisk);
3102out:
3103 kfree(ondisk);
3104
3105 return ret;
3106}
3107
3108/*
3109 * Refresh a format 1 image by re-reading its complete ondisk header
3110 */
cc4a38bd 3111static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
602adf40 3112{
662518b1 3113 return rbd_dev_v1_header_read(rbd_dev);
3114}
3115
3116/*
3117 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3118 * has disappeared from the (just updated) snapshot context.
3119 */
3120static void rbd_exists_validate(struct rbd_device *rbd_dev)
3121{
3122 u64 snap_id;
3123
3124 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3125 return;
3126
3127 snap_id = rbd_dev->spec->snap_id;
3128 if (snap_id == CEPH_NOSNAP)
3129 return;
3130
3131 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3132 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3133}
3134
cc4a38bd 3135static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3136{
e627db08 3137 u64 mapping_size;
3138 int ret;
3139
117973fb 3140 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
e627db08 3141 mapping_size = rbd_dev->mapping.size;
1fe5e993 3142 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb 3143 if (rbd_dev->image_format == 1)
cc4a38bd 3144 ret = rbd_dev_v1_refresh(rbd_dev);
117973fb 3145 else
cc4a38bd 3146 ret = rbd_dev_v2_refresh(rbd_dev);
3147
3148 /* If it's a mapped snapshot, validate its EXISTS flag */
3149
3150 rbd_exists_validate(rbd_dev);
1fe5e993 3151 mutex_unlock(&ctl_mutex);
3152 if (mapping_size != rbd_dev->mapping.size) {
3153 sector_t size;
3154
3155 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3156 dout("setting size to %llu sectors", (unsigned long long)size);
3157 set_capacity(rbd_dev->disk, size);
a3fbe5d4 3158 revalidate_disk(rbd_dev->disk);
00a653e2 3159 }
3160
3161 return ret;
3162}
3163
3164static int rbd_init_disk(struct rbd_device *rbd_dev)
3165{
3166 struct gendisk *disk;
3167 struct request_queue *q;
593a9e7b 3168 u64 segment_size;
602adf40 3169
602adf40 3170 /* create gendisk info */
3171 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3172 if (!disk)
1fcdb8aa 3173 return -ENOMEM;
602adf40 3174
f0f8cef5 3175 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3176 rbd_dev->dev_id);
3177 disk->major = rbd_dev->major;
3178 disk->first_minor = 0;
3179 disk->fops = &rbd_bd_ops;
3180 disk->private_data = rbd_dev;
3181
bf0d5f50 3182 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3183 if (!q)
3184 goto out_disk;
029bcbd8 3185
3186 /* We use the default size, but let's be explicit about it. */
3187 blk_queue_physical_block_size(q, SECTOR_SIZE);
3188
029bcbd8 3189 /* set io sizes to object size */
3190 segment_size = rbd_obj_bytes(&rbd_dev->header);
3191 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3192 blk_queue_max_segment_size(q, segment_size);
3193 blk_queue_io_min(q, segment_size);
3194 blk_queue_io_opt(q, segment_size);
029bcbd8 3195
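	/*
	 * Editor's example (assuming the default object order of 22):
	 * segment_size is then 4 MiB, so the queue advertises
	 * max_hw_sectors = 4194304 / 512 = 8192 and a 4 MiB minimum
	 * and optimal I/O size.
	 */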
3196 blk_queue_merge_bvec(q, rbd_merge_bvec);
3197 disk->queue = q;
3198
3199 q->queuedata = rbd_dev;
3200
3201 rbd_dev->disk = disk;
602adf40 3202
602adf40 3203 return 0;
3204out_disk:
3205 put_disk(disk);
3206
3207 return -ENOMEM;
3208}
3209
3210/*
3211 sysfs
3212*/
3213
3214static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3215{
3216 return container_of(dev, struct rbd_device, dev);
3217}
3218
3219static ssize_t rbd_size_show(struct device *dev,
3220 struct device_attribute *attr, char *buf)
3221{
593a9e7b 3222 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3223
3224 return sprintf(buf, "%llu\n",
3225 (unsigned long long)rbd_dev->mapping.size);
3226}
3227
3228/*
3229 * Note this shows the features for whatever's mapped, which is not
3230 * necessarily the base image.
3231 */
3232static ssize_t rbd_features_show(struct device *dev,
3233 struct device_attribute *attr, char *buf)
3234{
3235 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3236
3237 return sprintf(buf, "0x%016llx\n",
fc71d833 3238 (unsigned long long)rbd_dev->mapping.features);
3239}
3240
3241static ssize_t rbd_major_show(struct device *dev,
3242 struct device_attribute *attr, char *buf)
3243{
593a9e7b 3244 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3245
3246 if (rbd_dev->major)
3247 return sprintf(buf, "%d\n", rbd_dev->major);
3248
3249 return sprintf(buf, "(none)\n");
3250
3251}
3252
3253static ssize_t rbd_client_id_show(struct device *dev,
3254 struct device_attribute *attr, char *buf)
602adf40 3255{
593a9e7b 3256 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3257
3258 return sprintf(buf, "client%lld\n",
3259 ceph_client_id(rbd_dev->rbd_client->client));
3260}
3261
3262static ssize_t rbd_pool_show(struct device *dev,
3263 struct device_attribute *attr, char *buf)
602adf40 3264{
593a9e7b 3265 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3266
0d7dbfce 3267 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3268}
3269
3270static ssize_t rbd_pool_id_show(struct device *dev,
3271 struct device_attribute *attr, char *buf)
3272{
3273 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3274
0d7dbfce 3275 return sprintf(buf, "%llu\n",
fc71d833 3276 (unsigned long long) rbd_dev->spec->pool_id);
3277}
3278
3279static ssize_t rbd_name_show(struct device *dev,
3280 struct device_attribute *attr, char *buf)
3281{
593a9e7b 3282 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3283
3284 if (rbd_dev->spec->image_name)
3285 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3286
3287 return sprintf(buf, "(unknown)\n");
3288}
3289
3290static ssize_t rbd_image_id_show(struct device *dev,
3291 struct device_attribute *attr, char *buf)
3292{
3293 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3294
0d7dbfce 3295 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3296}
3297
3298/*
3299 * Shows the name of the currently-mapped snapshot (or
3300 * RBD_SNAP_HEAD_NAME for the base image).
3301 */
3302static ssize_t rbd_snap_show(struct device *dev,
3303 struct device_attribute *attr,
3304 char *buf)
3305{
593a9e7b 3306 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3307
0d7dbfce 3308 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3309}
3310
3311/*
3312 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3313 * for the parent image. If there is no parent, simply shows
3314 * "(no parent image)".
3315 */
3316static ssize_t rbd_parent_show(struct device *dev,
3317 struct device_attribute *attr,
3318 char *buf)
3319{
3320 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3321 struct rbd_spec *spec = rbd_dev->parent_spec;
3322 int count;
3323 char *bufp = buf;
3324
3325 if (!spec)
3326 return sprintf(buf, "(no parent image)\n");
3327
3328 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3329 (unsigned long long) spec->pool_id, spec->pool_name);
3330 if (count < 0)
3331 return count;
3332 bufp += count;
3333
3334 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3335 spec->image_name ? spec->image_name : "(unknown)");
3336 if (count < 0)
3337 return count;
3338 bufp += count;
3339
3340 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3341 (unsigned long long) spec->snap_id, spec->snap_name);
3342 if (count < 0)
3343 return count;
3344 bufp += count;
3345
3346 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3347 if (count < 0)
3348 return count;
3349 bufp += count;
3350
3351 return (ssize_t) (bufp - buf);
3352}
3353
3354static ssize_t rbd_image_refresh(struct device *dev,
3355 struct device_attribute *attr,
3356 const char *buf,
3357 size_t size)
3358{
593a9e7b 3359 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3360 int ret;
602adf40 3361
cc4a38bd 3362 ret = rbd_dev_refresh(rbd_dev);
3363 if (ret)
3364 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3365
3366 return ret < 0 ? ret : size;
dfc5606d 3367}
602adf40 3368
dfc5606d 3369static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3370static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3371static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3372static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3373static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3374static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3375static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3376static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3377static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3378static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3379static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3380
3381static struct attribute *rbd_attrs[] = {
3382 &dev_attr_size.attr,
34b13184 3383 &dev_attr_features.attr,
3384 &dev_attr_major.attr,
3385 &dev_attr_client_id.attr,
3386 &dev_attr_pool.attr,
9bb2f334 3387 &dev_attr_pool_id.attr,
dfc5606d 3388 &dev_attr_name.attr,
589d30e0 3389 &dev_attr_image_id.attr,
dfc5606d 3390 &dev_attr_current_snap.attr,
86b00e0d 3391 &dev_attr_parent.attr,
dfc5606d 3392 &dev_attr_refresh.attr,
3393 NULL
3394};
3395
3396static struct attribute_group rbd_attr_group = {
3397 .attrs = rbd_attrs,
3398};
3399
3400static const struct attribute_group *rbd_attr_groups[] = {
3401 &rbd_attr_group,
3402 NULL
3403};
3404
3405static void rbd_sysfs_dev_release(struct device *dev)
3406{
3407}
3408
3409static struct device_type rbd_device_type = {
3410 .name = "rbd",
3411 .groups = rbd_attr_groups,
3412 .release = rbd_sysfs_dev_release,
3413};
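/*
 * Editor's note (a sketch of how this surfaces to userspace): the
 * attributes above appear under /sys/bus/rbd/devices/<id>/, e.g.
 * reading "current_snap" returns RBD_SNAP_HEAD_NAME ("-") for a base
 * image mapping, and writing anything to "refresh" invokes
 * rbd_dev_refresh() via rbd_image_refresh().
 */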
3414
3415static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3416{
3417 kref_get(&spec->kref);
3418
3419 return spec;
3420}
3421
3422static void rbd_spec_free(struct kref *kref);
3423static void rbd_spec_put(struct rbd_spec *spec)
3424{
3425 if (spec)
3426 kref_put(&spec->kref, rbd_spec_free);
3427}
3428
3429static struct rbd_spec *rbd_spec_alloc(void)
3430{
3431 struct rbd_spec *spec;
3432
3433 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3434 if (!spec)
3435 return NULL;
3436 kref_init(&spec->kref);
3437
8b8fb99c
AE
3438 return spec;
3439}
3440
3441static void rbd_spec_free(struct kref *kref)
3442{
3443 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3444
3445 kfree(spec->pool_name);
3446 kfree(spec->image_id);
3447 kfree(spec->image_name);
3448 kfree(spec->snap_name);
3449 kfree(spec);
3450}
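/*
 * Ownership sketch for the helpers above: a holder of an rbd_spec
 * either takes its own reference with rbd_spec_get() or is handed
 * one, and drops it with rbd_spec_put().  rbd_dev_create() below
 * stores the caller's spec pointer without taking a new reference,
 * so that reference is transferred to the rbd_device and is later
 * dropped by rbd_dev_destroy().
 */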
3451
cc344fa1 3452static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3453 struct rbd_spec *spec)
3454{
3455 struct rbd_device *rbd_dev;
3456
3457 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3458 if (!rbd_dev)
3459 return NULL;
3460
3461 spin_lock_init(&rbd_dev->lock);
6d292906 3462 rbd_dev->flags = 0;
c53d5893 3463 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3464 init_rwsem(&rbd_dev->header_rwsem);
3465
3466 rbd_dev->spec = spec;
3467 rbd_dev->rbd_client = rbdc;
3468
0903e875
AE
3469 /* Initialize the layout used for all rbd requests */
3470
3471 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3472 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3473 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3474 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3475
c53d5893
AE
3476 return rbd_dev;
3477}
3478
3479static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3480{
c53d5893
AE
3481 rbd_put_client(rbd_dev->rbd_client);
3482 rbd_spec_put(rbd_dev->spec);
3483 kfree(rbd_dev);
3484}
3485
9d475de5
AE
3486/*
3487 * Get the size and object order for an image snapshot, or if
 3488 * snap_id is CEPH_NOSNAP, get this information for the base
3489 * image.
3490 */
3491static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3492 u8 *order, u64 *snap_size)
3493{
3494 __le64 snapid = cpu_to_le64(snap_id);
3495 int ret;
3496 struct {
3497 u8 order;
3498 __le64 size;
3499 } __attribute__ ((packed)) size_buf = { 0 };
3500
36be9a76 3501 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3502 "rbd", "get_size",
4157976b 3503 &snapid, sizeof (snapid),
e2a58ee5 3504 &size_buf, sizeof (size_buf));
36be9a76 3505 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3506 if (ret < 0)
3507 return ret;
57385b51
AE
3508 if (ret < sizeof (size_buf))
3509 return -ERANGE;
9d475de5 3510
c86f86e9
AE
3511 if (order)
3512 *order = size_buf.order;
9d475de5
AE
3513 *snap_size = le64_to_cpu(size_buf.size);
3514
3515 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
57385b51
AE
3516 (unsigned long long)snap_id, (unsigned int)*order,
3517 (unsigned long long)*snap_size);
9d475de5
AE
3518
3519 return 0;
3520}
3521
3522static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3523{
3524 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3525 &rbd_dev->header.obj_order,
3526 &rbd_dev->header.image_size);
3527}
3528
1e130199
AE
3529static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3530{
3531 void *reply_buf;
3532 int ret;
3533 void *p;
3534
3535 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3536 if (!reply_buf)
3537 return -ENOMEM;
3538
36be9a76 3539 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3540 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3541 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3542 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3543 if (ret < 0)
3544 goto out;
3545
3546 p = reply_buf;
3547 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3548 p + ret, NULL, GFP_NOIO);
3549 ret = 0;
1e130199
AE
3550
3551 if (IS_ERR(rbd_dev->header.object_prefix)) {
3552 ret = PTR_ERR(rbd_dev->header.object_prefix);
3553 rbd_dev->header.object_prefix = NULL;
3554 } else {
3555 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3556 }
1e130199
AE
3557out:
3558 kfree(reply_buf);
3559
3560 return ret;
3561}
3562
b1b5402a
AE
3563static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3564 u64 *snap_features)
3565{
3566 __le64 snapid = cpu_to_le64(snap_id);
3567 struct {
3568 __le64 features;
3569 __le64 incompat;
4157976b 3570 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3571 u64 incompat;
b1b5402a
AE
3572 int ret;
3573
36be9a76 3574 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3575 "rbd", "get_features",
4157976b 3576 &snapid, sizeof (snapid),
e2a58ee5 3577 &features_buf, sizeof (features_buf));
36be9a76 3578 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3579 if (ret < 0)
3580 return ret;
57385b51
AE
3581 if (ret < sizeof (features_buf))
3582 return -ERANGE;
d889140c
AE
3583
3584 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3585 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3586 return -ENXIO;
d889140c 3587
b1b5402a
AE
3588 *snap_features = le64_to_cpu(features_buf.features);
3589
3590 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3591 (unsigned long long)snap_id,
3592 (unsigned long long)*snap_features,
3593 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3594
3595 return 0;
3596}
3597
3598static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3599{
3600 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3601 &rbd_dev->header.features);
3602}
3603
86b00e0d
AE
3604static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3605{
3606 struct rbd_spec *parent_spec;
3607 size_t size;
3608 void *reply_buf = NULL;
3609 __le64 snapid;
3610 void *p;
3611 void *end;
3612 char *image_id;
3613 u64 overlap;
86b00e0d
AE
3614 int ret;
3615
3616 parent_spec = rbd_spec_alloc();
3617 if (!parent_spec)
3618 return -ENOMEM;
3619
3620 size = sizeof (__le64) + /* pool_id */
3621 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3622 sizeof (__le64) + /* snap_id */
3623 sizeof (__le64); /* overlap */
3624 reply_buf = kmalloc(size, GFP_KERNEL);
3625 if (!reply_buf) {
3626 ret = -ENOMEM;
3627 goto out_err;
3628 }
3629
3630 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3631 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3632 "rbd", "get_parent",
4157976b 3633 &snapid, sizeof (snapid),
e2a58ee5 3634 reply_buf, size);
36be9a76 3635 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3636 if (ret < 0)
3637 goto out_err;
3638
86b00e0d 3639 p = reply_buf;
57385b51
AE
3640 end = reply_buf + ret;
3641 ret = -ERANGE;
86b00e0d
AE
3642 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3643 if (parent_spec->pool_id == CEPH_NOPOOL)
3644 goto out; /* No parent? No problem. */
3645
0903e875
AE
3646 /* The ceph file layout needs to fit pool id in 32 bits */
3647
3648 ret = -EIO;
c0cd10db
AE
3649 if (parent_spec->pool_id > (u64)U32_MAX) {
3650 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3651 (unsigned long long)parent_spec->pool_id, U32_MAX);
57385b51 3652 goto out_err;
c0cd10db 3653 }
0903e875 3654
979ed480 3655 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3656 if (IS_ERR(image_id)) {
3657 ret = PTR_ERR(image_id);
3658 goto out_err;
3659 }
3660 parent_spec->image_id = image_id;
3661 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3662 ceph_decode_64_safe(&p, end, overlap, out_err);
3663
3664 rbd_dev->parent_overlap = overlap;
3665 rbd_dev->parent_spec = parent_spec;
3666 parent_spec = NULL; /* rbd_dev now owns this */
3667out:
3668 ret = 0;
3669out_err:
3670 kfree(reply_buf);
3671 rbd_spec_put(parent_spec);
3672
3673 return ret;
3674}
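/*
 * For reference, the "get_parent" reply decoded above has this
 * layout (all fields little-endian on the wire):
 *
 *	__le64	pool_id		CEPH_NOPOOL if there is no parent
 *	string	image_id	__le32 length, then that many bytes
 *	__le64	snap_id
 *	__le64	overlap		bytes of the child backed by the parent
 */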
3675
cc070d59
AE
3676static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3677{
3678 struct {
3679 __le64 stripe_unit;
3680 __le64 stripe_count;
3681 } __attribute__ ((packed)) striping_info_buf = { 0 };
3682 size_t size = sizeof (striping_info_buf);
3683 void *p;
3684 u64 obj_size;
3685 u64 stripe_unit;
3686 u64 stripe_count;
3687 int ret;
3688
3689 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3690 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3691 (char *)&striping_info_buf, size);
cc070d59
AE
3692 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3693 if (ret < 0)
3694 return ret;
3695 if (ret < size)
3696 return -ERANGE;
3697
3698 /*
3699 * We don't actually support the "fancy striping" feature
3700 * (STRIPINGV2) yet, but if the striping sizes are the
3701 * defaults the behavior is the same as before. So find
3702 * out, and only fail if the image has non-default values.
3703 */
3704 ret = -EINVAL;
3705 obj_size = (u64)1 << rbd_dev->header.obj_order;
3706 p = &striping_info_buf;
3707 stripe_unit = ceph_decode_64(&p);
3708 if (stripe_unit != obj_size) {
3709 rbd_warn(rbd_dev, "unsupported stripe unit "
3710 "(got %llu want %llu)",
3711 stripe_unit, obj_size);
3712 return -EINVAL;
3713 }
3714 stripe_count = ceph_decode_64(&p);
3715 if (stripe_count != 1) {
3716 rbd_warn(rbd_dev, "unsupported stripe count "
3717 "(got %llu want 1)", stripe_count);
3718 return -EINVAL;
3719 }
500d0c0f
AE
3720 rbd_dev->header.stripe_unit = stripe_unit;
3721 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
3722
3723 return 0;
3724}
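/*
 * Example of the only layout accepted above (values illustrative):
 * an image with obj_order 22 (4 MiB objects) must report a
 * stripe_unit of 4194304 and a stripe_count of 1, i.e. the default
 * layout, which behaves identically with or without STRIPINGV2.
 */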
3725
9e15b77d
AE
3726static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3727{
3728 size_t image_id_size;
3729 char *image_id;
3730 void *p;
3731 void *end;
3732 size_t size;
3733 void *reply_buf = NULL;
3734 size_t len = 0;
3735 char *image_name = NULL;
3736 int ret;
3737
3738 rbd_assert(!rbd_dev->spec->image_name);
3739
69e7a02f
AE
3740 len = strlen(rbd_dev->spec->image_id);
3741 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
3742 image_id = kmalloc(image_id_size, GFP_KERNEL);
3743 if (!image_id)
3744 return NULL;
3745
3746 p = image_id;
4157976b 3747 end = image_id + image_id_size;
57385b51 3748 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
3749
3750 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3751 reply_buf = kmalloc(size, GFP_KERNEL);
3752 if (!reply_buf)
3753 goto out;
3754
36be9a76 3755 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
3756 "rbd", "dir_get_name",
3757 image_id, image_id_size,
e2a58ee5 3758 reply_buf, size);
9e15b77d
AE
3759 if (ret < 0)
3760 goto out;
3761 p = reply_buf;
f40eb349
AE
3762 end = reply_buf + ret;
3763
9e15b77d
AE
3764 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3765 if (IS_ERR(image_name))
3766 image_name = NULL;
3767 else
3768 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3769out:
3770 kfree(reply_buf);
3771 kfree(image_id);
3772
3773 return image_name;
3774}
3775
2ad3d716
AE
3776static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3777{
3778 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3779 const char *snap_name;
3780 u32 which = 0;
3781
3782 /* Skip over names until we find the one we are looking for */
3783
3784 snap_name = rbd_dev->header.snap_names;
3785 while (which < snapc->num_snaps) {
3786 if (!strcmp(name, snap_name))
3787 return snapc->snaps[which];
3788 snap_name += strlen(snap_name) + 1;
3789 which++;
3790 }
3791 return CEPH_NOSNAP;
3792}
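/*
 * The format 1 snapshot names walked above are packed end to end as
 * NUL-terminated strings.  A minimal standalone sketch of the same
 * walk (plain C outside the kernel; names and count made up):
 *
 *	#include <stdio.h>
 *	#include <string.h>
 *
 *	int main(void)
 *	{
 *		const char names[] = "snap1\0snap2\0snap3";
 *		const char *p = names;
 *		unsigned int which;
 *
 *		for (which = 0; which < 3; which++) {
 *			printf("%u: %s\n", which, p);
 *			p += strlen(p) + 1;
 *		}
 *		return 0;
 *	}
 */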
3793
3794static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3795{
3796 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3797 u32 which;
3798 bool found = false;
3799 u64 snap_id;
3800
3801 for (which = 0; !found && which < snapc->num_snaps; which++) {
3802 const char *snap_name;
3803
3804 snap_id = snapc->snaps[which];
3805 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3806 if (IS_ERR(snap_name))
3807 break;
3808 found = !strcmp(name, snap_name);
3809 kfree(snap_name);
3810 }
3811 return found ? snap_id : CEPH_NOSNAP;
3812}
3813
3814/*
3815 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3816 * no snapshot by that name is found, or if an error occurs.
3817 */
3818static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3819{
3820 if (rbd_dev->image_format == 1)
3821 return rbd_v1_snap_id_by_name(rbd_dev, name);
3822
3823 return rbd_v2_snap_id_by_name(rbd_dev, name);
3824}
3825
9e15b77d 3826/*
2e9f7f1c
AE
 3827 * When an rbd image has a parent image, that parent is identified
 3828 * by pool, image, and snapshot ids (not names).  This function fills
3829 * in the names for those ids. (It's OK if we can't figure out the
3830 * name for an image id, but the pool and snapshot ids should always
3831 * exist and have names.) All names in an rbd spec are dynamically
3832 * allocated.
e1d4213f
AE
3833 *
3834 * When an image being mapped (not a parent) is probed, we have the
3835 * pool name and pool id, image name and image id, and the snapshot
3836 * name. The only thing we're missing is the snapshot id.
9e15b77d 3837 */
2e9f7f1c 3838static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 3839{
2e9f7f1c
AE
3840 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3841 struct rbd_spec *spec = rbd_dev->spec;
3842 const char *pool_name;
3843 const char *image_name;
3844 const char *snap_name;
9e15b77d
AE
3845 int ret;
3846
e1d4213f
AE
3847 /*
3848 * An image being mapped will have the pool name (etc.), but
3849 * we need to look up the snapshot id.
3850 */
2e9f7f1c
AE
3851 if (spec->pool_name) {
3852 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 3853 u64 snap_id;
e1d4213f 3854
2ad3d716
AE
3855 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3856 if (snap_id == CEPH_NOSNAP)
e1d4213f 3857 return -ENOENT;
2ad3d716 3858 spec->snap_id = snap_id;
e1d4213f 3859 } else {
2e9f7f1c 3860 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
3861 }
3862
3863 return 0;
3864 }
9e15b77d 3865
2e9f7f1c 3866 /* Get the pool name; we have to make our own copy of this */
9e15b77d 3867
2e9f7f1c
AE
3868 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3869 if (!pool_name) {
3870 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
3871 return -EIO;
3872 }
2e9f7f1c
AE
3873 pool_name = kstrdup(pool_name, GFP_KERNEL);
3874 if (!pool_name)
9e15b77d
AE
3875 return -ENOMEM;
3876
3877 /* Fetch the image name; tolerate failure here */
3878
2e9f7f1c
AE
3879 image_name = rbd_dev_image_name(rbd_dev);
3880 if (!image_name)
06ecc6cb 3881 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 3882
2e9f7f1c 3883 /* Look up the snapshot name, and make a copy */
9e15b77d 3884
2e9f7f1c 3885 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
2e9f7f1c
AE
3886 if (!snap_name) {
3887 ret = -ENOMEM;
9e15b77d 3888 goto out_err;
2e9f7f1c
AE
3889 }
3890
3891 spec->pool_name = pool_name;
3892 spec->image_name = image_name;
3893 spec->snap_name = snap_name;
9e15b77d
AE
3894
3895 return 0;
3896out_err:
2e9f7f1c
AE
3897 kfree(image_name);
3898 kfree(pool_name);
9e15b77d
AE
3899
3900 return ret;
3901}
3902
cc4a38bd 3903static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
3904{
3905 size_t size;
3906 int ret;
3907 void *reply_buf;
3908 void *p;
3909 void *end;
3910 u64 seq;
3911 u32 snap_count;
3912 struct ceph_snap_context *snapc;
3913 u32 i;
3914
3915 /*
3916 * We'll need room for the seq value (maximum snapshot id),
3917 * snapshot count, and array of that many snapshot ids.
3918 * For now we have a fixed upper limit on the number we're
3919 * prepared to receive.
3920 */
3921 size = sizeof (__le64) + sizeof (__le32) +
3922 RBD_MAX_SNAP_COUNT * sizeof (__le64);
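	/*
	 * With RBD_MAX_SNAP_COUNT of 510 this works out to
	 * 8 + 4 + 510 * 8 = 4092 bytes, so the largest possible
	 * reply fits within a single 4 KiB page.
	 */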
3923 reply_buf = kzalloc(size, GFP_KERNEL);
3924 if (!reply_buf)
3925 return -ENOMEM;
3926
36be9a76 3927 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3928 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 3929 reply_buf, size);
36be9a76 3930 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
3931 if (ret < 0)
3932 goto out;
3933
35d489f9 3934 p = reply_buf;
57385b51
AE
3935 end = reply_buf + ret;
3936 ret = -ERANGE;
35d489f9
AE
3937 ceph_decode_64_safe(&p, end, seq, out);
3938 ceph_decode_32_safe(&p, end, snap_count, out);
3939
3940 /*
3941 * Make sure the reported number of snapshot ids wouldn't go
3942 * beyond the end of our buffer. But before checking that,
3943 * make sure the computed size of the snapshot context we
3944 * allocate is representable in a size_t.
3945 */
3946 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3947 / sizeof (u64)) {
3948 ret = -EINVAL;
3949 goto out;
3950 }
3951 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3952 goto out;
468521c1 3953 ret = 0;
35d489f9 3954
812164f8 3955 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
3956 if (!snapc) {
3957 ret = -ENOMEM;
3958 goto out;
3959 }
35d489f9 3960 snapc->seq = seq;
35d489f9
AE
3961 for (i = 0; i < snap_count; i++)
3962 snapc->snaps[i] = ceph_decode_64(&p);
3963
49ece554 3964 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
3965 rbd_dev->header.snapc = snapc;
3966
3967 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 3968 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
3969out:
3970 kfree(reply_buf);
3971
57385b51 3972 return ret;
35d489f9
AE
3973}
3974
54cac61f
AE
3975static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3976 u64 snap_id)
b8b1e2db
AE
3977{
3978 size_t size;
3979 void *reply_buf;
54cac61f 3980 __le64 snapid;
b8b1e2db
AE
3981 int ret;
3982 void *p;
3983 void *end;
b8b1e2db
AE
3984 char *snap_name;
3985
3986 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3987 reply_buf = kmalloc(size, GFP_KERNEL);
3988 if (!reply_buf)
3989 return ERR_PTR(-ENOMEM);
3990
54cac61f 3991 snapid = cpu_to_le64(snap_id);
36be9a76 3992 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 3993 "rbd", "get_snapshot_name",
54cac61f 3994 &snapid, sizeof (snapid),
e2a58ee5 3995 reply_buf, size);
36be9a76 3996 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
3997 if (ret < 0) {
3998 snap_name = ERR_PTR(ret);
b8b1e2db 3999 goto out;
f40eb349 4000 }
b8b1e2db
AE
4001
4002 p = reply_buf;
f40eb349 4003 end = reply_buf + ret;
e5c35534 4004 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4005 if (IS_ERR(snap_name))
b8b1e2db 4006 goto out;
b8b1e2db 4007
f40eb349 4008 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4009 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4010out:
4011 kfree(reply_buf);
4012
f40eb349 4013 return snap_name;
b8b1e2db
AE
4014}
4015
cc4a38bd 4016static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
117973fb
AE
4017{
4018 int ret;
117973fb
AE
4019
4020 down_write(&rbd_dev->header_rwsem);
4021
117973fb
AE
4022 ret = rbd_dev_v2_image_size(rbd_dev);
4023 if (ret)
4024 goto out;
29334ba4
AE
4025 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4026 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4027 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4028
cc4a38bd 4029 ret = rbd_dev_v2_snap_context(rbd_dev);
117973fb
AE
4030 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4031 if (ret)
4032 goto out;
117973fb
AE
4033out:
4034 up_write(&rbd_dev->header_rwsem);
4035
4036 return ret;
4037}
4038
dfc5606d
YS
4039static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4040{
dfc5606d 4041 struct device *dev;
cd789ab9 4042 int ret;
dfc5606d
YS
4043
4044 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 4045
cd789ab9 4046 dev = &rbd_dev->dev;
dfc5606d
YS
4047 dev->bus = &rbd_bus_type;
4048 dev->type = &rbd_device_type;
4049 dev->parent = &rbd_root_dev;
200a6a8b 4050 dev->release = rbd_dev_device_release;
de71a297 4051 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4052 ret = device_register(dev);
dfc5606d 4053
dfc5606d 4054 mutex_unlock(&ctl_mutex);
cd789ab9 4055
dfc5606d 4056 return ret;
602adf40
YS
4057}
4058
dfc5606d
YS
4059static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4060{
4061 device_unregister(&rbd_dev->dev);
4062}
4063
e2839308 4064static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
4065
4066/*
499afd5b
AE
4067 * Get a unique rbd identifier for the given new rbd_dev, and add
4068 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 4069 */
e2839308 4070static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4071{
e2839308 4072 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
4073
4074 spin_lock(&rbd_dev_list_lock);
4075 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4076 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
4077 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4078 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 4079}
b7f23c36 4080
1ddbe94e 4081/*
499afd5b
AE
4082 * Remove an rbd_dev from the global list, and record that its
4083 * identifier is no longer in use.
1ddbe94e 4084 */
e2839308 4085static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4086{
d184f6bf 4087 struct list_head *tmp;
de71a297 4088 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
4089 int max_id;
4090
aafb230e 4091 rbd_assert(rbd_id > 0);
499afd5b 4092
e2839308
AE
4093 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4094 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
4095 spin_lock(&rbd_dev_list_lock);
4096 list_del_init(&rbd_dev->node);
d184f6bf
AE
4097
4098 /*
4099 * If the id being "put" is not the current maximum, there
4100 * is nothing special we need to do.
4101 */
e2839308 4102 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
4103 spin_unlock(&rbd_dev_list_lock);
4104 return;
4105 }
4106
4107 /*
4108 * We need to update the current maximum id. Search the
4109 * list to find out what it is. We're more likely to find
4110 * the maximum at the end, so search the list backward.
4111 */
4112 max_id = 0;
4113 list_for_each_prev(tmp, &rbd_dev_list) {
4114 struct rbd_device *rbd_dev;
4115
4116 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
4117 if (rbd_dev->dev_id > max_id)
4118 max_id = rbd_dev->dev_id;
d184f6bf 4119 }
499afd5b 4120 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4121
1ddbe94e 4122 /*
e2839308 4123 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
4124 * which case it now accurately reflects the new maximum.
4125 * Be careful not to overwrite the maximum value in that
4126 * case.
1ddbe94e 4127 */
e2839308
AE
4128 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4129 dout(" max dev id has been reset\n");
b7f23c36
AE
4130}
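/*
 * A worked example of the id recycling above: with devices 1, 2 and
 * 3 mapped, putting id 3 rescans the list, finds 2 as the new
 * maximum, and swaps rbd_dev_id_max from 3 to 2.  If a racing
 * rbd_dev_id_get() has already bumped the counter to 4, the
 * cmpxchg fails and the newer maximum is left in place.
 */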
4131
e28fff26
AE
4132/*
4133 * Skips over white space at *buf, and updates *buf to point to the
4134 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4135 * the token (string of non-white space characters) found. Note
4136 * that *buf must be terminated with '\0'.
e28fff26
AE
4137 */
4138static inline size_t next_token(const char **buf)
4139{
4140 /*
4141 * These are the characters that produce nonzero for
4142 * isspace() in the "C" and "POSIX" locales.
4143 */
4144 const char *spaces = " \f\n\r\t\v";
4145
4146 *buf += strspn(*buf, spaces); /* Find start of token */
4147
4148 return strcspn(*buf, spaces); /* Return token length */
4149}
4150
4151/*
4152 * Finds the next token in *buf, and if the provided token buffer is
4153 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4154 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4155 * must be terminated with '\0' on entry.
e28fff26
AE
4156 *
4157 * Returns the length of the token found (not including the '\0').
4158 * Return value will be 0 if no token is found, and it will be >=
4159 * token_size if the token would not fit.
4160 *
593a9e7b 4161 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4162 * found token. Note that this occurs even if the token buffer is
4163 * too small to hold it.
4164 */
4165static inline size_t copy_token(const char **buf,
4166 char *token,
4167 size_t token_size)
4168{
4169 size_t len;
4170
4171 len = next_token(buf);
4172 if (len < token_size) {
4173 memcpy(token, *buf, len);
4174 *(token + len) = '\0';
4175 }
4176 *buf += len;
4177
4178 return len;
4179}
4180
ea3352f4
AE
4181/*
4182 * Finds the next token in *buf, dynamically allocates a buffer big
4183 * enough to hold a copy of it, and copies the token into the new
4184 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4185 * that a duplicate buffer is created even for a zero-length token.
4186 *
4187 * Returns a pointer to the newly-allocated duplicate, or a null
4188 * pointer if memory for the duplicate was not available. If
4189 * the lenp argument is a non-null pointer, the length of the token
4190 * (not including the '\0') is returned in *lenp.
4191 *
4192 * If successful, the *buf pointer will be updated to point beyond
4193 * the end of the found token.
4194 *
4195 * Note: uses GFP_KERNEL for allocation.
4196 */
4197static inline char *dup_token(const char **buf, size_t *lenp)
4198{
4199 char *dup;
4200 size_t len;
4201
4202 len = next_token(buf);
4caf35f9 4203 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4204 if (!dup)
4205 return NULL;
ea3352f4
AE
4206 *(dup + len) = '\0';
4207 *buf += len;
4208
4209 if (lenp)
4210 *lenp = len;
4211
4212 return dup;
4213}
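/*
 * A minimal standalone illustration of how the token helpers above
 * carve up an "rbd add" string (plain C outside the kernel, using
 * the same strspn()/strcspn() logic; the input is hypothetical):
 *
 *	#include <stdio.h>
 *	#include <string.h>
 *
 *	int main(void)
 *	{
 *		const char *buf = " 1.2.3.4:6789 name=admin rbd myimage";
 *		const char *spaces = " \f\n\r\t\v";
 *
 *		while (*buf) {
 *			size_t len;
 *
 *			buf += strspn(buf, spaces);
 *			len = strcspn(buf, spaces);
 *			if (!len)
 *				break;
 *			printf("token: %.*s\n", (int)len, buf);
 *			buf += len;
 *		}
 *		return 0;
 *	}
 */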
4214
a725f65e 4215/*
859c31df
AE
4216 * Parse the options provided for an "rbd add" (i.e., rbd image
4217 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4218 * and the data written is passed here via a NUL-terminated buffer.
4219 * Returns 0 if successful or an error code otherwise.
d22f76e7 4220 *
859c31df
AE
4221 * The information extracted from these options is recorded in
4222 * the other parameters which return dynamically-allocated
4223 * structures:
4224 * ceph_opts
4225 * The address of a pointer that will refer to a ceph options
4226 * structure. Caller must release the returned pointer using
4227 * ceph_destroy_options() when it is no longer needed.
4228 * rbd_opts
4229 * Address of an rbd options pointer. Fully initialized by
4230 * this function; caller must release with kfree().
4231 * spec
4232 * Address of an rbd image specification pointer. Fully
4233 * initialized by this function based on parsed options.
4234 * Caller must release with rbd_spec_put().
4235 *
4236 * The options passed take this form:
 4237 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4238 * where:
4239 * <mon_addrs>
4240 * A comma-separated list of one or more monitor addresses.
4241 * A monitor address is an ip address, optionally followed
4242 * by a port number (separated by a colon).
4243 * I.e.: ip1[:port1][,ip2[:port2]...]
4244 * <options>
4245 * A comma-separated list of ceph and/or rbd options.
4246 * <pool_name>
4247 * The name of the rados pool containing the rbd image.
4248 * <image_name>
4249 * The name of the image in that pool to map.
 4250 * <snap_name>
 4251 * An optional snapshot name.  If provided, the mapping will
 4252 * present data from the image at the time that snapshot was
 4253 * created.  The image head is used if no snapshot name is
 4254 * provided.  Snapshot mappings are always read-only.
a725f65e 4255 */
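/*
 * Example of such a request (all values hypothetical):
 *
 *	echo "1.2.3.4:6789 name=admin rbd myimage mysnap" \
 *		> /sys/bus/rbd/add
 *
 * maps snapshot "mysnap" of image "myimage" in pool "rbd" via the
 * monitor at 1.2.3.4:6789.
 */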
859c31df 4256static int rbd_add_parse_args(const char *buf,
dc79b113 4257 struct ceph_options **ceph_opts,
859c31df
AE
4258 struct rbd_options **opts,
4259 struct rbd_spec **rbd_spec)
e28fff26 4260{
d22f76e7 4261 size_t len;
859c31df 4262 char *options;
0ddebc0c 4263 const char *mon_addrs;
ecb4dc22 4264 char *snap_name;
0ddebc0c 4265 size_t mon_addrs_size;
859c31df 4266 struct rbd_spec *spec = NULL;
4e9afeba 4267 struct rbd_options *rbd_opts = NULL;
859c31df 4268 struct ceph_options *copts;
dc79b113 4269 int ret;
e28fff26
AE
4270
4271 /* The first four tokens are required */
4272
7ef3214a 4273 len = next_token(&buf);
4fb5d671
AE
4274 if (!len) {
4275 rbd_warn(NULL, "no monitor address(es) provided");
4276 return -EINVAL;
4277 }
0ddebc0c 4278 mon_addrs = buf;
f28e565a 4279 mon_addrs_size = len + 1;
7ef3214a 4280 buf += len;
a725f65e 4281
dc79b113 4282 ret = -EINVAL;
f28e565a
AE
4283 options = dup_token(&buf, NULL);
4284 if (!options)
dc79b113 4285 return -ENOMEM;
4fb5d671
AE
4286 if (!*options) {
4287 rbd_warn(NULL, "no options provided");
4288 goto out_err;
4289 }
e28fff26 4290
859c31df
AE
4291 spec = rbd_spec_alloc();
4292 if (!spec)
f28e565a 4293 goto out_mem;
859c31df
AE
4294
4295 spec->pool_name = dup_token(&buf, NULL);
4296 if (!spec->pool_name)
4297 goto out_mem;
4fb5d671
AE
4298 if (!*spec->pool_name) {
4299 rbd_warn(NULL, "no pool name provided");
4300 goto out_err;
4301 }
e28fff26 4302
69e7a02f 4303 spec->image_name = dup_token(&buf, NULL);
859c31df 4304 if (!spec->image_name)
f28e565a 4305 goto out_mem;
4fb5d671
AE
4306 if (!*spec->image_name) {
4307 rbd_warn(NULL, "no image name provided");
4308 goto out_err;
4309 }
d4b125e9 4310
f28e565a
AE
4311 /*
4312 * Snapshot name is optional; default is to use "-"
4313 * (indicating the head/no snapshot).
4314 */
3feeb894 4315 len = next_token(&buf);
820a5f3e 4316 if (!len) {
3feeb894
AE
4317 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4318 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4319 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4320 ret = -ENAMETOOLONG;
f28e565a 4321 goto out_err;
849b4260 4322 }
ecb4dc22
AE
4323 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4324 if (!snap_name)
f28e565a 4325 goto out_mem;
ecb4dc22
AE
4326 *(snap_name + len) = '\0';
4327 spec->snap_name = snap_name;
e5c35534 4328
0ddebc0c 4329 /* Initialize all rbd options to the defaults */
e28fff26 4330
4e9afeba
AE
4331 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4332 if (!rbd_opts)
4333 goto out_mem;
4334
4335 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4336
859c31df 4337 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4338 mon_addrs + mon_addrs_size - 1,
4e9afeba 4339 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4340 if (IS_ERR(copts)) {
4341 ret = PTR_ERR(copts);
dc79b113
AE
4342 goto out_err;
4343 }
859c31df
AE
4344 kfree(options);
4345
4346 *ceph_opts = copts;
4e9afeba 4347 *opts = rbd_opts;
859c31df 4348 *rbd_spec = spec;
0ddebc0c 4349
dc79b113 4350 return 0;
f28e565a 4351out_mem:
dc79b113 4352 ret = -ENOMEM;
d22f76e7 4353out_err:
859c31df
AE
4354 kfree(rbd_opts);
4355 rbd_spec_put(spec);
f28e565a 4356 kfree(options);
d22f76e7 4357
dc79b113 4358 return ret;
a725f65e
AE
4359}
4360
589d30e0
AE
4361/*
4362 * An rbd format 2 image has a unique identifier, distinct from the
4363 * name given to it by the user. Internally, that identifier is
4364 * what's used to specify the names of objects related to the image.
4365 *
4366 * A special "rbd id" object is used to map an rbd image name to its
4367 * id. If that object doesn't exist, then there is no v2 rbd image
4368 * with the supplied name.
4369 *
4370 * This function will record the given rbd_dev's image_id field if
4371 * it can be determined, and in that case will return 0. If any
4372 * errors occur a negative errno will be returned and the rbd_dev's
4373 * image_id field will be unchanged (and should be NULL).
4374 */
4375static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4376{
4377 int ret;
4378 size_t size;
4379 char *object_name;
4380 void *response;
c0fba368 4381 char *image_id;
2f82ee54 4382
2c0d0a10
AE
4383 /*
4384 * When probing a parent image, the image id is already
4385 * known (and the image name likely is not). There's no
c0fba368
AE
4386 * need to fetch the image id again in this case. We
4387 * do still need to set the image format though.
2c0d0a10 4388 */
c0fba368
AE
4389 if (rbd_dev->spec->image_id) {
4390 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4391
2c0d0a10 4392 return 0;
c0fba368 4393 }
2c0d0a10 4394
589d30e0
AE
4395 /*
 4396 * First, see if the format 2 image id object exists, and if
4397 * so, get the image's persistent id from it.
4398 */
69e7a02f 4399 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4400 object_name = kmalloc(size, GFP_NOIO);
4401 if (!object_name)
4402 return -ENOMEM;
0d7dbfce 4403 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4404 dout("rbd id object name is %s\n", object_name);
4405
4406 /* Response will be an encoded string, which includes a length */
4407
4408 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4409 response = kzalloc(size, GFP_NOIO);
4410 if (!response) {
4411 ret = -ENOMEM;
4412 goto out;
4413 }
4414
c0fba368
AE
4415 /* If it doesn't exist we'll assume it's a format 1 image */
4416
36be9a76 4417 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4418 "rbd", "get_id", NULL, 0,
e2a58ee5 4419 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4420 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4421 if (ret == -ENOENT) {
4422 image_id = kstrdup("", GFP_KERNEL);
4423 ret = image_id ? 0 : -ENOMEM;
4424 if (!ret)
4425 rbd_dev->image_format = 1;
4426 } else if (ret > sizeof (__le32)) {
4427 void *p = response;
4428
4429 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4430 NULL, GFP_NOIO);
c0fba368
AE
4431 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4432 if (!ret)
4433 rbd_dev->image_format = 2;
589d30e0 4434 } else {
c0fba368
AE
4435 ret = -EINVAL;
4436 }
4437
4438 if (!ret) {
4439 rbd_dev->spec->image_id = image_id;
4440 dout("image_id is %s\n", image_id);
589d30e0
AE
4441 }
4442out:
4443 kfree(response);
4444 kfree(object_name);
4445
4446 return ret;
4447}
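/*
 * Illustrative flow, assuming the RBD_ID_PREFIX ("rbd_id.")
 * definition from rbd_types.h: probing image "foo" reads object
 * "rbd_id.foo".  -ENOENT means a format 1 image (image_id is
 * recorded as ""), while a successful read yields the format 2
 * image id, e.g. the hypothetical "1028f9d1b746".
 */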
4448
6fd48b3b
AE
4449/* Undo whatever state changes are made by v1 or v2 image probe */
4450
4451static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4452{
4453 struct rbd_image_header *header;
4454
4455 rbd_dev_remove_parent(rbd_dev);
4456 rbd_spec_put(rbd_dev->parent_spec);
4457 rbd_dev->parent_spec = NULL;
4458 rbd_dev->parent_overlap = 0;
4459
4460 /* Free dynamic fields from the header, then zero it out */
4461
4462 header = &rbd_dev->header;
812164f8 4463 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4464 kfree(header->snap_sizes);
4465 kfree(header->snap_names);
4466 kfree(header->object_prefix);
4467 memset(header, 0, sizeof (*header));
4468}
4469
a30b71b9
AE
4470static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4471{
4472 int ret;
a30b71b9
AE
4473
4474 /* Populate rbd image metadata */
4475
662518b1 4476 ret = rbd_dev_v1_header_read(rbd_dev);
a30b71b9
AE
4477 if (ret < 0)
4478 goto out_err;
86b00e0d
AE
4479
4480 /* Version 1 images have no parent (no layering) */
4481
4482 rbd_dev->parent_spec = NULL;
4483 rbd_dev->parent_overlap = 0;
4484
a30b71b9
AE
4485 dout("discovered version 1 image, header name is %s\n",
4486 rbd_dev->header_name);
4487
4488 return 0;
4489
4490out_err:
4491 kfree(rbd_dev->header_name);
4492 rbd_dev->header_name = NULL;
0d7dbfce
AE
4493 kfree(rbd_dev->spec->image_id);
4494 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
4495
4496 return ret;
4497}
4498
4499static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4500{
9d475de5 4501 int ret;
a30b71b9 4502
9d475de5 4503 ret = rbd_dev_v2_image_size(rbd_dev);
57385b51 4504 if (ret)
1e130199
AE
4505 goto out_err;
4506
4507 /* Get the object prefix (a.k.a. block_name) for the image */
4508
4509 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4510 if (ret)
b1b5402a
AE
4511 goto out_err;
4512
d889140c 4513 /* Get and check the features for the image */
b1b5402a
AE
4514
4515 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4516 if (ret)
9d475de5 4517 goto out_err;
35d489f9 4518
86b00e0d
AE
4519 /* If the image supports layering, get the parent info */
4520
4521 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4522 ret = rbd_dev_v2_parent_info(rbd_dev);
57385b51 4523 if (ret)
86b00e0d 4524 goto out_err;
96882f55 4525 /*
c734b796
AE
4526 * Print a warning if this image has a parent.
4527 * Don't print it if the image now being probed
4528 * is itself a parent. We can tell at this point
4529 * because we won't know its pool name yet (just its
4530 * pool id).
96882f55 4531 */
c734b796 4532 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
96882f55
AE
4533 rbd_warn(rbd_dev, "WARNING: kernel layering "
4534 "is EXPERIMENTAL!");
86b00e0d
AE
4535 }
4536
cc070d59
AE
4537 /* If the image supports fancy striping, get its parameters */
4538
4539 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4540 ret = rbd_dev_v2_striping_info(rbd_dev);
4541 if (ret < 0)
4542 goto out_err;
4543 }
4544
6e14b1a6
AE
4545 /* crypto and compression type aren't (yet) supported for v2 images */
4546
4547 rbd_dev->header.crypt_type = 0;
4548 rbd_dev->header.comp_type = 0;
35d489f9 4549
6e14b1a6
AE
 4550 /* Get the snapshot context for the image */
4551
cc4a38bd 4552 ret = rbd_dev_v2_snap_context(rbd_dev);
35d489f9
AE
4553 if (ret)
4554 goto out_err;
6e14b1a6 4555
a30b71b9
AE
4556 dout("discovered version 2 image, header name is %s\n",
4557 rbd_dev->header_name);
4558
35152979 4559 return 0;
9d475de5 4560out_err:
86b00e0d
AE
4561 rbd_dev->parent_overlap = 0;
4562 rbd_spec_put(rbd_dev->parent_spec);
4563 rbd_dev->parent_spec = NULL;
9d475de5
AE
4564 kfree(rbd_dev->header_name);
4565 rbd_dev->header_name = NULL;
1e130199
AE
4566 kfree(rbd_dev->header.object_prefix);
4567 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4568
4569 return ret;
a30b71b9
AE
4570}
4571
124afba2 4572static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4573{
2f82ee54 4574 struct rbd_device *parent = NULL;
124afba2
AE
4575 struct rbd_spec *parent_spec;
4576 struct rbd_client *rbdc;
4577 int ret;
4578
4579 if (!rbd_dev->parent_spec)
4580 return 0;
4581 /*
4582 * We need to pass a reference to the client and the parent
4583 * spec when creating the parent rbd_dev. Images related by
4584 * parent/child relationships always share both.
4585 */
4586 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4587 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4588
4589 ret = -ENOMEM;
4590 parent = rbd_dev_create(rbdc, parent_spec);
4591 if (!parent)
4592 goto out_err;
4593
51344a38 4594 ret = rbd_dev_image_probe(parent, true);
124afba2
AE
4595 if (ret < 0)
4596 goto out_err;
4597 rbd_dev->parent = parent;
4598
4599 return 0;
4600out_err:
4601 if (parent) {
4602 rbd_spec_put(rbd_dev->parent_spec);
4603 kfree(rbd_dev->header_name);
4604 rbd_dev_destroy(parent);
4605 } else {
4606 rbd_put_client(rbdc);
4607 rbd_spec_put(parent_spec);
4608 }
4609
4610 return ret;
4611}
4612
200a6a8b 4613static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4614{
83a06263 4615 int ret;
d1cf5788 4616
83a06263
AE
4617 /* generate unique id: find highest unique id, add one */
4618 rbd_dev_id_get(rbd_dev);
4619
4620 /* Fill in the device name, now that we have its id. */
4621 BUILD_BUG_ON(DEV_NAME_LEN
4622 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4623 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4624
4625 /* Get our block major device number. */
4626
4627 ret = register_blkdev(0, rbd_dev->name);
4628 if (ret < 0)
4629 goto err_out_id;
4630 rbd_dev->major = ret;
4631
4632 /* Set up the blkdev mapping. */
4633
4634 ret = rbd_init_disk(rbd_dev);
4635 if (ret)
4636 goto err_out_blkdev;
4637
f35a4dee 4638 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
4639 if (ret)
4640 goto err_out_disk;
f35a4dee
AE
4641 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4642
4643 ret = rbd_bus_add_dev(rbd_dev);
4644 if (ret)
4645 goto err_out_mapping;
83a06263 4646
83a06263
AE
4647 /* Everything's ready. Announce the disk to the world. */
4648
129b79d4 4649 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4650 add_disk(rbd_dev->disk);
4651
4652 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4653 (unsigned long long) rbd_dev->mapping.size);
4654
4655 return ret;
2f82ee54 4656
f35a4dee
AE
4657err_out_mapping:
4658 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4659err_out_disk:
4660 rbd_free_disk(rbd_dev);
4661err_out_blkdev:
4662 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4663err_out_id:
4664 rbd_dev_id_put(rbd_dev);
83a06263
AE
4666
4667 return ret;
4668}
4669
332bb12d
AE
4670static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4671{
4672 struct rbd_spec *spec = rbd_dev->spec;
4673 size_t size;
4674
4675 /* Record the header object name for this rbd image. */
4676
4677 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4678
4679 if (rbd_dev->image_format == 1)
4680 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4681 else
4682 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4683
4684 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4685 if (!rbd_dev->header_name)
4686 return -ENOMEM;
4687
4688 if (rbd_dev->image_format == 1)
4689 sprintf(rbd_dev->header_name, "%s%s",
4690 spec->image_name, RBD_SUFFIX);
4691 else
4692 sprintf(rbd_dev->header_name, "%s%s",
4693 RBD_HEADER_PREFIX, spec->image_id);
4694 return 0;
4695}
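/*
 * Illustrative results, assuming the RBD_SUFFIX (".rbd") and
 * RBD_HEADER_PREFIX ("rbd_header.") definitions from rbd_types.h:
 * a format 1 image named "foo" gets header object "foo.rbd", while
 * a format 2 image with id "1028f9d1b746" (hypothetical) gets
 * header object "rbd_header.1028f9d1b746".
 */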
4696
200a6a8b
AE
4697static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4698{
6fd48b3b
AE
4699 int ret;
4700
6fd48b3b
AE
4701 rbd_dev_unprobe(rbd_dev);
4702 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4703 if (ret)
4704 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
200a6a8b 4705 kfree(rbd_dev->header_name);
6fd48b3b
AE
4706 rbd_dev->header_name = NULL;
4707 rbd_dev->image_format = 0;
4708 kfree(rbd_dev->spec->image_id);
4709 rbd_dev->spec->image_id = NULL;
4710
200a6a8b
AE
4711 rbd_dev_destroy(rbd_dev);
4712}
4713
a30b71b9
AE
4714/*
4715 * Probe for the existence of the header object for the given rbd
4716 * device. For format 2 images this includes determining the image
4717 * id.
4718 */
51344a38 4719static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
a30b71b9
AE
4720{
4721 int ret;
b644de2b 4722 int tmp;
a30b71b9
AE
4723
4724 /*
4725 * Get the id from the image id object. If it's not a
4726 * format 2 image, we'll get ENOENT back, and we'll assume
4727 * it's a format 1 image.
4728 */
4729 ret = rbd_dev_image_id(rbd_dev);
4730 if (ret)
c0fba368
AE
4731 return ret;
4732 rbd_assert(rbd_dev->spec->image_id);
4733 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4734
332bb12d
AE
4735 ret = rbd_dev_header_name(rbd_dev);
4736 if (ret)
4737 goto err_out_format;
4738
b644de2b
AE
4739 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4740 if (ret)
4741 goto out_header_name;
4742
c0fba368 4743 if (rbd_dev->image_format == 1)
a30b71b9
AE
4744 ret = rbd_dev_v1_probe(rbd_dev);
4745 else
4746 ret = rbd_dev_v2_probe(rbd_dev);
5655c4d9 4747 if (ret)
b644de2b 4748 goto err_out_watch;
83a06263 4749
9bb81c9b
AE
4750 ret = rbd_dev_spec_update(rbd_dev);
4751 if (ret)
33dca39f 4752 goto err_out_probe;
9bb81c9b 4753
51344a38
AE
4754 /* If we are mapping a snapshot it must be marked read-only */
4755
4756 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4757 read_only = true;
4758 rbd_dev->mapping.read_only = read_only;
4759
9bb81c9b 4760 ret = rbd_dev_probe_parent(rbd_dev);
6fd48b3b
AE
4761 if (!ret)
4762 return 0;
83a06263 4763
6fd48b3b
AE
4764err_out_probe:
4765 rbd_dev_unprobe(rbd_dev);
b644de2b
AE
4766err_out_watch:
4767 tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4768 if (tmp)
4769 rbd_warn(rbd_dev, "unable to tear down watch request\n");
332bb12d
AE
4770out_header_name:
4771 kfree(rbd_dev->header_name);
4772 rbd_dev->header_name = NULL;
4773err_out_format:
4774 rbd_dev->image_format = 0;
5655c4d9
AE
4775 kfree(rbd_dev->spec->image_id);
4776 rbd_dev->spec->image_id = NULL;
4777
4778 dout("probe failed, returning %d\n", ret);
4779
a30b71b9
AE
4780 return ret;
4781}
4782
59c2be1e
YS
4783static ssize_t rbd_add(struct bus_type *bus,
4784 const char *buf,
4785 size_t count)
602adf40 4786{
cb8627c7 4787 struct rbd_device *rbd_dev = NULL;
dc79b113 4788 struct ceph_options *ceph_opts = NULL;
4e9afeba 4789 struct rbd_options *rbd_opts = NULL;
859c31df 4790 struct rbd_spec *spec = NULL;
9d3997fd 4791 struct rbd_client *rbdc;
27cc2594 4792 struct ceph_osd_client *osdc;
51344a38 4793 bool read_only;
27cc2594 4794 int rc = -ENOMEM;
602adf40
YS
4795
4796 if (!try_module_get(THIS_MODULE))
4797 return -ENODEV;
4798
602adf40 4799 /* parse add command */
859c31df 4800 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 4801 if (rc < 0)
bd4ba655 4802 goto err_out_module;
51344a38
AE
4803 read_only = rbd_opts->read_only;
4804 kfree(rbd_opts);
4805 rbd_opts = NULL; /* done with this */
78cea76e 4806
9d3997fd
AE
4807 rbdc = rbd_get_client(ceph_opts);
4808 if (IS_ERR(rbdc)) {
4809 rc = PTR_ERR(rbdc);
0ddebc0c 4810 goto err_out_args;
9d3997fd 4811 }
c53d5893 4812 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4813
602adf40 4814 /* pick the pool */
9d3997fd 4815 osdc = &rbdc->client->osdc;
859c31df 4816 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
4817 if (rc < 0)
4818 goto err_out_client;
c0cd10db 4819 spec->pool_id = (u64)rc;
859c31df 4820
0903e875
AE
4821 /* The ceph file layout needs to fit pool id in 32 bits */
4822
c0cd10db
AE
4823 if (spec->pool_id > (u64)U32_MAX) {
4824 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4825 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
4826 rc = -EIO;
4827 goto err_out_client;
4828 }
4829
c53d5893 4830 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
4831 if (!rbd_dev)
4832 goto err_out_client;
c53d5893
AE
4833 rbdc = NULL; /* rbd_dev now owns this */
4834 spec = NULL; /* rbd_dev now owns this */
602adf40 4835
51344a38 4836 rc = rbd_dev_image_probe(rbd_dev, read_only);
a30b71b9 4837 if (rc < 0)
c53d5893 4838 goto err_out_rbd_dev;
05fd6f6f 4839
b536f69a
AE
4840 rc = rbd_dev_device_setup(rbd_dev);
4841 if (!rc)
4842 return count;
4843
4844 rbd_dev_image_release(rbd_dev);
c53d5893
AE
4845err_out_rbd_dev:
4846 rbd_dev_destroy(rbd_dev);
bd4ba655 4847err_out_client:
9d3997fd 4848 rbd_put_client(rbdc);
0ddebc0c 4849err_out_args:
78cea76e
AE
4850 if (ceph_opts)
4851 ceph_destroy_options(ceph_opts);
4e9afeba 4852 kfree(rbd_opts);
859c31df 4853 rbd_spec_put(spec);
bd4ba655
AE
4854err_out_module:
4855 module_put(THIS_MODULE);
27cc2594 4856
602adf40 4857 dout("Error adding device %s\n", buf);
27cc2594 4858
c0cd10db 4859 return (ssize_t)rc;
602adf40
YS
4860}
4861
de71a297 4862static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4863{
4864 struct list_head *tmp;
4865 struct rbd_device *rbd_dev;
4866
e124a82f 4867 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4868 list_for_each(tmp, &rbd_dev_list) {
4869 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4870 if (rbd_dev->dev_id == dev_id) {
e124a82f 4871 spin_unlock(&rbd_dev_list_lock);
602adf40 4872 return rbd_dev;
e124a82f 4873 }
602adf40 4874 }
e124a82f 4875 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4876 return NULL;
4877}
4878
200a6a8b 4879static void rbd_dev_device_release(struct device *dev)
602adf40 4880{
593a9e7b 4881 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4882
602adf40 4883 rbd_free_disk(rbd_dev);
200a6a8b 4884 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6d80b130 4885 rbd_dev_mapping_clear(rbd_dev);
602adf40 4886 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 4887 rbd_dev->major = 0;
e2839308 4888 rbd_dev_id_put(rbd_dev);
602adf40
YS
4890}
4891
05a46afd
AE
4892static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4893{
ad945fc1 4894 while (rbd_dev->parent) {
05a46afd
AE
4895 struct rbd_device *first = rbd_dev;
4896 struct rbd_device *second = first->parent;
4897 struct rbd_device *third;
4898
4899 /*
4900 * Follow to the parent with no grandparent and
4901 * remove it.
4902 */
4903 while (second && (third = second->parent)) {
4904 first = second;
4905 second = third;
4906 }
ad945fc1 4907 rbd_assert(second);
8ad42cd0 4908 rbd_dev_image_release(second);
ad945fc1
AE
4909 first->parent = NULL;
4910 first->parent_overlap = 0;
4911
4912 rbd_assert(first->parent_spec);
05a46afd
AE
4913 rbd_spec_put(first->parent_spec);
4914 first->parent_spec = NULL;
05a46afd
AE
4915 }
4916}
4917
dfc5606d
YS
4918static ssize_t rbd_remove(struct bus_type *bus,
4919 const char *buf,
4920 size_t count)
602adf40
YS
4921{
4922 struct rbd_device *rbd_dev = NULL;
0d8189e1 4923 int target_id;
602adf40 4924 unsigned long ul;
0d8189e1 4925 int ret;
602adf40 4926
0d8189e1
AE
4927 ret = strict_strtoul(buf, 10, &ul);
4928 if (ret)
4929 return ret;
602adf40
YS
4930
4931 /* convert to int; abort if we lost anything in the conversion */
4932 target_id = (int) ul;
4933 if (target_id != ul)
4934 return -EINVAL;
4935
4936 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4937
4938 rbd_dev = __rbd_get_dev(target_id);
4939 if (!rbd_dev) {
4940 ret = -ENOENT;
4941 goto done;
42382b70
AE
4942 }
4943
a14ea269 4944 spin_lock_irq(&rbd_dev->lock);
b82d167b 4945 if (rbd_dev->open_count)
42382b70 4946 ret = -EBUSY;
b82d167b
AE
4947 else
4948 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
a14ea269 4949 spin_unlock_irq(&rbd_dev->lock);
b82d167b 4950 if (ret < 0)
42382b70 4951 goto done;
0d8189e1 4952 ret = count;
b480815a 4953 rbd_bus_del_dev(rbd_dev);
8ad42cd0 4954 rbd_dev_image_release(rbd_dev);
79ab7558 4955 module_put(THIS_MODULE);
602adf40
YS
4956done:
4957 mutex_unlock(&ctl_mutex);
aafb230e 4958
602adf40
YS
4959 return ret;
4960}
4961
602adf40
YS
4962/*
4963 * create control files in sysfs
dfc5606d 4964 * /sys/bus/rbd/...
602adf40
YS
4965 */
4966static int rbd_sysfs_init(void)
4967{
dfc5606d 4968 int ret;
602adf40 4969
fed4c143 4970 ret = device_register(&rbd_root_dev);
21079786 4971 if (ret < 0)
dfc5606d 4972 return ret;
602adf40 4973
fed4c143
AE
4974 ret = bus_register(&rbd_bus_type);
4975 if (ret < 0)
4976 device_unregister(&rbd_root_dev);
602adf40 4977
602adf40
YS
4978 return ret;
4979}
4980
4981static void rbd_sysfs_cleanup(void)
4982{
dfc5606d 4983 bus_unregister(&rbd_bus_type);
fed4c143 4984 device_unregister(&rbd_root_dev);
602adf40
YS
4985}
4986
1c2a9dfe
AE
4987static int rbd_slab_init(void)
4988{
4989 rbd_assert(!rbd_img_request_cache);
4990 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
4991 sizeof (struct rbd_img_request),
4992 __alignof__(struct rbd_img_request),
4993 0, NULL);
868311b1
AE
4994 if (!rbd_img_request_cache)
4995 return -ENOMEM;
4996
4997 rbd_assert(!rbd_obj_request_cache);
4998 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
4999 sizeof (struct rbd_obj_request),
5000 __alignof__(struct rbd_obj_request),
5001 0, NULL);
78c2a44a
AE
5002 if (!rbd_obj_request_cache)
5003 goto out_err;
5004
5005 rbd_assert(!rbd_segment_name_cache);
5006 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5007 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5008 if (rbd_segment_name_cache)
1c2a9dfe 5009 return 0;
78c2a44a
AE
5010out_err:
5011 if (rbd_obj_request_cache) {
5012 kmem_cache_destroy(rbd_obj_request_cache);
5013 rbd_obj_request_cache = NULL;
5014 }
1c2a9dfe 5015
868311b1
AE
5016 kmem_cache_destroy(rbd_img_request_cache);
5017 rbd_img_request_cache = NULL;
5018
1c2a9dfe
AE
5019 return -ENOMEM;
5020}
5021
5022static void rbd_slab_exit(void)
5023{
78c2a44a
AE
5024 rbd_assert(rbd_segment_name_cache);
5025 kmem_cache_destroy(rbd_segment_name_cache);
5026 rbd_segment_name_cache = NULL;
5027
868311b1
AE
5028 rbd_assert(rbd_obj_request_cache);
5029 kmem_cache_destroy(rbd_obj_request_cache);
5030 rbd_obj_request_cache = NULL;
5031
1c2a9dfe
AE
5032 rbd_assert(rbd_img_request_cache);
5033 kmem_cache_destroy(rbd_img_request_cache);
5034 rbd_img_request_cache = NULL;
5035}
5036
cc344fa1 5037static int __init rbd_init(void)
602adf40
YS
5038{
5039 int rc;
5040
1e32d34c
AE
5041 if (!libceph_compatible(NULL)) {
5042 rbd_warn(NULL, "libceph incompatibility (quitting)");
5043
5044 return -EINVAL;
5045 }
1c2a9dfe 5046 rc = rbd_slab_init();
602adf40
YS
5047 if (rc)
5048 return rc;
1c2a9dfe
AE
5049 rc = rbd_sysfs_init();
5050 if (rc)
5051 rbd_slab_exit();
5052 else
5053 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5054
5055 return rc;
602adf40
YS
5056}
5057
cc344fa1 5058static void __exit rbd_exit(void)
602adf40
YS
5059{
5060 rbd_sysfs_cleanup();
1c2a9dfe 5061 rbd_slab_exit();
602adf40
YS
5062}
5063
5064module_init(rbd_init);
5065module_exit(rbd_exit);
5066
5067MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5068MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5069MODULE_DESCRIPTION("rados block device");
5070
5071/* following authorship retained from original osdblk.c */
5072MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5073
5074MODULE_LICENSE("GPL");